Commits

Anonymous committed 04cd411

Submit of cf-0.3 release.

Comments (0)

Files changed (1)

 (*---------------------------------------------------------------------------*
-  IMPLEMENTATION  cf_gadget.ml
+  IMPLEMENTATION  cf_nflow.ml
 
-  Copyright (c) 2003-2004, James H. Woodyatt
+  Copyright (c) 2004, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-(*
+(**)
 module type X_tag = sig
     val tag: string
 end
 end
 
 module X = X_create(struct let tag = "Cf_gadget" end)
-*)
+(**)
 
-module Int_map = Cf_rbtree.Map(Cf_ordered.Int_order)
-
-type ('i, 'o) process0_t =
-    k:('i, 'o) kernel_t -> (('i, 'o) Cf_flow.t, unit) Cf_cmonad.t
-and ('i, 'o) gate0_t = {
-    g_pin_: pin_t;
-    g_bind_: (Obj.t -> ('i, 'o) process0_t);
+type ('i, 'o) kernel_t = {
+    k_rdyQ_: ('i, 'o) work0_t Lazy.t Queue.t;
+    k_inpQ_: ('i -> ('i, 'o) work0_t) Queue.t;
+    mutable k_wireN_: int;
 }
-and pin_t = {
-    x_id_: int;
-    x_ptr_: Obj.t Weak.t;
+and ('i, 'o) work0_t =
+    ('i, 'o) kernel_t -> (('i, 'o) Cf_flow.t, unit) Cf_cmonad.t
+and ('x, 'i, 'o) rx0_t = {
+    rx0_txPtr_: ('x, 'i, 'o) tx0_t Weak.t;
+    rx0_pendQ_: 'x Queue.t;
 }
-and ('i, 'o) kernel_t = {
-    mutable k_cnext_: int;
-    k_cfreeq_: int Queue.t;
-    k_cusedq_: (int * Obj.t Weak.t) Queue.t;
-    mutable k_gs_: ('i, 'o) gate0_t Cf_seq.t list;
-    mutable k_mq_: Obj.t Queue.t Int_map.t;
-    k_inq_: ('i -> ('i, 'o) process0_t) Queue.t;
-    k_rdyq_: ('i, 'o) process0_t Lazy.t Queue.t;
+and ('x, 'i, 'o) tx0_t = {
+    tx0_rxPtr_: ('x, 'i, 'o) rx0_t Weak.t;
+    tx0_gateQ_: ('x, 'i, 'o) guard0_t Queue.t;
+}
+and ('x, 'i, 'o) guard0_t = {
+    x0_txLst_: (Obj.t, 'i, 'o) tx0_t list;
+    x0_getF_: 'x -> ('i, 'o) work0_t;
 }
 
-module Pin_order = struct
-    type t = pin_t
-    let compare a b = b.x_id_ - a.x_id_
-end
-
-module Pin_map = Cf_rbtree.Map(Pin_order)
-module Pin_set = Cf_rbtree.Set(Pin_order)
-
-let create_kernel_ () = {
-    k_cnext_ = 0;
-    k_cfreeq_ = Queue.create ();
-    k_cusedq_ = Queue.create ();
-    k_gs_ = [];
-    k_mq_ = Int_map.nil;
-    k_inq_ = Queue.create ();
-    k_rdyq_ = Queue.create ();
+type ('x, 'i, 'o) gate0_t = {
+    y0_rx_: ('x, 'i, 'o) rx0_t;
+    y0_getF_: 'x -> ('i, 'o) work0_t;    
 }
 
-type ('a, 'i, 'o) wire_t = {
-    e_rx_: pin_t;
-    e_tx_: pin_t;
+type ('x, 'i, 'o) wire_t =
+    (('x, 'i, 'o) rx0_t * ('x, 'i, 'o) tx0_t) option * string Lazy.t
+
+let kernel_ () = {
+    k_rdyQ_ = Queue.create ();
+    k_inpQ_ = Queue.create ();
+    k_wireN_ = 0;
 }
 
-let create_pin_ id = { x_id_ = id; x_ptr_ = Weak.create 1 }
-
-let null = {
-    e_rx_ = create_pin_ 0;
-    e_tx_ = create_pin_ 0;
+let rx0_ () = {
+    rx0_txPtr_ = Weak.create 1;
+    rx0_pendQ_ = Queue.create ();
 }
 
-let rec matchGateFin_ ~aq =
+let tx0_ () = {
+    tx0_rxPtr_ = Weak.create 1;
+    tx0_gateQ_ = Queue.create ();
+}
+
+let null = None, (Lazy.lazy_from_val "wire[null]")
+
+let rec scheduler_ k c =
     try
-        Cf_seq.P (Queue.take aq, lazy (matchGateFin_ ~aq))
+        Lazy.force (Queue.take k.k_rdyQ_) k c
     with
     | Queue.Empty ->
-        Cf_seq.Z
+        try
+            let f = Queue.take k.k_inpQ_ in
+            lazy (Cf_flow.Q (fun i -> Lazy.force (f i k c)))
+        with
+        | Queue.Empty ->
+            Lazy.lazy_from_val Cf_flow.Z
 
-let rec matchGate1_ ~k ~aq z =
-    match Lazy.force z with
-    | Cf_seq.Z ->
-        let z = matchGateFin_ ~aq in
-        if z <> Cf_seq.Z then
-            k.k_gs_ <- (Lazy.lazy_from_val z) :: k.k_gs_
-    | Cf_seq.P (hd, tl) ->
-        let id = hd.g_pin_.x_id_ in
-        match
-            try Some (Int_map.search id k.k_mq_) with Not_found -> None
-        with
-        | None ->
-            if Weak.check hd.g_pin_.x_ptr_ 0 then Queue.add hd aq;
-            matchGate1_ ~k ~aq tl
-        | Some q ->
-            match
-                try Some (Queue.take q) with Queue.Empty -> None
-            with
-            | None ->
-                assert (not true)
-            | Some obj ->
-                if Queue.is_empty q then k.k_mq_ <- Int_map.delete id k.k_mq_;
-                Queue.add (lazy (hd.g_bind_ obj)) k.k_rdyq_
-
-let rec matchGate0_ ~k = function
-    | [] ->
-        ()
-    | hd :: tl ->
-        let aq = Queue.create () in
-        matchGate1_ ~k ~aq hd;
-        matchGate0_ ~k tl
-
-let rec scheduler_ ~k c =
-    match
-        try Some (Queue.take k.k_rdyq_) with Queue.Empty -> None
-    with
-    | Some p ->
-        Lazy.force p ~k c
-    | None ->
-        let gs = k.k_gs_ in
-        k.k_gs_ <- [];
-        matchGate0_ ~k gs;
-        if Queue.is_empty k.k_rdyq_ then
-            match
-                try Some (Queue.take k.k_inq_) with Queue.Empty -> None
-            with
-            | Some f ->
-                lazy (Cf_flow.Q (fun i -> Lazy.force (f i ~k c)))
-            | None ->
-                Lazy.lazy_from_val Cf_flow.Z
-        else
-            scheduler_ ~k c
-
-class ['a, 'i, 'o] connector x =
-    object
-        method pin = x
-        method check = Weak.check x.x_ptr_ 0
-        method id = string_of_int x.x_id_
-    end
-
-open Cf_cmonad.Op
-
-type ('s, 'i, 'o) process_t =
-    ('s -> ('i, 'o) process0_t) -> 's -> ('i, 'o) process0_t
-type ('s, 'i, 'o) gate_t = ('s -> ('i, 'o) gate0_t) Cf_seq.t
+type ('s, 'i, 'o) work_t = ('s -> ('i, 'o) work0_t) -> 's -> ('i, 'o) work0_t
+type ('s, 'i, 'o) gate_t = ('s -> (Obj.t, 'i, 'o) gate0_t) Cf_seq.t
 
 type ('s, 'i, 'o, 'a) guard_t = (('s, 'i, 'o) gate_t, 'a) Cf_cmonad.t
-type ('s, 'i, 'o, 'a) t = (('s, 'i, 'o) process_t, 'a) Cf_cmonad.t
+type ('s, 'i, 'o, 'a) t = (('s, 'i, 'o) work_t, 'a) Cf_cmonad.t
 
 let downX1_ () _ _ = scheduler_
 let downX2_ _ = scheduler_
 
-(*
-let down_ m s = m downX1_ downX2_ s
+let eval m s =
+    let k = kernel_ () in
+    let f () = Lazy.lazy_from_val Cf_flow.Z in
+    m downX1_ downX2_ s k f
 
-let liftP0Arg_ p0 p c s = p0 (fun a -> p a c s)
-let liftP0Cont_ p0 p c s = p0 (p c s)
-let liftP0_ p0 _ _ = p0
-*)
+let start m s0 f c1 s1 k =
+    Queue.add (lazy (m downX1_ downX2_ s0)) k.k_rdyQ_;
+    f () c1 s1 k
 
-let eval m s =
-    let k = create_kernel_ () in
-    let f () = Lazy.lazy_from_val Cf_flow.Z in
-    m downX1_ downX2_ s ~k f
+let wire f c s k =
+    let n = succ k.k_wireN_ in
+    k.k_wireN_ <- n;
+    let id = lazy (Printf.sprintf "%08u" n) in
+    let rx = rx0_ () in
+    let tx = tx0_ () in
+    Weak.set tx.tx0_rxPtr_ 0 (Some rx);
+    Weak.set rx.rx0_txPtr_ 0 (Some tx);
+    f (Some (rx, tx), id) c s k
 
-let start m s0 f c1 s1 ~k =
-    Queue.add (lazy (m downX1_ downX2_ s0)) k.k_rdyq_;
-    f () c1 s1 ~k
+type ('x, 'i, 'o) match_t =
+    | M_blocked of
+        ('x, 'i, 'o) tx0_t list *
+        (('x, 'i, 'o) tx0_t * ('x -> ('i, 'o) work0_t)) list
+    | M_ready of
+        ('i, 'o) work0_t Lazy.t
 
-let wire f c s ~k =
-    let id =
-        begin
+let rec match1_ txLst gLst z = 
+    match Lazy.force z with
+    | Cf_seq.Z ->
+        M_blocked (txLst, gLst)
+    | Cf_seq.P (y0, z) ->
+        let { y0_rx_ = rx0; y0_getF_ = getF } = y0 in
+        match Weak.get rx0.rx0_txPtr_ 0 with
+        | None ->
+            match1_ txLst gLst z
+        | Some tx0 ->
             try
-                for i = 1 to 2 do
-                    let id, ptr as pair = Queue.take k.k_cusedq_ in
-                    if Weak.check ptr 0 || Weak.check ptr 1 then
-                        Queue.add pair k.k_cusedq_
-                    else
-                        Queue.add id k.k_cfreeq_;
-                done
+                let obj = Queue.take rx0.rx0_pendQ_ in
+                M_ready (lazy (getF obj))
             with
             | Queue.Empty ->
-                ()
-        end;
-        try
-            Queue.take k.k_cfreeq_
-        with
-        | Queue.Empty ->
-            let id = k.k_cnext_ in
-            k.k_cnext_ <- succ k.k_cnext_;
-            id
+                match1_ (tx0 :: txLst) ((tx0, getF) :: gLst) z
+
+let guard m _ _ s k =
+    let z = Cf_seq.map (fun g -> g s) (Cf_seq.evalC m) in
+    begin
+        match match1_ [] [] z with
+        | M_ready rdy ->
+            Queue.add rdy k.k_rdyQ_;
+        | M_blocked (txLst, gLst) ->
+            List.iter begin fun (tx0, getF) ->
+                Queue.add { x0_txLst_ = txLst; x0_getF_ = getF } tx0.tx0_gateQ_
+            end gLst
+    end;
+    scheduler_ k
+
+class type connector =
+    object
+        method check: bool
+        method id: string
+    end
+
+class ['x, 'i, 'o] rx (w, id : ('x, 'i, 'o) wire_t) =
+    let rx0 =
+        match w with
+        | Some (rx0, _) -> rx0
+        | None -> rx0_ ()
     in
-    let rx = create_pin_ id and tx = create_pin_ id in
-    let rxObj = Some (Obj.repr rx) and txObj = Some (Obj.repr tx) in
-    Weak.set rx.x_ptr_ 0 txObj;
-    Weak.set tx.x_ptr_ 0 rxObj;
-    let ptr = Weak.create 2 in
-    Weak.set ptr 0 rxObj;
-    Weak.set ptr 1 txObj;
-    Queue.add (id, ptr) k.k_cusedq_;
-    let ch = { e_rx_ = rx; e_tx_ = tx } in
-    f ch c s ~k
+    object        
+        val rx0_ = rx0
+        
+        method id = Lazy.force id
+        method check = Weak.check rx0_.rx0_txPtr_ 0
 
-let guard m _ _ s ~k =
-    k.k_gs_ <- (Cf_seq.map (fun g -> g s) (Cf_seq.evalC m)) :: k.k_gs_;
-    scheduler_ ~k
-
-class ['a, 'i, 'o] rx ch =
-    let x = ch.e_rx_ in
-    object
-        inherit ['a, 'i, 'o] connector x
-        
         method get:
-            's. ('a -> ('s, 'i, 'o, unit) t) -> ('s, 'i, 'o, unit) guard_t =
+            's. ('x -> ('s, 'i, 'o, unit) t) -> ('s, 'i, 'o, unit) guard_t =
             fun f ->
-                Cf_seq.writeC begin fun s ->
-                    let g obj = f obj downX1_ downX2_ s in
-                    { g_pin_ = x; g_bind_ = Obj.magic g }
+                if Weak.check rx0_.rx0_txPtr_ 0 then begin
+                    Cf_seq.writeC begin fun s ->
+                        let g obj = f obj downX1_ downX2_ s in
+                        Obj.magic { y0_rx_ = rx0; y0_getF_ = g }
+                    end
+                end
+                else begin
+                    Queue.clear rx0_.rx0_pendQ_;
+                    Cf_cmonad.return ()
                 end
     end
 
-class ['a, 'i, 'o] tx ch =
-    let x = ch.e_tx_ in
-    object
-        inherit ['a, 'i, 'o] connector x
+class ['x, 'i, 'o] tx (w, id : ('x, 'i, 'o) wire_t) =
+    let tx0 =
+        match w with
+        | Some (_, tx0) -> tx0
+        | None -> tx0_ ()
+    in
+    object        
+        val tx0_ = tx0
+
+        method id = Lazy.force id
+        method check = Weak.check tx0_.tx0_rxPtr_ 0
         
         method put:
-            's. 'a -> ('s, 'i, 'o, unit) t =
-            fun obj f c1 s ~k c0 ->
-                if Weak.check x.x_ptr_ 0 then begin
-                    let obj = Obj.repr obj in
-                    try
-                        Queue.add obj (Int_map.search x.x_id_ k.k_mq_)
-                    with
-                    | Not_found ->
-                        let q = Queue.create () in
-                        Queue.add obj q;
-                        k.k_mq_ <- Int_map.replace (x.x_id_, q) k.k_mq_
-                end
-                else begin
-                    k.k_mq_ <- Int_map.delete x.x_id_ k.k_mq_;
+            's. 'x -> ('s, 'i, 'o, unit) t =
+            fun obj f c1 s k c0 ->
+                begin
+                    match Weak.get tx0_.tx0_rxPtr_ 0 with
+                    | None ->
+                        Queue.clear tx0_.tx0_gateQ_
+                    | Some rx0 ->
+                        try
+                            let x0 = Queue.take tx0.tx0_gateQ_ in
+                            let x0: (Obj.t, 'i, 'o) guard0_t = Obj.magic x0 in
+                            let obj = Obj.repr obj in
+                            List.iter begin fun tx0 ->
+                                let q = Queue.create () in
+                                Queue.iter begin fun x0' ->
+                                    if x0'.x0_txLst_ != x0.x0_txLst_ then
+                                        Queue.add x0' q
+                                end tx0.tx0_gateQ_;
+                                Queue.clear tx0.tx0_gateQ_;
+                                Queue.transfer q tx0.tx0_gateQ_
+                            end x0.x0_txLst_;
+                            Queue.add (lazy (x0.x0_getF_ obj)) k.k_rdyQ_;
+                        with
+                        | Queue.Empty ->
+                            Queue.add obj rx0.rx0_pendQ_
                 end;
-                let q = k.k_rdyq_ in
-                if Queue.is_empty q then
-                    f () c1 s ~k c0
-                else begin
-                    Queue.add (lazy (f () c1 s)) k.k_rdyq_;
-                    Lazy.force (Queue.take k.k_rdyq_) ~k c0
-                end
+                Queue.add (lazy (f () c1 s)) k.k_rdyQ_;
+                scheduler_ k c0
     end
 
 type ('a, 'i, 'o) simplex_t = ('a, 'i, 'o) rx * ('a, 'i, 'o) tx
 type ('a, 'b, 'i, 'o) duplex_t =
     (('a, 'i, 'o) rx * ('b, 'i, 'o) tx) * (('b, 'i, 'o) rx * ('a, 'i, 'o) tx)
 
-let read f g s ~k c =
-    Queue.add (fun i -> f i g s) k.k_inq_;
-    scheduler_ ~k c
+let read f g s k c =
+    Queue.add (fun i -> f i g s) k.k_inpQ_;
+    scheduler_ k c
 
 let write o =
     let m f = lazy (Cf_flow.P (o, f ())) in
-    fun c c1 s ~k c0 -> m (fun () -> c () c1 s ~k c0)
+    fun c c1 s k c0 -> m (fun () -> c () c1 s k c0)
 
 let load p c s = (p s) c s
 let store s f c _ = f () c s
     begin
         simplex >>= fun (rx1, tx1) ->
         simplex >>= fun (rx2, tx2) ->
-        Cf_cmonad.return ((rx1, tx2), (rx2, tx1))
+        return ((rx1, tx2), (rx2, tx1))
     end f
 
 let wrap x y =
     let rec loop w =
         match Lazy.force w with
         | Cf_flow.Z ->
-            Cf_cmonad.return ()
+            return ()
         | Cf_flow.P (hd, tl) ->
             y#put hd >>= fun () ->
             loop tl
     fun w ->
         start (loop w) ()
 
-(*--- End of File [ cf_gadget.ml ] ---*)
+(*--- End of File [ cf_nflow.ml ] ---*)