Commits

Anonymous committed 12a8cb2

Complete rewrite for efficiency and space conservative scheduling. The
internal scheduling kernel now uses completely mutable state, i.e. the flow
object is no longer threaded through every last monad binding.

Comments (0)

Files changed (1)

 let jout = Cf_journal.stdout
 *)
 
+type ('i, 'o) kernel_t = {
+    k_writeQ_: 'o Queue.t;
+    k_workQ_: (('i, 'o) kernel_t -> unit) Queue.t;
+    k_readQ_: ('i -> unit) Queue.t;
+    mutable k_wireN_: int;
+}
+
+type ('x, 'i, 'o) rx0_t = {
+    rx0_txPtr_: ('x, 'i, 'o) tx0_t Weak.t;
+    rx0_pendQ_: 'x Queue.t;
+}
+and ('x, 'i, 'o) tx0_t = {
+    tx0_rxPtr_: ('x, 'i, 'o) rx0_t Weak.t;
+    tx0_gateQ_: ('x, 'i, 'o) guard0_t Queue.t;
+}
+and ('x, 'i, 'o) guard0_t = {
+    x0_txLst_: (Obj.t, 'i, 'o) tx0_t list;
+    x0_getF_: 'x -> ('i, 'o) kernel_t -> unit;
+}
+
+type ('x, 'i, 'o) gate0_t = {
+    y0_rx_: ('x, 'i, 'o) rx0_t;
+    y0_getF_: 'x -> ('i, 'o) kernel_t -> unit;    
+}
+
+type ('x, 'i, 'o) wire_t =
+    (('x, 'i, 'o) rx0_t * ('x, 'i, 'o) tx0_t) option * string Lazy.t
+
+let kernel_ () = {
+    k_writeQ_ = Queue.create ();
+    k_workQ_ = Queue.create ();
+    k_readQ_ = Queue.create ();
+    k_wireN_ = 0;
+}
+
+let rx0_ () = {
+    rx0_txPtr_ = Weak.create 1;
+    rx0_pendQ_ = Queue.create ();
+}
+
+let tx0_ () = {
+    tx0_rxPtr_ = Weak.create 1;
+    tx0_gateQ_ = Queue.create ();
+}
+
+let null = None, (Lazy.lazy_from_val "wire[null]")
+
+let rec scheduler_ k =
+    match
+        try Some (Queue.take k.k_writeQ_) with Queue.Empty -> None
+    with
+    | Some p ->
+        Cf_flow.P (p, lazy (scheduler_ k))
+    | None ->
+        match
+            try Some (Queue.take k.k_workQ_) with Queue.Empty -> None
+        with
+        | Some z ->
+            z k;
+            scheduler_ k
+        | None ->
+            match
+                try Some (Queue.take k.k_readQ_) with Queue.Empty -> None
+            with
+            | Some q ->
+                Cf_flow.Q (fun i -> q i; scheduler_ k)
+            | None ->
+                Cf_flow.Z
+
+type ('s, 'i, 'o) work_t = ('i, 'o) kernel_t -> ('s -> unit) -> 's -> unit
+type ('s, 'i, 'o) gate_t = ('s -> (Obj.t, 'i, 'o) gate0_t) Cf_seq.t
+
+type ('s, 'i, 'o, 'a) guard_t = (('s, 'i, 'o) gate_t, 'a) Cf_cmonad.t
+type ('s, 'i, 'o, 'a) t = (('s, 'i, 'o) work_t, 'a) Cf_cmonad.t
+
+let work_ m s k = m (fun () _ f s -> f s) k (fun _ -> ()) s
+
+let eval m s =
+    let k = kernel_ () in
+    work_ m s k;
+    lazy (scheduler_ k)
+
+let start m s c k =
+    Queue.add (work_ m s) k.k_workQ_;
+    c () k
+
+type ('x, 'i, 'o) match_t =
+    | M_blocked of
+        ('x, 'i, 'o) tx0_t list *
+        (('x, 'i, 'o) tx0_t * ('x -> ('i, 'o) kernel_t -> unit)) list
+    | M_ready of
+        (('i, 'o) kernel_t -> unit)
+
+let rec match1_ txLst gLst z = 
+    match Lazy.force z with
+    | Cf_seq.Z ->
+        M_blocked (txLst, gLst)
+    | Cf_seq.P (y0, z) ->
+        let { y0_rx_ = rx0; y0_getF_ = getF } = y0 in
+        match Weak.get rx0.rx0_txPtr_ 0 with
+        | None ->
+            match1_ txLst gLst z
+        | Some tx0 ->
+            try
+                let obj = Queue.take rx0.rx0_pendQ_ in
+                M_ready (getF obj)
+            with
+            | Queue.Empty ->
+                match1_ (tx0 :: txLst) ((tx0, getF) :: gLst) z
+
+let pushGuard_ txLst (tx0, getF) =
+    Queue.add { x0_txLst_ = txLst; x0_getF_ = getF } tx0.tx0_gateQ_
+
+let guard m _ k _ s =
+    let z = Cf_seq.map (fun g -> g s) (Cf_seq.evalC m) in
+    match match1_ [] [] z with
+    | M_ready rdy -> Queue.add rdy k.k_workQ_
+    | M_blocked (txLst, gLst) -> List.iter (pushGuard_ txLst) gLst
+
+let wire c k =
+    let n = succ k.k_wireN_ in
+    k.k_wireN_ <- n;
+    let id = lazy (Printf.sprintf "%08u" n) in
+    let rx = rx0_ () in
+    let tx = tx0_ () in
+    Weak.set tx.tx0_rxPtr_ 0 (Some rx);
+    Weak.set rx.rx0_txPtr_ 0 (Some tx);
+    c (Some (rx, tx), id) k
+
+class type connector =
+    object
+        method check: bool
+        method id: string
+    end
+
+class ['x, 'i, 'o] rx (w, id : ('x, 'i, 'o) wire_t) =
+    let rx0 =
+        match w with
+        | Some (rx0, _) -> rx0
+        | None -> rx0_ ()
+    in
+    object        
+        val rx0_ = rx0
+        
+        method id = Lazy.force id
+        method check = Weak.check rx0_.rx0_txPtr_ 0
+
+        method get:
+            's. ('x -> ('s, 'i, 'o, unit) t) -> ('s, 'i, 'o, unit) guard_t =
+            fun f ->
+                if Weak.check rx0_.rx0_txPtr_ 0 then begin
+                    Cf_seq.writeC begin fun s ->
+                        let g obj = work_ (f obj) s in
+                        Obj.magic { y0_rx_ = rx0; y0_getF_ = g }
+                    end
+                end
+                else begin
+                    Queue.clear rx0_.rx0_pendQ_;
+                    Cf_cmonad.return ()
+                end
+    end
+
+class ['x, 'i, 'o] tx (w, id : ('x, 'i, 'o) wire_t) =
+    let tx0 =
+        match w with
+        | Some (_, tx0) -> tx0
+        | None -> tx0_ ()
+    in
+    object        
+        val tx0_ = tx0
+
+        method id = Lazy.force id
+        method check = Weak.check tx0_.tx0_rxPtr_ 0
+        
+        method put:
+            's. 'x -> ('s, 'i, 'o, unit) t =
+            fun obj c k f s ->
+                begin
+                    try
+                        let x0 = Queue.take tx0.tx0_gateQ_ in
+                        let x0: (Obj.t, 'i, 'o) guard0_t = Obj.magic x0 in
+                        let obj = Obj.repr obj in
+                        List.iter begin fun tx0 ->
+                            let q = Queue.create () in
+                            Queue.iter begin fun x0' ->
+                                if x0'.x0_txLst_ != x0.x0_txLst_ then
+                                    Queue.add x0' q
+                            end tx0.tx0_gateQ_;
+                            Queue.clear tx0.tx0_gateQ_;
+                            Queue.transfer q tx0.tx0_gateQ_
+                        end x0.x0_txLst_;
+                        Queue.add (x0.x0_getF_ obj) k.k_workQ_
+                    with
+                    | Queue.Empty ->
+                        match Weak.get tx0_.tx0_rxPtr_ 0 with
+                        | None ->
+                            Queue.clear tx0_.tx0_gateQ_
+                        | Some rx0 ->
+                            Queue.add obj rx0.rx0_pendQ_
+                end;
+                Queue.add (fun _ -> c () k f s) k.k_workQ_
+    end
+
+type ('a, 'i, 'o) simplex_t = ('a, 'i, 'o) rx * ('a, 'i, 'o) tx
+
+let fsimplex ~f g1 =
+    let g2 ch =
+        let rx, tx = f ch in
+        let rx = (rx :> ('a, 'i, 'o) rx) in
+        let tx = (tx :> ('a, 'i, 'o) tx) in
+        g1 (rx, tx)
+    in
+    wire g2
+
+let simplex f = wire (fun ch -> f (new rx ch, new tx ch))
+
+type ('a, 'b, 'i, 'o) duplex_t =
+    (('a, 'i, 'o) rx * ('b, 'i, 'o) tx) * (('b, 'i, 'o) rx * ('a, 'i, 'o) tx)
+
+let read c k f s =
+    Queue.add (fun i -> c i k f s) k.k_readQ_
+
+let write x c k f s =
+    Queue.add x k.k_writeQ_;
+    Queue.add (fun _ -> c () k f s) k.k_workQ_
+
+let load c k f s = (c s) k f s
+let store s c k f _ = c () k f s
+let modify g c k f s = c () k f (g s)
+
+open Cf_cmonad.Op
+
+let duplex f =
+    begin
+        simplex >>= fun (rx1, tx1) ->
+        simplex >>= fun (rx2, tx2) ->
+        Cf_cmonad.return ((rx1, tx2), (rx2, tx1))
+    end f
+
+let wrap x y =
+    let x = (x :> ('x, 'i, 'o) rx) in
+    let y = (y :> ('y, 'i, 'o) tx) in
+    let rec loop w =
+        match Lazy.force w with
+        | Cf_flow.Z ->
+            Cf_cmonad.return ()
+        | Cf_flow.P (hd, tl) ->
+            y#put hd >>= fun () ->
+            loop tl
+        | Cf_flow.Q f ->
+            guard (x#get (fun obj -> loop (lazy (f obj))))
+    in
+    fun w ->
+        start (loop w) ()
+
+(*---------------------------------------------------------------------------*
+
 (*
 module type X_tag = sig
     val tag: string
     mutable k_wireN_: int;
 }
 and ('i, 'o) work0_t =
-    ('i, 'o) kernel_t -> (('i, 'o) Cf_flow.t, unit) Cf_cmonad.t
+    ('i, 'o) kernel_t -> ('i, 'o) Cf_flow.t -> ('i, 'o) Cf_flow.t
 and ('x, 'i, 'o) rx0_t = {
     rx0_txPtr_: ('x, 'i, 'o) tx0_t Weak.t;
     rx0_pendQ_: 'x Queue.t;
 
 let null = None, (Lazy.lazy_from_val "wire[null]")
 
-let rec scheduler_ k c =
+let scheduler_ k w =
+    print_char '(';
+    flush stdout;
     match
         try Some (Queue.take k.k_rdyQ_)
         with Queue.Empty -> None
     with
     | Some r ->
-        Lazy.force r k c
+        print_char '-';
+        flush stdout;
+        let w' = Lazy.force r k w in
+        print_string "+)";
+        flush stdout;
+        w'
     | None ->
         try
             let f = Queue.take k.k_inpQ_ in
-            lazy (Cf_flow.Q (fun i -> Lazy.force (f i k c)))
+            print_string "@)\n";
+            flush stdout;
+            lazy (Cf_flow.Q (fun i -> Lazy.force (f i k w)))
         with
         | Queue.Empty ->
+            print_string "*)\n";
+            flush stdout;
             Lazy.lazy_from_val Cf_flow.Z
 
 type ('s, 'i, 'o) work_t = ('s -> ('i, 'o) work0_t) -> 's -> ('i, 'o) work0_t
 
 let eval m s =
     let k = kernel_ () in
-    let f () = Lazy.lazy_from_val Cf_flow.Z in
-    m downX1_ downX2_ s k f
+    m downX1_ downX2_ s k (Lazy.lazy_from_val Cf_flow.Z)
 
-let start m s0 f c1 s1 k =
+let start m s0 f c s1 k w =
     Queue.add (lazy (m downX1_ downX2_ s0)) k.k_rdyQ_;
-    f () c1 s1 k
+    f () c s1 k w
 
-let wire f c s k =
+let wire f c s k w =
     let n = succ k.k_wireN_ in
     k.k_wireN_ <- n;
     let id = lazy (Printf.sprintf "%08u" n) in
     let tx = tx0_ () in
     Weak.set tx.tx0_rxPtr_ 0 (Some rx);
     Weak.set rx.rx0_txPtr_ 0 (Some tx);
-    f (Some (rx, tx), id) c s k
+    f (Some (rx, tx), id) c s k w
 
 type ('x, 'i, 'o) match_t =
     | M_blocked of
             | Queue.Empty ->
                 match1_ (tx0 :: txLst) ((tx0, getF) :: gLst) z
 
-let guard m _ _ s k =
+let guard m _ _ s k w =
     let z = Cf_seq.map (fun g -> g s) (Cf_seq.evalC m) in
-    begin
+    let () =
         match match1_ [] [] z with
         | M_ready rdy ->
             Queue.add rdy k.k_rdyQ_;
                 let g = { x0_txLst_ = txLst; x0_getF_ = getF } in
                 Queue.add g tx0.tx0_gateQ_
             end gLst
-    end;
-    scheduler_ k
+    in
+    scheduler_ k w
 
 class type connector =
     object
         
         method put:
             's. 'x -> ('s, 'i, 'o, unit) t =
-            fun obj f c1 s k c0 ->
+            fun obj f c s k w ->
                 begin
                     try
                         let x0 = Queue.take tx0.tx0_gateQ_ in
                         | Some rx0 ->
                             Queue.add obj rx0.rx0_pendQ_
                 end;
-                Queue.add (lazy (f () c1 s)) k.k_rdyQ_;
-                scheduler_ k c0
+                if Queue.is_empty k.k_rdyQ_ then begin
+                    f () c s k w
+                end
+                else begin
+                    Queue.add (lazy (f () c s)) k.k_rdyQ_;
+                    scheduler_ k w
+                end
     end
 
 type ('a, 'i, 'o) simplex_t = ('a, 'i, 'o) rx * ('a, 'i, 'o) tx
 type ('a, 'b, 'i, 'o) duplex_t =
     (('a, 'i, 'o) rx * ('b, 'i, 'o) tx) * (('b, 'i, 'o) rx * ('a, 'i, 'o) tx)
 
-let read f g s k c =
+let read f g s k w =
     Queue.add (fun i -> f i g s) k.k_inpQ_;
-    scheduler_ k c
+    scheduler_ k w
 
-let write o =
-    let m f = lazy (Cf_flow.P (o, f ())) in
-    fun c c1 s k c0 -> m (fun () -> c () c1 s k c0)
+let write o f c s k w = lazy (Cf_flow.P (o, f () c s k w))
 
 let load p c s = (p s) c s
 let store s f c _ = f () c s
 let modify f p c s = p () c (f s)
 
-let ( >>= ) m f x = m (fun a -> f a x)
-let return x f = f x
+open Cf_cmonad.Op
 
 let duplex f =
     begin
         simplex >>= fun (rx1, tx1) ->
         simplex >>= fun (rx2, tx2) ->
-        return ((rx1, tx2), (rx2, tx1))
+        Cf_cmonad.return ((rx1, tx2), (rx2, tx1))
     end f
 
 let wrap x y =
     let rec loop w =
         match Lazy.force w with
         | Cf_flow.Z ->
-            return ()
+            Cf_cmonad.return ()
         | Cf_flow.P (hd, tl) ->
             y#put hd >>= fun () ->
             loop tl
     in
     fun w ->
         start (loop w) ()
+ *---------------------------------------------------------------------------*)
 
 (*--- End of File [ cf_nflow.ml ] ---*)