Commits

Anonymous committed 1ec06f3

[Iom_stream, Iom_layer, Iom_octet_stream] Consolidate [produce] and
[consume] classes into the [transform] class. Swap the meaning of the
[ready/wait] methods with the [block/unblock] methods in the [transform]
state classes.

  • Participants
  • Parent commits 118e4ed

Comments (0)

Files changed (9)

 iom_stream.cmi: iom_gadget.cmi 
 iom_layer.cmi: iom_stream.cmi iom_gadget.cmi 
-iom_octet_stream.cmi: iom_stream.cmi iom_gadget.cmi 
+iom_octet_stream.cmi: iom_stream.cmi iom_layer.cmi iom_gadget.cmi 
 iom_reactor.cmi: iom_stream.cmi iom_octet_stream.cmi iom_gadget.cmi 
 iom_pipe.cmi: iom_stream.cmi iom_reactor.cmi iom_octet_stream.cmi \
     iom_gadget.cmi 
 iom_stream.cmx: iom_gadget.cmx iom_stream.cmi 
 iom_layer.cmo: iom_stream.cmi iom_gadget.cmi iom_layer.cmi 
 iom_layer.cmx: iom_stream.cmx iom_gadget.cmx iom_layer.cmi 
-iom_octet_stream.cmo: iom_stream.cmi iom_gadget.cmi iom_octet_stream.cmi 
-iom_octet_stream.cmx: iom_stream.cmx iom_gadget.cmx iom_octet_stream.cmi 
+iom_octet_stream.cmo: iom_stream.cmi iom_layer.cmi iom_gadget.cmi \
+    iom_octet_stream.cmi 
+iom_octet_stream.cmx: iom_stream.cmx iom_layer.cmx iom_gadget.cmx \
+    iom_octet_stream.cmi 
 iom_reactor.cmo: iom_stream.cmi iom_octet_stream.cmi iom_gadget.cmi \
     iom_reactor.cmi 
 iom_reactor.cmx: iom_stream.cmx iom_octet_stream.cmx iom_gadget.cmx \
 
 ===== Version 0.4 =====
 
+This release is a minor overhaul.  The programming interfaces are basically the
+same, and the implementations are only changed in minor ways, but the code is
+now better organized to make room for new features.  This means that a lot of
+names have changed, but the existing functions remain the same.
+
 Highlights of this release:
 
 + Migrate to using new "Network Extensions" (Nx) library.
++ Consolidated Iom_stream.produce and Iom_stream.consume into
+  Iom_layer.transform and swapped the meaning of block/unblock with ready/wait
+  in the tranformer state and its related process gadgets classes.
 
 ===== Version 0.3 =====
 

File iom/iom_layer.ml

 
 open Cf_cmonad.Op
 
-class ['c, 'n] core jack plug =
-    let jack = (jack :> ('c, 'n) Iom_gadget.jack) in
-    let plug = (plug :> ('c, 'n) Iom_gadget.plug) in
-    let (_, controlTx : ('c, 'n) Iom_gadget.jack) = jack in
-    let (_, notifyTx : ('c, 'n) Iom_gadget.plug) = plug in
-    object(self)
-        inherit ['c, 'n] Iom_stream.notify jack as notify
-        method virtual private stop: 'a. 'a Iom_gadget.t
-        method virtual private fail: 'a. exn -> 'a Iom_gadget.t
-        method virtual private guard: unit Iom_gadget.guard
-        inherit ['c, 'n] Iom_stream.control plug as control
-        inherit Iom_gadget.next
-        inherit Iom_gadget.start
+class virtual ['x, 'y] transform consumeRx produceTx =
+    let consumeRx = (consumeRx :> 'x Iom_gadget.rx) in
+    let produceTx = (produceTx :> 'y Iom_gadget.tx) in
+    object(self:'self)
+        val consume_ = false
+        val produce_ = false
 
-        method private fail x =
-            controlTx#put `Stop >>= fun () ->
-            control#fail x
+        method virtual push: 'x -> 'self Iom_gadget.t
+        method virtual shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
+
+        method consume f =
+            match consume_ with
+            | false ->
+                Cf_cmonad.nil
+            | true ->
+                consumeRx#get begin fun event ->
+                    try
+                        self#push event >>= fun obj ->
+                        f (Cf_exnopt.U obj)
+                    with
+                    | x ->
+                        f (Cf_exnopt.X x)
+                end
         
-        method private notify n =
-            notifyTx#put n >>= fun () ->
-            match n with
-            | #Iom_stream.failed -> self#stop
-            | _ -> self#next
+        method produce =
+            match produce_ with
+            | false ->
+                Cf_cmonad.return (Cf_exnopt.U (Some self))
+            | true ->
+                self#shift >>= function
+                | Cf_exnopt.X x ->
+                    Cf_cmonad.return (Cf_exnopt.X x)
+                | Cf_exnopt.U (s, obj) ->
+                    let z = Cf_seq.map produceTx#put (Cf_seq.of_list s) in
+                    Cf_seq.C.sequence z >>= fun () ->
+                    Cf_cmonad.return (Cf_exnopt.U obj)
         
-        method private control c =
-            control#control c >>= fun () ->
-            controlTx#put c
-        
-        method private guard =
-            control#guard >>= fun () ->
-            notify#guard
+        method unblock: 'self Iom_gadget.t =
+            Cf_cmonad.return {< produce_ = true >}
+        method block: 'self Iom_gadget.t =
+            Cf_cmonad.return {< produce_ = false >}
+                
+        method wait: 'self Iom_gadget.t =
+            Cf_cmonad.return {< consume_ = false >}
+        method ready: 'self Iom_gadget.t =
+            Cf_cmonad.return {< consume_ = true >}
     end
 
 class ['xy, 'c, 'n] simplex transform jack plug =
     object(self:'self)
-        inherit ['c, 'n] core jack plug as super
-        constraint 'xy = ('x, 'y) #Iom_stream.transform
+        inherit ['c, 'n] Iom_stream.process jack plug as super
+        constraint 'xy = ('x, 'y) #transform
 
         val transform_: 'xy = transform
         
             | Cf_exnopt.U None -> self#stop
             | Cf_exnopt.U (Some obj) -> self#loop obj
         
-        method private ready = transform_#ready >>= self#drain
+        method private unblock = transform_#unblock >>= self#drain
+        method private block = transform_#block >>= self#loop
         method private wait = transform_#wait >>= self#loop
-        method private block = transform_#block >>= self#loop
-        method private unblock = transform_#unblock >>= self#loop
+        method private ready = transform_#ready >>= self#loop
         
         method private push = function
             | Cf_exnopt.X x -> self#fail x
     inherit ['xy, 'c, 'n] simplex xy jack plug as super
     
     method private control = function
-        | #Iom_stream.ready -> self#ready
-        | #Iom_stream.wait -> self#wait
+        | #Iom_stream.ready -> self#unblock
+        | #Iom_stream.wait -> self#block
         | c -> super#control c
 end
 
     inherit ['xy, 'c, 'n] simplex xy jack plug as super
     
     method private notify = function
-        | #Iom_stream.ready -> self#ready
-        | #Iom_stream.wait -> self#wait
+        | #Iom_stream.ready -> self#unblock
+        | #Iom_stream.wait -> self#block
         | n -> super#notify n
 end
 
 class ['i, 'o, 'c, 'n] duplex itransform otransform jack plug =
     object(self:'self)
-        inherit ['c, 'n] core jack plug as super
-        constraint 'i = ('ix, 'iy) #Iom_stream.transform
-        constraint 'o = ('oy, 'ox) #Iom_stream.transform
+        inherit ['c, 'n] Iom_stream.process jack plug as super
+        constraint 'i = ('ix, 'iy) #transform
+        constraint 'o = ('oy, 'ox) #transform
         
         val itransform_: 'i option = Some itransform
         val otransform_: 'o option = Some otransform
             | Some obj -> f obj >>= self#idrain
             | None -> assert (not true); self#iloop None
         
+        method private iunblock = self#isignal (fun obj -> obj#unblock)
+        method private iblock = self#isignal (fun obj -> obj#block)
+        method private iwait = self#isignal (fun obj -> obj#wait)
         method private iready = self#isignal (fun obj -> obj#ready)
-        method private iwait = self#isignal (fun obj -> obj#wait)
-        method private iblock = self#isignal (fun obj -> obj#block)
-        method private iunblock = self#isignal (fun obj -> obj#unblock)
         
         method private osignal f =
             match otransform_ with
             | Some obj -> f obj >>= self#odrain
             | None -> assert (not true); self#oloop None
                 
+        method private ounblock = self#osignal (fun obj -> obj#unblock)
+        method private oblock = self#osignal (fun obj -> obj#block)
+        method private owait = self#osignal (fun obj -> obj#wait)
         method private oready = self#osignal (fun obj -> obj#ready)
-        method private owait = self#osignal (fun obj -> obj#wait)
-        method private oblock = self#osignal (fun obj -> obj#block)
-        method private ounblock = self#osignal (fun obj -> obj#unblock)
         
         method private ipush = function
             | Cf_exnopt.U obj -> self#idrain obj
             self#oguard
         
         method private control = function
-            | #Iom_stream.ready -> self#oready
-            | #Iom_stream.wait -> self#owait
+            | #Iom_stream.ready -> self#ounblock
+            | #Iom_stream.wait -> self#oblock
             | c -> super#control c
         
         method private notify = function
-            | #Iom_stream.ready -> self#iready
-            | #Iom_stream.wait -> self#iwait
+            | #Iom_stream.ready -> self#iunblock
+            | #Iom_stream.wait -> self#iblock
             | n -> super#notify n
     end
 
     Iom_gadget.duplex >>= fun (yJack, yPlug) ->
     Iom_gadget.simplex >>= fun (yRx, yTx) ->
     let xRx, (_, xCtrl as xJack) = xJack in
-    let xy = (cons xCtrl xRx yTx :> ('x, 'y) Iom_stream.transform) in
-    xy#unblock >>= fun xy ->
+    let xy = (cons xCtrl xRx yTx :> ('x, 'y) transform) in
+    xy#ready >>= fun xy ->
     let m = new isimplex xy xJack yPlug in
     m#start >>= fun () ->
     xCtrl#put `Ready >>= fun () ->
     Iom_gadget.simplex >>= fun (yRx, yTx) ->
     let xTx, xJack = xJack in
     let _, yNtfy = yPlug in
-    let xy = (cons yNtfy yRx xTx :> ('x, 'y) Iom_stream.transform) in
-    xy#unblock >>= fun xy ->
+    let xy = (cons yNtfy yRx xTx :> ('x, 'y) transform) in
+    xy#ready >>= fun xy ->
     let m = new osimplex xy xJack yPlug in
     m#start >>= fun () ->
     yNtfy#put `Ready >>= fun () ->
     Iom_gadget.duplex >>= fun ((yoRx, yiTx), (yiRx, yoTx)) ->
     let (xiRx, xoTx), (_, xCtrl as xJack) = xJack in
     let _, yNtfy = yPlug in
-    let ixy = (iCons xCtrl xiRx yiTx :> ('xi, 'yi) Iom_stream.transform) in
-    let oxy = (oCons yNtfy yoRx xoTx :> ('yo, 'xo) Iom_stream.transform) in
-    ixy#unblock >>= fun ixy ->
-    oxy#unblock >>= fun oxy ->
+    let ixy = (iCons xCtrl xiRx yiTx :> ('xi, 'yi) transform) in
+    let oxy = (oCons yNtfy yoRx xoTx :> ('yo, 'xo) transform) in
+    ixy#ready >>= fun ixy ->
+    oxy#ready >>= fun oxy ->
     let m = new duplex ixy oxy xJack yPlug in
     m#start >>= fun () ->
     xCtrl#put `Ready >>= fun () ->

File iom/iom_layer.mli

     layer gadgets from initial state objects.
 *)
 
-(** {6 State Classes} *)
+(** {5 Transform Class} *)
+
+(** Use [inherit transform rx tx] to derive a substate object that can consume
+    events of the type carried by [rx] and produce events of the type carried
+    by [tx].
+*)
+class virtual ['x, 'y] transform:
+    'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
+    object('self)
+        val consume_: bool  (** [true] if ready to consume events. *)
+        val produce_: bool  (** [true] if unblocked for producing events. *)
+
+        (** The result of [self#push event] is applied in the [consume] method
+            to obtain the substate object after [event] is processed.
+        *)
+        method virtual push: 'x -> 'self Iom_gadget.t
+
+        (** The result of [self#shift] is evaluated to obtain a list of events
+            to produce and the optional substate to return in the [produce]
+            method.
+        *)
+        method virtual shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
+
+        (** If the stream is ready to consume events, then [self#consume f]
+            guards on the receiver, dispatching received events through
+            [self#push] and applying [f] to the resulting substate object.  If
+            [push] raises an exception, then [f] is applied to the exception
+            instead.  If the stream is not ready to consume events, then the
+            receiver is not guarded.
+        *)
+        method consume:
+            ('self Cf_exnopt.t -> unit Iom_gadget.t) -> unit Iom_gadget.guard
+
+        (** If the stream is unblocked for producing events, then
+            [self#produce] returns an optional substate indicating that further
+            processing is expected, returns [None] to indicate that processing
+            is finished, or returns an exception.  If the stream is not ready
+            for producing events, then the current substate is returned with no
+            other effect.
+        *)
+        method produce: 'self option Cf_exnopt.t Iom_gadget.t
+
+        (** Returns a copy of the substate with [consume_ = false]. *)
+        method wait: 'self Iom_gadget.t
+
+        (** Returns a copy of the substate with [consume_ = true]. *)
+        method ready: 'self Iom_gadget.t
+
+        (** Returns a copy of the substate with [produce_ = true]. *)
+        method unblock: 'self Iom_gadget.t
+
+        (** Returns a copy of the substate with [produce_ = false]. *)
+        method block: 'self Iom_gadget.t
+    end
+
+(** {5 Process Classes} *)
 
 (** The class type describing the common features of both input and output
     simplex stream states.
 *)
 class type ['xy, 'c, 'n] simplex = object('self)
-    inherit ['c, 'n] Iom_stream.notify
-    inherit ['c, 'n] Iom_stream.control
-    inherit Iom_gadget.next
-    inherit Iom_gadget.start
-    constraint 'xy = ('x, 'y) #Iom_stream.transform
+    inherit ['c, 'n] Iom_stream.process
+    constraint 'xy = ('x, 'y) #transform
     
     val transform_: 'xy (** A delegated stream transform object. *)
 
-    (** A delegator for the [block] method on the transform object. *)
-    method private block: unit Iom_gadget.t
+    (** A delegator for the [wait] method on the transform object. *)
+    method private wait: unit Iom_gadget.t
+
+    (** A delegator for the [ready] method on the transform object. *)
+    method private ready: unit Iom_gadget.t
 
     (** A delegator for the [unblock] method on the transform object. *)
     method private unblock: unit Iom_gadget.t
 
-    (** A delegator for the [ready] method on the transform object. *)
-    method private ready: unit Iom_gadget.t
-
-    (** A delegator for the [wait] method on the transform object. *)
-    method private wait: unit Iom_gadget.t
+    (** A delegator for the [block] method on the transform object. *)
+    method private block: unit Iom_gadget.t
 end
 
 (** The class of stream gadget state for input stream commutators.  Control
         inherit Iom_gadget.start
         constraint 'c = [> Iom_stream.flowcontrol ]
         constraint 'n = [> Iom_stream.flownotify ]
-        constraint 'ixy = ('ix, 'iy) #Iom_stream.transform
-        constraint 'oyx = ('oy, 'ox) #Iom_stream.transform
+        constraint 'ixy = ('ix, 'iy) #transform
+        constraint 'oyx = ('oy, 'ox) #transform
         
         (** A delegated input stream transform object. *)
         val itransform_: 'ixy option
         (** A delegated output stream transform object. *)
         val otransform_: 'oyx option
 
-        (** A delegator for [iblock] on the input transform object. *)
-        method private iblock: unit Iom_gadget.t
+        (** A delegator for [iwait] on the input transform object. *)
+        method private iwait: unit Iom_gadget.t
+
+        (** A delegator for [iready] on the input transform object. *)
+        method private iready: unit Iom_gadget.t
 
         (** A delegator for [iunblock] on the input transform object. *)
         method private iunblock: unit Iom_gadget.t
 
-        (** A delegator for [iready] on the input transform object. *)
-        method private iready: unit Iom_gadget.t
+        (** A delegator for [iblock] on the input transform object. *)
+        method private iblock: unit Iom_gadget.t
 
-        (** A delegator for [iwait] on the input transform object. *)
-        method private iwait: unit Iom_gadget.t
+        (** A delegator for [owait] on the output transform object. *)
+        method private owait: unit Iom_gadget.t
 
-        (** A delegator for [oblock] on the output transform object. *)
-        method private oblock: unit Iom_gadget.t
+        (** A delegator for [oready] on the output transform object. *)
+        method private oready: unit Iom_gadget.t
 
         (** A delegator for [ounblock] on the output transform object. *)
         method private ounblock: unit Iom_gadget.t
 
-        (** A delegator for [oready] on the output transform object. *)
-        method private oready: unit Iom_gadget.t
-
-        (** A delegator for [owait] on the output transform object. *)
-        method private owait: unit Iom_gadget.t
+        (** A delegator for [oblock] on the output transform object. *)
+        method private oblock: unit Iom_gadget.t
     end
 
 (** Use [ingest jack cons] to contruct a harness of wires with a new plug and
   ('x, [> Iom_stream.flowcontrol ] as 'c, [> Iom_stream.failed ] as 'n)
     Iom_stream.ijack ->
   ('c Iom_gadget.tx -> 'x Iom_gadget.rx -> 'y Iom_gadget.tx ->
-    ('x, 'y) #Iom_stream.transform) ->
+    ('x, 'y) #transform) ->
   ('y, 'c, 'n) Iom_stream.ijack Iom_gadget.t
 
 (** Use [render jack cons] to contruct a harness of wires with a new plug and
   ('y, [> Iom_stream.stop ] as 'c, [> Iom_stream.flownotify ] as 'n)
     Iom_stream.ojack ->
   ('n Iom_gadget.tx -> 'x Iom_gadget.rx -> 'y Iom_gadget.tx ->
-    ('x, 'y) #Iom_stream.transform) ->
+    ('x, 'y) #transform) ->
   ('x, 'c, 'n) Iom_stream.ojack Iom_gadget.t
 
 (** Use [duplex jack icons ocons] to contruct a harness of wires with a new
     ('xi, 'xo, [> Iom_stream.flowcontrol ] as 'c,
         [> Iom_stream.flownotify ] as 'n) Iom_stream.iojack ->
     ('c Iom_gadget.tx -> 'xi Iom_gadget.rx -> 'yi Iom_gadget.tx ->
-        ('xi, 'yi) #Iom_stream.transform) ->
+        ('xi, 'yi) #transform) ->
     ('n Iom_gadget.tx -> 'yo Iom_gadget.rx -> 'xo Iom_gadget.tx ->
-        ('yo, 'xo) #Iom_stream.transform) ->
+        ('yo, 'xo) #transform) ->
     ('yi, 'yo, 'c, 'n) Iom_stream.iojack Iom_gadget.t
 
 (*--- End of File [ iom_layer.mli ] ---*)

File iom/iom_octet_stream.ml

 
 open Cf_cmonad.Op
 
-class virtual ['x, 'y] transform ~limits waitTx xRx yTx =
-    let waitTx = (waitTx :> 'wait Iom_gadget.tx) in
+class virtual ['x, 'y] transform ~limits flowTx xRx yTx =
+    let flowTx = (flowTx :> 'flow Iom_gadget.tx) in
     object
-        inherit ['x, 'y] Iom_stream.transform xRx yTx
+        inherit ['x, 'y] Iom_layer.transform xRx yTx
         val buffer_: Cf_message.substring Cf_deque.t = Cf_deque.nil
         val mark_ = 0
         val more_ = Iom_stream.More
         val limits_ = limits
         
-        method private push_octets data =
+        method private push_octets more data =
             let length = Cf_message.length data in
             let mark = mark_ + length in
             let buffer = Cf_message.push data buffer_ in
-            if consume_ && not (mark < limits_.high) then begin
-                waitTx#put `Wait >>= fun () ->
+            let consume =
+                match more with Iom_stream.Last -> false | _ -> consume_
+            in
+            if consume && not (mark < limits_.high) then begin
+                flowTx#put `Wait >>= fun () ->
                 Cf_cmonad.return (false, buffer, mark)
             end
             else
         method private shift_octets len =
             let mark = if mark_ - len > 0 then mark_ - len else 0 in
             let buffer = Cf_message.shiftq ~len buffer_ in
-            if not consume_ && mark < limits_.low then begin
-                waitTx#put `Ready >>= fun () ->
+            if more_ = Iom_stream.More && not consume_ && mark < limits_.low
+            then begin
+                flowTx#put `Ready >>= fun () ->
                 Cf_cmonad.return (true, buffer, mark)
             end
             else
 
 let no_limits_emitter_ = { low = 1; high = max_int }
 
-class ['x, 'y] emitter ?limits waitTx xRx yTx =
+class ['x, 'y] emitter ?limits flowTx xRx yTx =
     let limits =
         match limits with
         | None -> no_limits_emitter_
         | Some limits -> normalize_limits limits
     in
     object(self)
-        inherit ['x, 'y] transform ~limits waitTx xRx yTx
+        inherit ['x, 'y] transform ~limits flowTx xRx yTx
         constraint 'y = #fragment
         
         method private fragment: 'x -> Iom_stream.more * Cf_message.t =
-            fun _ ->
-            Iom_stream.Last, []
+            fun _ -> Iom_stream.Last, []
         
         method private emit: 'y list * int = [], 0
         
         method push x =
             let more, data = self#fragment x in
-            self#push_octets data >>= fun (consume, buffer, mark) ->
+            self#push_octets more data >>= fun (consume, buffer, mark) ->
             Cf_cmonad.return {<
                 consume_ = consume; buffer_ = buffer; mark_ = mark;
                 more_ = more;
 
 let no_limits_scanner_ = { low = max_int; high = max_int }
 
-class ['x, 'y] scanner ?limits waitTx xRx yTx =
+class ['x, 'y] scanner ?limits flowTx xRx yTx =
     let limits =
         match limits with
         | None -> no_limits_scanner_
         | Some limits -> normalize_limits limits
     in
     object(self)
-        inherit ['x, 'y] transform ~limits waitTx xRx yTx
+        inherit ['x, 'y] transform ~limits flowTx xRx yTx
         constraint 'x = #fragment
                 
         method private lex:
                     if d < 0 then
                         invalid_arg x_negative_adjust_
                     else
-                        let v = v :: s in
-                        if d = 0 && Lazy.force z = Cf_seq.Z then
-                            Some (List.rev v, n)
+                        let s = v :: s in
+                        if Lazy.force z = Cf_seq.Z then
+                            Some (List.rev s, n)
                         else
-                            loop v (n + d) (Cf_message.shift ~pos:d m) z
+                            loop s (n + d) (Cf_message.shift ~pos:d m) z
                 | Cf_exnopt.U None
                 | Cf_exnopt.X Buffering ->
                     if n > 0 then Some (List.rev s, n) else None
             loop [] 0 m (Cf_message.to_seq ?x m)
         
         method push x =
-            self#push_octets x#data >>= fun (consume, buffer, mark) ->
+            let more = x#more in
+            self#push_octets more x#data >>= fun (consume, buffer, mark) ->
             Cf_cmonad.return {<
                 consume_ = consume; buffer_ = buffer; mark_ = mark;
-                more_ = x#more;
+                more_ = more;
             >}
         
         method shift =
                 Cf_cmonad.return (Cf_exnopt.U (s, obj))
     end
 
+class virtual ['x] buffer ~limits flowTx xRx yTx =
+    object(self)
+        constraint 'x = #fragment
+        inherit ['x, 'x] transform ~limits flowTx xRx yTx
+        
+        method virtual private frame: Iom_stream.more -> Cf_message.t -> 'x
+        
+        method push x =
+            let more = x#more in
+            self#push_octets more x#data >>= fun (consume, buffer, mark) ->
+            Cf_cmonad.return {<
+                consume_ = consume; buffer_ = buffer; mark_ = mark;
+                more_ = more;
+            >}
+        
+        method shift =
+            let rec loop acc buffer mark =
+                if mark < limits.high then begin
+                    match more_ with
+                    | Iom_stream.Last ->
+                        let data = (Cf_message.drain buffer) in
+                        let frag = self#frame more_ data in
+                        Cf_cmonad.return (frag :: acc, None)
+                    | Iom_stream.More when not consume_ && mark < limits.low ->
+                        flowTx#put `Ready >>= fun () ->
+                        Cf_cmonad.return (acc, Some (buffer, mark, true))
+                    | Iom_stream.More ->
+                        Cf_cmonad.return (acc, Some (buffer, mark, consume_))
+                end
+                else begin
+                    let m, buffer = Cf_message.pop ~len:limits.high buffer in
+                    let frag = self#frame Iom_stream.More m in
+                    loop (frag :: acc) buffer (mark - limits.high)
+                end
+            in
+            loop [] buffer_ mark_ >>= fun (s, objOpt) ->
+            match objOpt with
+            | Some (b, m, c) ->
+                let obj = {< buffer_ = b; mark_ = m; consume_ = c; >} in
+                Cf_cmonad.return (Cf_exnopt.U (List.rev s, Some obj))
+            | None ->
+                Cf_cmonad.return (Cf_exnopt.U (List.rev s, None))
+    end
+
+let buffer_aux ~limits flowTx dataRx dataTx = object
+    inherit [fragment] buffer ~limits flowTx dataRx dataTx
+    method private frame = new fragment
+end
+
+let input_buffer ?(limits = no_limits_scanner_) ijack =
+    Iom_layer.ingest ijack (buffer_aux ~limits)
+
+let output_buffer ?(limits = no_limits_emitter_) ojack =
+    Iom_layer.render ojack (buffer_aux ~limits)
+
+let duplex_buffers
+    ?(ilim = no_limits_scanner_) ?(olim = no_limits_emitter_) iojack =
+    Iom_layer.duplex iojack (buffer_aux ~limits:ilim) (buffer_aux ~limits:olim)
+
 (*--- End of File [ iom_octet_stream.ml ] ---*)

File iom/iom_octet_stream.mli

 *)
 val normalize_limits: limits -> limits
 
+(** {4 State Transforms } *)
+
 (** Use [inherit \['x, 'y\] transform ~limits flowTx inRx outTx] to define a
     subclass for transforming streams of ['x] events received at [inRx] into
     streams of ['y] events transmitted at [outTx].  Conventionally, both ['x]
     limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
     object
-        inherit ['x, 'y] Iom_stream.transform
+        inherit ['x, 'y] Iom_layer.transform
 
         (** The pending octet buffer. *)
         val buffer_: Cf_message.substring Cf_deque.t
             pushing the octets in [message].
         *)
         method private push_octets:
-            Cf_message.t ->
+            Iom_stream.more -> Cf_message.t ->
             (bool * Cf_message.substring Cf_deque.t * int) Iom_gadget.t
         
         (** Evaluate the result of [self#shift_octets length] to obtain the new
         method shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
     end
 
+(** {4 Functions} *)
+
+(** Use [input_buffer ?limits ijack] to apply the buffering discipline to the
+    input stream in [jack] according to [limits].
+*)
+val input_buffer:
+    ?limits:limits -> (fragment, 'c, 'n) Iom_stream.ijack ->
+    (fragment, 'c, 'n) Iom_stream.ijack Iom_gadget.t
+
+(** Use [output_buffer ?limits ojack] to apply the buffering discipline to the
+    output stream in [jack] according to [limits].
+*)
+val output_buffer:
+    ?limits:limits -> (fragment, 'c, 'n) Iom_stream.ojack ->
+    (fragment, 'c, 'n) Iom_stream.ojack Iom_gadget.t
+
+(** Use [duplex_buffers ?limits iojack] to apply the buffering discipline to
+    the output stream in [iojack] according to [ilim] and [olim].
+*)
+val duplex_buffers:
+    ?ilim:limits -> ?olim:limits ->
+    (fragment, fragment, 'c, 'n) Iom_stream.iojack ->
+    (fragment, fragment, 'c, 'n) Iom_stream.iojack Iom_gadget.t
+
 (*--- End of File [ iom_octet_stream.mli ] ---*)

File iom/iom_stream.ml

     end >>= fun jack ->
     Cf_cmonad.return (ioJack, jack)
 
-class virtual ['x] consume consumeRx =
-    let consumeRx = (consumeRx :> 'x Iom_gadget.rx) in
-    object(self:'self)
-        val consume_ = false
-
-        method virtual push: 'x -> 'self Iom_gadget.t
-
-        method consume f =
-            match consume_ with
-            | false ->
-                Cf_cmonad.nil
-            | true ->
-                consumeRx#get begin fun event ->
-                    try
-                        self#push event >>= fun obj ->
-                        f (Cf_exnopt.U obj)
-                    with
-                    | x ->
-                        f (Cf_exnopt.X x)
-                end
-                
-        method block: 'self Iom_gadget.t =
-            Cf_cmonad.return {< consume_ = false >}
-        method unblock: 'self Iom_gadget.t =
-            Cf_cmonad.return {< consume_ = true >}
-    end
-
-class virtual ['y] produce produceTx =
-    let produceTx = (produceTx :> 'y Iom_gadget.tx) in
-    object(self:'self)
-        val produce_ = false
-
-        method virtual shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
-        
-        method produce =
-            match produce_ with
-            | false ->
-                Cf_cmonad.return (Cf_exnopt.U (Some self))
-            | true ->
-                self#shift >>= function
-                | Cf_exnopt.X x ->
-                    Cf_cmonad.return (Cf_exnopt.X x)
-                | Cf_exnopt.U (s, obj) ->
-                    let z = Cf_seq.map produceTx#put (Cf_seq.of_list s) in
-                    Cf_seq.C.sequence z >>= fun () ->
-                    Cf_cmonad.return (Cf_exnopt.U obj)
-        
-        method ready: 'self Iom_gadget.t =
-            Cf_cmonad.return {< produce_ = true >}
-        method wait: 'self Iom_gadget.t =
-            Cf_cmonad.return {< produce_ = false >}
-    end
-
-class virtual ['x, 'y] transform consumeRx produceTx =
-    let consumeRx = (consumeRx :> 'x Iom_gadget.rx) in
-    let produceTx = (produceTx :> 'y Iom_gadget.tx) in
-    object
-        inherit ['x] consume consumeRx
-        inherit ['y] produce produceTx
-    end
-
 class virtual ['c, 'n] substate =
     object(self:'self)
         constraint 'self = #Iom_gadget.next
         
         method private fail: 'a. exn -> 'a Iom_gadget.t = fun x ->
             notifyTx#put (`Failed x) >>= fun () ->
-            super#fail x
+            self#stop
         
         method private control = function
             | #stop -> self#stop
         method private guard = notifyRx#get self#notify
     end
 
+class ['c, 'n] engine jack = object
+    inherit ['c, 'n] notify jack
+    inherit Iom_gadget.start
+    inherit Iom_gadget.next
+end
+
+class ['c, 'n] driver plug = object
+    inherit ['c, 'n] control plug
+    inherit Iom_gadget.start
+    inherit Iom_gadget.next
+end
+
+class ['c, 'n] process jack plug =
+    let jack = (jack :> ('c, 'n) Iom_gadget.jack) in
+    let plug = (plug :> ('c, 'n) Iom_gadget.plug) in
+    let (_, controlTx : ('c, 'n) Iom_gadget.jack) = jack in
+    let (_, notifyTx : ('c, 'n) Iom_gadget.plug) = plug in
+    object(self)
+        inherit ['c, 'n] notify jack as notify
+        method virtual private stop: 'a. 'a Iom_gadget.t
+        method virtual private fail: 'a. exn -> 'a Iom_gadget.t
+        method virtual private guard: unit Iom_gadget.guard
+        inherit ['c, 'n] control plug as control
+        inherit Iom_gadget.start
+        inherit Iom_gadget.next
+
+        method private fail x =
+            controlTx#put `Stop >>= fun () ->
+            control#fail x
+        
+        method private notify n =
+            notifyTx#put n >>= fun () ->
+            match n with
+            | #failed -> self#stop
+            | _ -> self#next
+        
+        method private control c =
+            controlTx#put c >>= fun () ->
+            control#control c
+        
+        method private guard =
+            control#guard >>= fun () ->
+            notify#guard
+    end
+
 (*--- End of File [ iom_stream.ml ] ---*)

File iom/iom_stream.mli

   (('i, 'o, 'c, 'n) ioplug -> #Iom_gadget.start) ->
   ('i, 'o, 'c, 'n) iojack Iom_gadget.t
 
-(** {6 State Compositors}
 
-    The virtual classes defined below of useful for combining into concrete
-    classes to serve as the state of a stream gadget.
+(** {6 Gadgets }
+
+    Commonly used gadget classes that operate on the standard stream control
+    and notify events.  The classes below are intended to be used as the basis
+    for a stack of gadgets, with a driver at the bottom, zero or more processes
+    above the driver, and an engine at the top.
 *)
 
-(** The substate of a stream gadget concerned with consuming sequences of
-    events.  Use [inherit consume rx] to derive a state object that can consume
-    events of the type carried by the wire connected to the [rx] receiver.
-*)
-class virtual ['x] consume:
-    (** The receiver for events in the sequence. *)
-    'x #Iom_gadget.rx ->
-    object('self)
-        val consume_: bool  (** [true] if ready for consuming events. *)
-
-        (** The result of [self#push event] is applied in the [consume] method
-            to obtain the substate object after [event] is processed.
-        *)
-        method virtual push: 'x -> 'self Iom_gadget.t
-
-        (** If the stream is ready for consuming events, then [self#consume f]
-            guards on the receiver, dispatching received events through
-            [self#push] and applying [f] to the resulting substate object.  If
-            [push] raises an exception, then [f] is applied to the exception
-            instead.  If the stream is not ready for consuming events, then
-            the receiver is not guarded.
-        *)
-        method consume:
-            ('self Cf_exnopt.t -> unit Iom_gadget.t) -> unit Iom_gadget.guard
-
-        (** Returns a copy of the substate with [consume_ = false]. *)
-        method block: 'self Iom_gadget.t
-
-        (** Returns a copy of the substate with [consume_ = true]. *)
-        method unblock: 'self Iom_gadget.t
-    end
-
-(** The substate of a stream gadget concerned with producing sequences of
-    events.  Use [inherit produce tx] to derive a state object that can produce
-    events of the type carried by the wire connected to the [tx] transmitter.
-*)
-class virtual ['y] produce:
-    'y #Iom_gadget.tx ->
-    object('self)
-        val produce_: bool  (** [true] if ready for producing events. *)
-
-        (** The result of [self#shift] is evaluated to obtain a list of events
-            to produce and the optional substate to return in the [produce]
-            method.
-        *)
-        method virtual shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
-
-        (** If the stream is ready for producing events, then [self#produce]
-            returns an optional substate indicating that further processing is
-            expected, returns [None] to indicate that processing is finished,
-            or returns an exception.  If the stream is not ready for producing
-            events, then the current substate is returned with no other effect.
-        *)
-        method produce: 'self option Cf_exnopt.t Iom_gadget.t
-
-        (** Returns a copy of the substate with [produce_ = true]. *)
-        method ready: 'self Iom_gadget.t
-
-        (** Returns a copy of the substate with [produce_ = false]. *)
-        method wait: 'self Iom_gadget.t
-    end
-
-(** The substate of a stream gadget concerned with both producing and consuming
-    sequences of events.  Use [inherit transform rx tx] to derive a state
-    object that can consume events of the type carried by [rx] and produce
-    events of the type carried by [tx].
-*)
-class virtual ['x, 'y] transform:
-    'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
-    object('self)
-        inherit ['x] consume
-        inherit ['y] produce
-    end
-
 (** The virtual base class of all stream gadget states. *)
 class virtual ['c, 'n] substate: object('self)
     constraint 'self = #Iom_gadget.next
         method private guard: unit Iom_gadget.guard
     end
 
-(** Use [inherit control plug] to derive a stream gadget state that receives
+(** Use [inherit notify jack] to derive a stream gadget state that receives
     events on the notification wire in [jack].
 *)
 class virtual ['c, 'n] notify:
         method private guard: unit Iom_gadget.guard
     end
 
+(** Use [inherit engine jack] to derive a stream gadget that receives events
+    on the notification wire and sends events on the control wire in [jack].
+*)
+class ['c, 'n] engine:
+    ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
+    object
+        inherit ['c, 'n] notify
+        inherit Iom_gadget.start
+        inherit Iom_gadget.next
+    end
+
+(** Use [inherit driver plug] to derive a stream gadget that receives events
+    on the control wire and sends events on the notification wire in [plug].
+*)
+class ['c, 'n] driver:
+    ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
+    object
+        inherit ['c, 'n] control
+        inherit Iom_gadget.start
+        inherit Iom_gadget.next
+    end
+
+(** Use [inherit process jack plug] to derive a stream gadget that processes
+    events received on the control wire in [plug] and forwards them on the
+    control wire in [jack].  Likewise, it processes events received on the
+    notification wire in [jack] and forwards them on the notification wire in
+    [plug].
+*)
+class ['c, 'n] process:
+    ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
+    ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
+    object
+        inherit ['c, 'n] control
+        inherit ['c, 'n] notify
+        inherit Iom_gadget.start
+        inherit Iom_gadget.next
+    end
+
 (*--- End of File [ iom_stream.mli ] ---*)

File iom/t/t_iom.ml

         method private fragment x = Iom_stream.Last, (Cf_message.create x)
 
         method private pop acc len m =
-            let pos = limits_.Iom_octet_stream.high in 
+            let pos = limits0.Iom_octet_stream.high in 
             if len > pos then begin
                 let m1, m2 = Cf_message.split ~pos m in
                 self#pop (m1 :: acc) (len - pos) m2