Commits

jhwoodyatt  committed ddcb80b

Checkpoint. Initial commit of 0.3 release, which is a major overhaul.
Minor tweaks (and documentation) still pending.

  • Participants
  • Parent commits 09b6f68

Comments (0)

Files changed (33)

-iom_machine.cmi: iom_gadget.cmi 
-iom_reactor.cmi: iom_machine.cmi iom_gadget.cmi 
-iom_file.cmi: iom_reactor.cmi iom_gadget.cmi 
-iom_socket.cmi: iom_machine.cmi iom_gadget.cmi iom_file.cmi 
-iom_sock_stream.cmi: iom_socket.cmi iom_gadget.cmi iom_file.cmi 
+iom_stream.cmi: iom_gadget.cmi 
+iom_layer.cmi: iom_stream.cmi iom_gadget.cmi 
+iom_octet_stream.cmi: iom_stream.cmi iom_gadget.cmi 
+iom_reactor.cmi: iom_stream.cmi iom_octet_stream.cmi iom_gadget.cmi 
+iom_pipe.cmi: iom_stream.cmi iom_reactor.cmi iom_octet_stream.cmi \
+    iom_gadget.cmi 
+iom_socket.cmi: iom_stream.cmi iom_reactor.cmi iom_gadget.cmi 
+iom_sock_stream.cmi: iom_stream.cmi iom_socket.cmi iom_reactor.cmi \
+    iom_octet_stream.cmi iom_gadget.cmi 
 iom_tcp4_socket.cmi: iom_sock_stream.cmi 
 iom_tcp6_socket.cmi: iom_sock_stream.cmi 
 iom_gadget.cmo: iom_gadget.cmi 
 iom_gadget.cmx: iom_gadget.cmi 
-iom_machine.cmo: iom_gadget.cmi iom_machine.cmi 
-iom_machine.cmx: iom_gadget.cmx iom_machine.cmi 
-iom_reactor.cmo: iom_machine.cmi iom_gadget.cmi iom_reactor.cmi 
-iom_reactor.cmx: iom_machine.cmx iom_gadget.cmx iom_reactor.cmi 
-iom_file.cmo: iom_reactor.cmi iom_gadget.cmi iom_file.cmi 
-iom_file.cmx: iom_reactor.cmx iom_gadget.cmx iom_file.cmi 
-iom_socket.cmo: iom_machine.cmi iom_gadget.cmi iom_file.cmi iom_socket.cmi 
-iom_socket.cmx: iom_machine.cmx iom_gadget.cmx iom_file.cmx iom_socket.cmi 
-iom_sock_stream.cmo: iom_socket.cmi iom_gadget.cmi iom_file.cmi \
-    iom_sock_stream.cmi 
-iom_sock_stream.cmx: iom_socket.cmx iom_gadget.cmx iom_file.cmx \
-    iom_sock_stream.cmi 
+iom_stream.cmo: iom_gadget.cmi iom_stream.cmi 
+iom_stream.cmx: iom_gadget.cmx iom_stream.cmi 
+iom_layer.cmo: iom_stream.cmi iom_gadget.cmi iom_layer.cmi 
+iom_layer.cmx: iom_stream.cmx iom_gadget.cmx iom_layer.cmi 
+iom_octet_stream.cmo: iom_stream.cmi iom_gadget.cmi iom_octet_stream.cmi 
+iom_octet_stream.cmx: iom_stream.cmx iom_gadget.cmx iom_octet_stream.cmi 
+iom_reactor.cmo: iom_stream.cmi iom_octet_stream.cmi iom_gadget.cmi \
+    iom_reactor.cmi 
+iom_reactor.cmx: iom_stream.cmx iom_octet_stream.cmx iom_gadget.cmx \
+    iom_reactor.cmi 
+iom_pipe.cmo: iom_stream.cmi iom_reactor.cmi iom_octet_stream.cmi \
+    iom_pipe.cmi 
+iom_pipe.cmx: iom_stream.cmx iom_reactor.cmx iom_octet_stream.cmx \
+    iom_pipe.cmi 
+iom_socket.cmo: iom_stream.cmi iom_reactor.cmi iom_gadget.cmi iom_socket.cmi 
+iom_socket.cmx: iom_stream.cmx iom_reactor.cmx iom_gadget.cmx iom_socket.cmi 
+iom_sock_stream.cmo: iom_stream.cmi iom_socket.cmi iom_reactor.cmi \
+    iom_octet_stream.cmi iom_gadget.cmi iom_sock_stream.cmi 
+iom_sock_stream.cmx: iom_stream.cmx iom_socket.cmx iom_reactor.cmx \
+    iom_octet_stream.cmx iom_gadget.cmx iom_sock_stream.cmi 
 iom_tcp4_socket.cmo: iom_sock_stream.cmi iom_tcp4_socket.cmi 
 iom_tcp4_socket.cmx: iom_sock_stream.cmx iom_tcp4_socket.cmi 
 iom_tcp6_socket.cmo: iom_sock_stream.cmi iom_tcp6_socket.cmi 
 iom_tcp6_socket.cmx: iom_sock_stream.cmx iom_tcp6_socket.cmi 
-t/t_iom.cmo: iom_tcp6_socket.cmi iom_tcp4_socket.cmi iom_socket.cmi \
-    iom_sock_stream.cmi iom_gadget.cmi iom_file.cmi 
-t/t_iom.cmx: iom_tcp6_socket.cmx iom_tcp4_socket.cmx iom_socket.cmx \
-    iom_sock_stream.cmx iom_gadget.cmx iom_file.cmx 
+t/t_iom.cmo: iom_tcp6_socket.cmi iom_tcp4_socket.cmi iom_stream.cmi \
+    iom_sock_stream.cmi iom_reactor.cmi iom_pipe.cmi iom_octet_stream.cmi \
+    iom_layer.cmi iom_gadget.cmi 
+t/t_iom.cmx: iom_tcp6_socket.cmx iom_tcp4_socket.cmx iom_stream.cmx \
+    iom_sock_stream.cmx iom_reactor.cmx iom_pipe.cmx iom_octet_stream.cmx \
+    iom_layer.cmx iom_gadget.cmx 
 
 ===== Version 0.3 =====
 
-Highlights of refactoring this release:
+This release is yet another major overhaul.  The programming interfaces are
+all new again and coded from scratch-- this time with more extensive, but still
+far from exhaustive, testing and regression.
 
-+ Improvements to simplified file I/O interface.
-+ Remove warnings generated when compiling with OCaml 3.09 series.
-+ Update copyrights and author email address.
+Highlights of the new design:
 
---- [Makefile]
-    Use -warn-error A on debug builds.
-
---- [Iom_file]
-    Add [rd_t] and [wr_t] I/O linkage record types, and use them for the result
-    of the [read] and [write] reactor monads.
-
++ Uses new stateless [Cf_gadget] module (from Cf-0.8) with state classes
+  instead of the old stateful [Cf_gadget] (from Cf-0.7).
++ Conventions established for multiplexing communication between gadgets on
+  multiple wires, i.e. a [jack] is used for sending control and receiving
+  notification, and a [plug] is used for receiving control and sending
+  notification.
++ Support for Unix pipes.
++ Redesigned interface to TCP streams.
++ Generalized support for simplex and duplex octet stream processsing layers,
+  e.g. framing discipline, packet multiplexing, etc.
 
 ===== Version 0.2 =====
 
-  Copyright (c) 2003-2005, James H. Woodyatt
+  Copyright (c) 2003-2006, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without

File iom/Makefile

 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 # OF THE POSSIBILITY OF SUCH DAMAGE. 
 
-DEBUG_OPT = -g -passopt -principal -warn-error A
-REQUIRE = unix cf
+REQUIRE = cf
 PREDICATES =
 
 ###############################################################################
 OCAMLYACC    = ocamlyacc
 OCAMLDOC     = ocamlfind ocamldoc $(OCAMLFINDOPT)
 
-CC_OPT       = -ccopt -fPIC -ccopt -O2 -ccopt -Wall -ccopt -Wno-unused-variable
-CMI_OPT      = $(DEBUG_OPT)
-CMO_OPT      = $(DEBUG_OPT)
-CMX_OPT      = -unsafe -noassert -inline 9
+DEBUG_OPT    = -g
+WARN_ERROR   = -passopt -w -passopt Ae # -warn-error A
+PRINCIPLE    = -principal
+UNSAFE       = -unsafe -noassert
+
+ALL_OCAML_OPT = $(WARN_ERROR) $(PRINCIPLE)
+
+CC_OPT  = -ccopt -fPIC -ccopt -O2 -ccopt -Wall -ccopt -Wno-unused-variable
+CMI_OPT = $(ALL_OCAML_OPT) $(DEBUG_OPT)
+CMO_OPT = $(ALL_OCAML_OPT) $(DEBUG_OPT)
+CMX_OPT = $(ALL_OCAML_OPT) $(UNSAFE) -inline 9
 
 .SUFFIXES: .ml .mli .mll .mly .cmo .cmi .cma .cmx .cmxa
 
 IOM_YACC_MODULES = $(IOM_LEXYACC_MODULES:%=iom_yacc_%)
 IOM_LEX_MODULES = $(IOM_LEXYACC_MODULES:%=iom_lex_%)
 
-clean::
-	rm -rf $(IOM_LEX_ML_FILES)
-	rm -rf $(IOM_YACC_ML_FILES)
-	rm -rf $(IOM_YACC_MLI_FILES)
+#clean::
+#	rm -rf $(IOM_LEX_ML_FILES)
+#	rm -rf $(IOM_YACC_ML_FILES)
+#	rm -rf $(IOM_YACC_MLI_FILES)
 
 iom_lex_%.cmo : iom_yacc_%.cmi iom_lex_%.cmi
 iom_lex_%.cmx : iom_yacc_%.cmi iom_lex_%.cmi
 IOM_YACC_MLI_FILES = $(IOM_YACC_MODULES:%=%.mli)
 
 IOM_MODULES = \
-    gadget machine reactor file socket sock_stream tcp4_socket tcp6_socket
+    gadget stream layer octet_stream reactor pipe socket sock_stream \
+	tcp4_socket tcp6_socket
 
 IOM_PRIMITIVES = 
 
 
 ###############################################################################
 
-TEST_MODULES = iom # perf
+TEST_MODULES = iom # relay
 
 TEST_PROGRAMS = $(TEST_MODULES:%=t.%)
 TEST_OPT_PROGRAMS = $(TEST_MODULES:%=t-opt.%)
 
 opt:: $(TEST_OPT_PROGRAMS)
 
-t.% : t/t_%.ml iom.cma
-	$(OCAMLC) -o $@ $(DEBUG_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cma) $<
+t.% : t/t_%.ml $(TEST_LIBS:%=%.cma)
+	$(OCAMLC) -o $@ $(CMO_OPT) $(DEBUG_OPT) $(TEST_LINKOPT) \
+        $(TEST_LIBS:%=%.cma) $<
 
-t-opt.% : t/t_%.ml iom.cmxa
-	$(OCAMLOPT) -o $@ $(TEST_LINKOPT) $(TEST_LIBS:%=%.cmxa) $<
+t-opt.% : t/t_%.ml $(TEST_LIBS:%=%.cmxa)
+	$(OCAMLOPT) -o $@ $(CMX_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cmxa) $<
 
 test:: $(TEST_PROGRAMS)
 	@for i in $(TEST_PROGRAMS); do echo; echo $$i; ./$$i; done
 
 default:: ocamltop
 
-ocamltop: iom.cma
-	$(OCAMLMKTOP) $(TEST_LINKOPT) -o $@ iom.cma
+ocamltop: $(TEST_LIBS:%=%.cma)
+	$(OCAMLMKTOP) -o $@ $(CMO_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cma)
 
 clean::
 	rm -f ocamltop

File iom/iom_file.ml

-(*---------------------------------------------------------------------------*
-  IMPLEMENTATION  iom_file.ml
-
-  Copyright (c) 2005-2006, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-open Cf_cmonad.Op
-
-type signal_t = [ `Error of exn ]
-type control_t = [ `Close | `Unlink ]
-type ready_t = [ `Ready | `Wait ]
-
-class virtual ['control, 'signal, 'state] core ?c ~s ~rwx ~fd k =
-    let sigTx = (s :> 'signal Iom_gadget.tx) in
-    object(self:'self)
-        inherit ['control, 'state] Iom_reactor.core ?c k as super
-        inherit ['state] Cf_poll.file rwx fd
-        constraint 'control = [> control_t ]
-        constraint 'signal = [> signal_t ]
-        
-        method private error: exn -> ('self, unit) Iom_gadget.t = fun x ->
-            Unix.close fd;
-            self#unload;
-            sigTx#put (`Error x)
-        
-        method private close =
-            Unix.close fd;
-            self#unload;
-            Cf_cmonad.return ()
-        
-        method private unlink =
-            self#unload;
-            Cf_cmonad.return ()
-        
-        method private control = function
-            | `Close -> self#close
-            | `Unlink -> self#unlink
-            | _ -> assert (not true); self#next
-
-        method private again = function
-            | Unix.EAGAIN 
-            | Unix.EWOULDBLOCK ->
-                true
-            | _ ->
-                false
-        
-        method start =
-            try
-                Unix.set_nonblock fd;
-                super#start
-            with
-            | x ->
-                Iom_gadget.start (self#error x) self
-    end
-
-type more_t = More | Last
-class event more = object method more: more_t = more end
-
-type reader_control_t = [ control_t | ready_t ]
-
-class virtual ['control, 'signal, 'event] reader_core ?c ~s ~tx:outTx ~fd k =
-    let outTx = (outTx :> 'event Iom_gadget.tx) in
-    object(self:'self)
-        inherit ['control, 'signal, 'state] core ?c ~s ~rwx:`R ~fd k as super
-        constraint 'control = [> reader_control_t ]
-        constraint 'event = #event
-        
-        method private virtual read: 'event option        
-
-        method private render: 'event -> ('self, unit) Iom_gadget.t = fun e ->
-            outTx#put e
-
-        method private service p =
-            try
-                match self#read with
-                | None ->
-                    `Loaded p
-                | Some e ->
-                    let more = e#more in
-                    let out = Iom_gadget.start (self#render e) self in
-                    match more with
-                    | Last -> `Final out
-                    | More -> `Working (p, out)
-            with
-            | Unix.Unix_error (code, _, _) when self#again code ->
-                `Loaded p
-            | x ->
-                `Final (Iom_gadget.start (self#error x) self)
-        
-        method private control = function
-            | `Wait ->
-                self#unload;
-                self#next
-            | `Ready ->
-                Iom_gadget.poll self k;
-                self#next
-            | c ->
-                super#control c
-    end
-
-type writer_signal_t = [ signal_t | ready_t | `Final ]
-
-class virtual ['control, 'signal, 'event] writer_core
-  ?c ~s:sigTx ~rx:inRx ~fd k =
-    let inRx = (inRx :> 'event Iom_gadget.rx) in
-    let sigTx = (sigTx :> 'signal Iom_gadget.tx) in
-    object(self:'self)
-        inherit ['control, 'signal, 'state] core
-            ?c ~s:sigTx ~rwx:`W ~fd k as super
-        constraint 'signal = [> writer_signal_t ]
-        constraint 'event = #event
-        
-        val mutable pending_ = None
-
-        method private virtual write: 'event -> 'event option
-
-        method private final =
-            Unix.close fd;
-            sigTx#put `Final
-
-        method private ready = sigTx#put `Ready
-        method private wait = sigTx#put `Wait
-
-        method private service p =
-            match pending_ with
-            | None ->
-                assert (not true);
-                `Final (Iom_gadget.start self#next self)
-            | Some e ->
-                try
-                    let more = e#more in
-                    match self#write e with
-                    | Some _ ->
-                        `Loaded p
-                    | None ->
-                        pending_ <- None;
-                        let m =
-                            match more with
-                            | Last ->
-                                self#final
-                            | More ->
-                                self#ready >>= fun () ->
-                                self#next
-                        in
-                        `Final (Iom_gadget.start m self)
-                with
-                | Unix.Unix_error (code, _, _) when self#again code ->
-                    `Loaded p
-                | x ->
-                    `Final (Iom_gadget.start (self#error x) self)
-        
-        method private blocking_ more =
-            assert (pending_ <> None);
-            Iom_gadget.poll self k;
-            match more with
-            | More -> self#wait
-            | Last -> Cf_cmonad.return ()
-
-        method private ingest e =
-            pending_ <- Some e;
-            try
-                let more = e#more in
-                pending_ <- self#write e;
-                match pending_ with
-                | Some ctrl ->
-                    self#blocking_ more
-                | None ->
-                    if more <> Last then self#next else self#final
-            with
-            | Unix.Unix_error (code, _, _) when self#again code ->
-                self#blocking_ e#more
-            | x ->
-                pending_ <- None;
-                self#unload;
-                self#error x
-
-        method private guard =
-			super#guard >>= fun () ->
-            match pending_ with
-            | None -> inRx#get self#ingest
-            | Some _ -> Cf_cmonad.return ()
-    end
-
-class octets more msg =
-    object(_:#event)
-        method more = more
-        method octets = (msg : Cf_message.t)
-    end
-
-type read_result_t = R_octets of int | R_again
-
-class reader ~e ?(d = 1) ?c ~s ~tx ~fd k =
-    object(self)
-        inherit [reader_control_t, signal_t, octets] reader_core
-            ?c ~s ~tx ~fd k as super
-        
-        val mutable buffer_ = ""
-        val mutable cursor_ = 0
-        
-        method private unload_ p =
-            buffer_ <- "";
-            cursor_ <- 0;
-            super#unload_ p
-        
-        method private load_ p =
-            buffer_ <- String.create e;
-            cursor_ <- 0;
-            super#load_ p
-        
-        method private octets n =
-            let n = Unix.read fd buffer_ cursor_ n in
-            if n = 0 then Unix.close fd;
-            n
-        
-        method private pending acc limit =
-            let n0 = e - cursor_ in
-            let n0 = min n0 limit in
-            let n0 =
-                if n0 < d then begin
-                    buffer_ <- String.create e;
-                    cursor_ <- 0;
-                    e
-                end
-                else
-                    n0
-            in
-            assert (cursor_ >= 0);
-            assert (cursor_ + n0 <= String.length buffer_);
-            assert (n0 > 0);
-            match
-                try R_octets (self#octets n0) with
-                Unix.Unix_error (code, _, _) when self#again code -> R_again
-            with
-            | R_again when acc = [] ->
-                None
-            | R_again ->
-                Some (new octets More (List.rev acc))
-            | R_octets 0 ->
-                Some (new octets Last [])
-            | R_octets n ->
-                let pos = cursor_ in
-                let acc = (buffer_, pos, n) :: acc in
-                cursor_ <- cursor_ + n;
-                if n = n0 && n0 < limit then
-                    self#pending acc (limit - n0)
-                else
-                    let more = if n > 0 then More else Last in
-                    Some (new octets more (List.rev acc))
-                        
-        method private read = self#pending [] e
-        
-        initializer
-            if d < 1 then invalid_arg "Iom_file.reader: d < 1";
-            if e < d then invalid_arg "Iom_file.reader: e < d"
-    end
-
-class writer ?c ~s ~rx ~fd k =
-    object(self)
-        inherit [control_t, writer_signal_t, octets] writer_core
-            ?c ~s ~rx ~fd k as super
-                
-        method private octets buf pos len = Unix.write fd buf pos len
-        
-        method private pending more = function
-            | [] ->
-                None
-            | (buf, pos, len) :: tl ->
-                assert (pos >= 0);
-                assert (pos + len <= String.length buf);
-                assert (len > 0);
-                let n = self#octets buf pos len in
-                if n < len then
-                    Some ((buf, pos + n, len - n) :: tl)
-                else
-                    self#pending more tl
-        
-        method private write p =
-            let more = p#more in
-            match self#pending more p#octets with
-            | Some m ->
-                Some (new octets more m)
-            | None ->
-                None
-    end
-
-type rd_t = {
-	rd_ctrlTx: reader_control_t Iom_gadget.tx;	(** Control events *)
-	rd_sigRx: signal_t Iom_gadget.rx;			(** Signal events *)
-	rd_dataRx: octets Iom_gadget.rx;			(** Data to write *)
-}
-
-let read ~e ?d ~fd k =
-	Iom_gadget.simplex >>= fun (ctrlRx, ctrlTx) ->
-    Iom_gadget.simplex >>= fun (sigRx, sigTx) ->
-    Iom_gadget.simplex >>= fun (dataRx, dataTx) ->
-    let r = new reader ~e ?d ~c:ctrlRx ~s:sigTx ~tx:dataTx ~fd k in
-    r#start >>= fun () ->
-    Cf_cmonad.return {
-		rd_ctrlTx = ctrlTx;
-		rd_sigRx = sigRx;
-		rd_dataRx = dataRx;
-	}
-
-type wr_t = {
-	wr_ctrlTx: control_t Iom_gadget.tx;
-	wr_sigRx: writer_signal_t Iom_gadget.rx;
-	wr_dataTx: octets Iom_gadget.tx;
-}
-
-let write ~fd k =
-	Iom_gadget.simplex >>= fun (ctrlRx, ctrlTx) ->
-    Iom_gadget.simplex >>= fun (sigRx, sigTx) ->
-    Iom_gadget.simplex >>= fun (dataRx, dataTx) ->
-    let w = new writer ~c:ctrlRx ~s:sigTx ~rx:dataRx ~fd k in
-    w#start >>= fun () ->
-    Cf_cmonad.return {
-		wr_ctrlTx = ctrlTx;
-		wr_sigRx = sigRx;
-		wr_dataTx = dataTx;
-	}
-
-(*--- End of File [ iom_file.ml ] ---*)

File iom/iom_file.mli

-(*---------------------------------------------------------------------------*
-  INTERFACE  iom_file.mli
-
-  Copyright (c) 2005-2006, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-(** Reactors for reading/writing on file descriptors. *)
-
-(** {6 Overview}
-
-    This module implements base classes and convenience functions for
-    asynchronously reading and/or writing streams of octets on I/O channels
-    represented by {!Unix.file_descr} objects, e.g. pipes, sockets and tty
-    devices.
-    
-    A reader will read octets from a file descriptor as they become
-    available while the reader is enabled and sends them as events on a wire.
-    A writer will receive events on a wire and write their octets to a file
-    descriptor while the device remains ready for writing.
-    
-    Flow control and other signals are sent and received on separate wires from
-    the data flow wires.
-*)
-
-(** {6 Core} *)
-
-(** An event sent by file reactors to indicate their progress. *)
-type signal_t = [
-    `Error of exn   (** All processing stopped by a system error. *)
-]
-
-(** An event received by file reactors to control their operation. *)
-type control_t = [
-    | `Unlink       (** Immediately stop all processing. *)
-    | `Close        (** Immediately stop all processing and close file. *)
-]
-
-(** A flow control event. *)
-type ready_t = [
-    | `Ready        (** Ready to read/write octets. *)
-    | `Wait         (** Wait before read/write octets. *)
-]
-
-(** Methods and fields in common to reader and writer reactors.  Use [inherit
-    ['control, 'signal, 'state] core ?c ~s ~rwx ~fd k] to derive a subclass
-    to be a reactor in the polling mux kernel [k] for the file descriptor [fd]
-    on the file I/O event [rwx].  The optional receiver [?c] and the
-    transmitter [s] are where ['control] and ['signal] event wires are
-    connected to the reactor.  Subclasses may extend the types of control and
-    signal event types if they implement methods for handling additional
-    variants.
-*)
-class virtual ['control, 'signal, 'state] core:
-    ?c:'control #Iom_gadget.rx -> s:'signal #Iom_gadget.tx ->
-    rwx:[< Cf_poll.rwx_t ] -> fd:Unix.file_descr -> Iom_gadget.kernel_t ->
-    object('self)
-        (** Subclass of primitive I/O reactors. *)
-        inherit ['control, 'state] Iom_reactor.core
-        (** Subclass of core foundation file event multiplexer. *)
-        inherit ['state] Cf_poll.file
-        constraint 'control = [> control_t ]
-        constraint 'signal = [> signal_t ]
-        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.file_state_t ]
-
-        (** Use [self#error x] to close the file descriptor [fd], unload the
-            event from the polling mux, and send [`Error x] on the signal
-            transmitter [s].  (Note: this method does not invoke [self#close]
-            to do this.)
-        *)
-        method private error: exn -> ('self, unit) Iom_gadget.t
-        
-        (** Use [self#close] to close the file descriptor [fd] and unload the
-            event from the polling mux.  (Note: this method is invoked when a
-            [`Close] control event is received.)
-        *)
-        method private close: ('self, unit) Iom_gadget.t
-        
-        (** Use [self#unload] to unload the event from the polling mux.  (Note:
-            this method is invoked when a [`Unlink] control event is received.)
-        *)
-        method private unlink: ('self, unit) Iom_gadget.t
-        
-        (** This method is invoked in [self#guard] when an event is received
-            on the optional control channel [?c].  It dispatches [`Close] and
-            [`Unlink] events by invoking either [self#close] or [self#unlink]
-            respectively.  All other events are silently ignored.
-        *)
-        method private control: 'control -> ('self, unit) Iom_gadget.t
-        
-        (** This convenience method returns [true] if the {!Unix.error} code
-            specified is [Unix.EAGAIN] or [Unix.EWOULDBLOCK].
-        *)
-        method private again: Unix.error -> bool
-    end
-
-(** Indication of whether more events are expected. *)
-type more_t = More | Last
-
-(** Core event class.  Subclasses add methods for additional components of a
-    file event.  See below.
-*)
-class event:
-    more_t ->
-    object('self)
-        (** Indicate whether additional events are expected. *)
-        method more: more_t
-    end
-
-(** A control event for reader reactors. *)
-type reader_control_t = [ control_t | ready_t ]
-
-(** Core reader class.  Use [inherit ['control, 'signal, 'event] reader_core
-    ?c ~s ~tx ~fd k] to derive a subclass for reading file events in the
-    polling loop [k] for the file descriptor [fd] and sending them on the
-    wire [tx].  Notification and control events are received and sent,
-    respectively, on the wires [s] and [?c].
-*)
-class virtual ['control, 'signal, 'event] reader_core:
-    ?c:'control #Iom_gadget.rx -> s:'signal #Iom_gadget.tx ->
-    tx:'event #Iom_gadget.tx ->  fd:Unix.file_descr -> Iom_gadget.kernel_t ->
-    object('self)
-        (** Subclass of the core file reactor. *)
-        inherit ['control, 'signal, 'state] core
-        constraint 'control = [> reader_control_t ]
-        constraint 'event = #event
-        
-        (** The [service] method invokes [self#read] when the file descriptor
-            is ready for reading.  If no event is returned, then the event
-            remains loaded in the mux, otherwise the event is processed by
-            invoking [self#render e] with it.  If an exception [x] is raised in
-            the [read] method, then it will be caught and processed by invoking
-            [self#error x].
-        *)
-        method private virtual read: 'event option        
-
-        (** The [service] method invokes [self#render e] with an event returned
-            by the [read] method.  This sends the event on the wire [tx], and
-            exits the process if the event indicates that no more events are
-            expected.  Otherwise, [self#next] is invoked.
-        *)
-        method private render: 'event -> ('self, unit) Iom_gadget.t
-        
-        (** The [service] method required by {!Cf_poll.file}. *)
-        method private service: Cf_poll.t -> 'state
-    end
-
-(** A notification event for writer reactors. *)
-type writer_signal_t = [ signal_t | ready_t | `Final ]
-
-(** Core writer class.  Use [inherit ['control, 'signal, 'event] writer_core
-    ?c ~s ~rx ~fd k] to derive a subclass for writing file events received
-    from the wire [rx] in the polling loop [k] for the file descriptor [fd].
-    Notification and control events are received and sent, respectively, on the
-    wires [s] and [?c].
-*)
-class virtual ['control, 'signal, 'event] writer_core:
-    ?c:'control #Iom_gadget.rx -> s:'signal #Iom_gadget.tx ->
-    rx:'event #Iom_gadget.rx ->  fd:Unix.file_descr -> Iom_gadget.kernel_t ->
-    object('self)
-        (** Subclass of the core file reactor. *)
-        inherit ['control, 'signal, 'state] core
-        constraint 'signal = [> writer_signal_t ]
-        constraint 'event = #event
-        
-        (** Storage for pending event, waiting for file to become writable. *)
-        val mutable pending_: 'event option
-
-        (** This method is invoked, depending on the context, in either the
-            [ingest] method or the [service] method, to write the event [e] to
-            the file descriptor [fd].  If an exception [x] is raised, then it
-            will be caught and [self#error x] will be invoked before the
-            reactor exits.  If the operation cannot be completed without
-            blocking, then return [Some e] to indicate a pending event.
-        *)
-        method private virtual write: 'event -> 'event option
-
-        (** Use [self#final] to closed the file descriptor [fd], and send
-            [`Final] on the wire [s].  (Note: this is invoked when an event
-            is written and no further events are expected.)
-        *)
-        method private final: ('self, unit) Iom_gadget.t
-        
-        (** Use [self#ready] to send [`Ready] on the wire [s]. *)
-        method private ready: ('self, unit) Iom_gadget.t
-        
-        (** Use [self#wait] to send [`Wait] on the wire [s].  This method is
-            invoked in the [ingest] method when a pending event is returned
-            from the [write] method.
-        *)
-        method private wait: ('self, unit) Iom_gadget.t
-
-        (** This method is invoked by the [guard] method when an event is
-            received on the wire [rx].  It invokes [self#write e] with the
-            event.  If a pending event is returned, then it invokes [self#wait]
-            and loads the reactor into the kernel [k], i.e. to schedule a call
-            to the [service] method when the file descriptor is ready for
-            writing.
-        *)
-        method private ingest: 'event -> ('self, unit) Iom_gadget.t
-        
-        (** The [service] method required by {!Cf_poll.file}. *)
-        method private service: Cf_poll.t -> 'state
-    end
-
-(** {6 Classes}
-
-    The classes described in this section implement asynchronous reading and
-    writing on unidirection file descriptors, e.g. pipes and simple files.
-*)
-
-(** An event associated with a read or write of zero or more octets. *)
-class octets:
-    more_t -> Cf_message.t ->
-    object
-        (** Subclass of core event. *)
-        inherit event
-        
-        (** Returns the octets to read or write. *)
-        method octets: Cf_message.t
-    end
-
-(** Use [new reader ~e ?d ?c ~s ~tx ~fd k] to create a state machine for
-    asynchronously reading octets from the file descriptor [fd] in the kernel
-    [k] for presentation on the transmit wire [tx].  Control and notification
-    events are received and transmitted, respectively, on the [?c] and [s]
-    wires.  No more than [e] octets will be read from the file descriptor in a
-    single operation, and multiple operations may be necessary to read all
-    pending octets.  At least [d] octets will be available in the buffer
-    whenever a read is attempted.  (The default value of [d] is [1].)
-*)
-class reader:
-    e:int -> ?d:int -> ?c:reader_control_t #Iom_gadget.rx ->
-    s:signal_t #Iom_gadget.tx -> tx:octets #Iom_gadget.tx ->
-    fd:Unix.file_descr -> Iom_gadget.kernel_t ->
-    object
-        (** A subclass of the core reader class. *)
-        inherit [reader_control_t, signal_t, octets] reader_core
-
-        (** The read buffer. *)
-        val mutable buffer_: string
-        
-        (** The offset into the read buffer for the next read operation. *)
-        val mutable cursor_: int
-        
-        (** The [read] method required by the {!reader_core} class.  Invokes
-            [self#pending [] e] to read the whole event from the file
-            descriptor [fd].
-        *)
-        method private read: octets option
-        
-        (** Invoked by the [pending] method to read octets.  If the end of file
-            is reached, then the file descriptor is closed.
-        *)
-        method private octets: int -> int
-        
-        (** A recursive method invoked by the [read] method.  It progressively
-            reads substrings by invoking [cursor_ <- self#octets cursor_] and
-            stacking the received substrings in the specifed message and
-            invoking itself again if there are still more octets to read.
-        *)
-        method private pending: Cf_message.t -> int -> octets option
-    end
-
-(** Use [new writer ?c ~s ~rx ~fd k] to create a state machine for
-    asynchronously writing octets to the file descriptor [fd] in the kernel
-    [k] received on the wire [rx].  Control and notification events are
-    received and transmitted, respectively, on the [?c] and [s] wires.
-*)
-class writer:
-    ?c:control_t #Iom_gadget.rx -> s:writer_signal_t #Iom_gadget.tx ->
-    rx:octets #Iom_gadget.rx -> fd:Unix.file_descr -> Iom_gadget.kernel_t ->
-    object
-        (** A subclass of the core writer reactor. *)
-        inherit [control_t, writer_signal_t, octets] writer_core
-        
-        (** The [write] method required by the {!writer_core} class.  Invokes
-            [self#pending e#more e#octets] to write the event to the file
-            descriptor [fd].
-        *)
-        method private write: octets -> octets option
-        
-        (** Invoked by the [pending] method to write octets. *)
-        method private octets: string -> int -> int -> int
-        
-        (** A recursive method invoked by the [write] method.  It progressively
-            writes substrings by invoking [self#octets buf pos len] as long
-            as there are substrings remaining in the message to be written.  If
-            a message cannot be completel written, then the unsent portion
-            portion is returned.
-        *)
-        method private pending: more_t -> Cf_message.t -> Cf_message.t option
-    end
-
-(** {6 Simplified Functional Interface}
-
-    Use the functions described in this section for convenient access to the
-    base class reactors.
-*)
-
-(** Linkage produced by the [read] reactor monad. *)
-type rd_t = {
-	rd_ctrlTx: reader_control_t Iom_gadget.tx;	(** Control events *)
-	rd_sigRx: signal_t Iom_gadget.rx;			(** Signal events *)
-	rd_dataRx: octets Iom_gadget.rx;			(** Data to write *)
-}
-
-(** Use [read ~e ?d ~fd k >>= fun rd -> ...] to create the necessary wires and
-	start reading from file descriptor [fd] in the context of kernel [k].
-    
-	No more than [e] octets will be read from the file descriptor in a single
-	operation, and multiple operations may be necessary to read all pending
-	octets.  At least [d] octets will be available in the buffer whenever a
-	read is attempted.  (The default value of [d] is [1].)
-	
-	Notifications from the reader may be received on [rd.rd_sigRx], control
-	events may be sent on [rd.rd_ctrlTx] and octet data are received on
-	[rd.rd_datatRx].
-*)
-val read:
-    e:int -> ?d:int -> fd:Unix.file_descr -> Iom_gadget.kernel_t ->
-    ('s, rd_t) Iom_gadget.t
-
-(** Linkage produced by the [write] reactor monad. *)
-type wr_t = {
-	wr_ctrlTx: control_t Iom_gadget.tx;			(** Control events *)
-	wr_sigRx: writer_signal_t Iom_gadget.rx;	(** Signal events *)
-	wr_dataTx: octets Iom_gadget.tx;			(** Data to write *)
-}
-
-(** Use [write ~fd k >>= fun wr -> ...] to create the necessary wires and start
-	writing to file descriptor [fd] in the context of kernel [k].
-    
-    Notifications from the writer may be received on [wr.wr_sigRx], control
-	events may be sent on [wr.wr_ctrlTx] and octet data are transmitted on
-	[wr.wr_datatTx].
-*)
-val write: fd:Unix.file_descr -> Iom_gadget.kernel_t -> ('s, wr_t) Iom_gadget.t
-
-(*--- End of File [ iom_file.mli ] ---*)

File iom/iom_gadget.ml

 (*---------------------------------------------------------------------------*
   IMPLEMENTATION  iom_gadget.ml
 
-  Copyright (c) 2002-2005, James H. Woodyatt
+  Copyright (c) 2002-2006, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-(*
-let xprintf fmt =
-    print_string "[Iom_gadget] ";
-    Printf.printf fmt 
-*)
-
-type input_t = Cf_poll.more_t and output_t = unit
-type 's work_t = ('s, input_t, output_t) Cf_gadget.work_t
-type 's gate_t = ('s, input_t, output_t) Cf_gadget.gate_t
-type ('s, 'a) guard_t = ('s, input_t, output_t, 'a) Cf_gadget.guard_t
-type ('s, 'a) t = ('s, input_t, output_t, 'a) Cf_gadget.t
+type input = Cf_poll.more and output = unit
+type work = (input, output) Cf_gadget.work
+type gates = (input, output) Cf_gadget.gates
+type 'a guard = (input, output, 'a) Cf_gadget.guard
+type 'a t = (input, output, 'a) Cf_gadget.t
 
 let start = Cf_gadget.start
 let guard = Cf_gadget.guard
+let abort = Cf_gadget.abort
 
-type 'a wire_t = ('a, input_t, output_t) Cf_gadget.wire_t
+type 'a wire = ('a, input, output) Cf_gadget.wire
 class type connector = Cf_gadget.connector
-class ['a] rx e = ['a, input_t, output_t] Cf_gadget.rx e
-class ['a] tx e = ['a, input_t, output_t] Cf_gadget.tx e
-type ('x, 'y) duplex_t = ('x rx * 'y tx) * ('y rx * 'x tx)
+class ['a] rx e = ['a, input, output] Cf_gadget.rx e
+class ['a] tx e = ['a, input, output] Cf_gadget.tx e
+
+type ('x, 'y) plug = 'x rx * 'y tx
+type ('x, 'y) jack = 'y rx * 'x tx
 
 let wire = Cf_gadget.wire
+let wirepair = Cf_gadget.wirepair
 let null = Cf_gadget.null
 
-let fsimplex = Cf_gadget.fsimplex
 let simplex = Cf_gadget.simplex
 let duplex = Cf_gadget.duplex
 
-let load = Cf_gadget.load
-let store = Cf_gadget.store
-let modify = Cf_gadget.modify
+let wrap = Cf_gadget.wrap
 
-let wrap = Cf_gadget.wrap
+class virtual next = [input, output] Cf_gadget.next
+class virtual start = [input, output] Cf_gadget.start
+
+let create = Cf_gadget.create
+let createM = Cf_gadget.createM
 
 open Cf_cmonad.Op
 
-type queue_t = <get: (unit, unit) t> Queue.t
-type kernel_t = { k_poll_: Cf_poll.t; k_queue_: queue_t }
+type queue = <get: unit t> Queue.t
+type kernel = { k_poll_: Cf_poll.t; k_queue_: queue }
 
 let poll e k =
-    let e = (e :> [> (unit, unit) t Cf_poll.state_t ] Cf_poll.event) in
+    let e = (e :> unit t Cf_poll.event) in
     e#load ~q:k.k_queue_ k.k_poll_
 
 let run =
     let rec ingest1_ q =
         if Queue.is_empty q then
-            Cf_cmonad.return ()
+            Cf_cmonad.nil 
         else
             let e = Queue.take q in
             e#get >>= fun () ->
             ingest1_ q >>= fun () ->
             ingest0_ q
         | Cf_poll.Last ->
-            Cf_cmonad.return ()
+            Cf_cmonad.nil 
     in
     let rec runloop_ p w =
         match Lazy.force w with
         | Cf_flow.P ((), tl) -> assert (not true); runloop_ p tl
         | Cf_flow.Q f -> runloop_ p (lazy (f (Cf_poll.cycle p)))
     in
-    fun f s0 ->
+    fun f ->
         let p = Cf_poll.create () and q = Queue.create () in
         let k = { k_poll_ = p; k_queue_ = q; } in
-        let m = f k >>= Cf_gadget.start (ingest0_ k.k_queue_) in
-        let w = Cf_gadget.eval m s0 in
+        let m = f k >>= fun () -> Cf_gadget.start (ingest0_ k.k_queue_) in
+        let w = Cf_gadget.eval m in
         let save = Unix.sigprocmask Unix.SIG_SETMASK [] in
         runloop_ p w;
         ignore (Unix.sigprocmask Unix.SIG_SETMASK save)

File iom/iom_gadget.mli

 (*---------------------------------------------------------------------------*
   INTERFACE  iom_gadget.mli
 
-  Copyright (c) 2002-2005, James H. Woodyatt
+  Copyright (c) 2002-2006, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
     An I/O gadget monad is a continuation monad initialized with an abstract
     unit of work for the scheduler.  Monadic functions are defined for the
     following basic operations:
-    - starting a new I/O gadget process;
+    - starting a new I/O gadget;
     - creating a new wire (with or without endpoints);
-    - loading, storing and modifying a process state;
     - sending an event on a wire;
-    - guarding a process on the reception of events from a sequence of wires;
+    - guarding a gadget on the reception of events from a sequence of wires;
     - scheduling work to be performed when an I/O event occurs.
 
 *)
 
 (** {6 Types} *)
 
-(** The abstract type of a unit of work in the scheduler. *)
-type 's work_t
+(** An functionally compositional unit of scheduling work in a gadget. *)
+type work
 
-(** The abstract type of a process block. *)
-type 's gate_t
+(** A sequence of gates produced by the [guard] monad for receiving events in
+    a gadget using the [guard] function.
+*)
+type gates
 
-(** The abstract type of an event wire. *)
-type 'a wire_t
+(** A wire capable of carrying events. *)
+type 'x wire
 
 (** A continuation monad for composing logical sequences of scheduling work
-    within a specific process.  The scheduler may interleave units of work from
-    different processes in any order.
+    within a specific gadget.  The scheduler may interleave units of work from
+    different gadgets in any order.
 *)
-type ('s, 'a) t = ('s work_t, 'a) Cf_cmonad.t
+type 'a t = (work, 'a) Cf_cmonad.t
 
-(** A continuation monad for composing sequences of wires to block a process
-    when waiting for an event.
+(** A continuation monad for composing [gates], i.e. sequences of wires to
+    block a gadget when waiting for an event.
 *)
-type ('s, 'a) guard_t = ('s gate_t, 'a) Cf_cmonad.t
+type 'a guard = (gates, 'a) Cf_cmonad.t
 
 (** {6 Functions and Classes} *)
 
-(** Use [start g s] to start a new process on the gadget [g] with the initial
-    state [s].
+(** Use [start m] to start a new gadget [m]. *)
+val start: unit t -> unit t
+
+(** Use [guard g] to evaluate the guard continuation monad [g] to block the
+    current gadget until an event is received on a wires in the sequence used
+    in the composition of the continuation.  If none of the wires in the guard
+    are still connected to other gadgets, then the scheduler terminates all
+    further processing of the gadget.
+    
+    Control will never pass to any continuation bound to the result.  Instead,
+    control will pass to the continuation of the first gate in the guard to
+    receive a message.
 *)
-val start: ('s0, unit) t -> 's0 -> ('s1, unit) t
+val guard: unit guard -> 'a t
 
-(** Use [guard z] to evaluate the guard continuation monad [z] to block the
-    current process until an event is received on a wires in the sequence used
-    in the composition of the continuation.  If none of the wires in the guard
-    are still connected to processes, then the process terminates.
+(** Use [abort] to abort the current work unit and return to the scheduler.
+    Control will not be passed to any continuation bound to the result.
 *)
-val guard: ('s, unit) guard_t -> ('s, unit) t
+val abort: 'a t
 
-(** Use [wire >>= f] to get a new wire from the scheduler and apply it to the
-    bound function [f].
-*)
-val wire: ('s, 'a wire_t) t
+(** Use [wire] to return a new wire from the scheduler. *)
+val wire: 'a wire t
+
+(** Use [wirepair] to return a pair of new wires from the scheduler. *)
+val wirepair: ('a wire * 'b wire) t
 
 (** The null wire.  Events sent on the null wire are discarded immediately, and
     guards on the null wire never receive any events.
 *)
-val null: 'a wire_t
+val null: 'a wire
 
 (** The class type of wire connectors.  Both receivers and transmitters are
     subtypes of this class type.
         (** For a connector [c], [c#check] returns [false] if the other end of
             the wire has been collected by the garbage collector.  If [c#check]
             returns [true], then the other end of the wire may still be
-            connected to a process.
+            connected to a gadget.
         *)
         method check: bool
         
-        (** This methods returns a string that may be used to distinguish the
+        (** This methods returns a string that may be used with physical
+            equality comparisons, i.e. [(==)] and [(!=)] to distinguish the
             underlying wire from other wires in the same scheduler.
-            
-            {b Warning}: {i in long running processes, the identifier number
-            used to construct this string may overflow its representation.  In
-            this event, no attempt is made to keep from assigning the same
-            identifier to more than one wire simultaneously.}
         *)
         method id: string
     end
     construct a receive connector for the wire [w].
 *)
 class ['a] rx:
-    'a wire_t ->
+    'a wire ->
     object
         inherit connector
         
             binding operator [( >>= )] to produce a guard for a sequence of
             wires.
         *)
-        method get: 's. ('a -> ('s, unit) t) -> ('s, unit) guard_t
+        method get: ('a -> unit t) -> unit guard
     end
 
 (** The class of connectors used for sending events.  Use [new tx w] to
     construct a transmit connector for the wire [w].
 *)
 class ['a] tx:
-    'a wire_t ->
+    'a wire ->
     object
         inherit connector
         
         (** Send the given event on the underlying wire. *)
-        method put: 's. 'a -> ('s, unit) t
+        method put: 'a -> unit t
     end
 
-(** Use [fsimplex ~f >>= g] to get a new wire from the scheduler and apply it
-    to the function [f] to get a receive and transmit connector pair to apply
-    to the bound function [g].
+(** Use [simplex] to obtain a new wire from the scheduler and use it to return
+    a receive and transmit connector pair.
 *)
-val fsimplex: f:('a wire_t -> 'a #rx * 'a #tx) -> ('s, 'a rx * 'a tx) t
+val simplex: ('a rx * 'a tx) t
 
-(** Use [simplex >>= f] to get a new wire from the scheduler, construct a
-    receive and transmit connector pair, and apply it to the bound function
-    [f].
+(** A pair of convenience types for representing each end of a bundle of two
+    wires used for duplex communication.  By convention, a plug comprises a
+    receiver for control events and a transmitter for notification events, and
+    a jack comprises the transmitter for control events and the receiver for
+    notification events
 *)
-val simplex: ('s, 'a rx * 'a tx) t
-
-(** A convenience type combining two wires into a pair of [rx] and [tx]
-    tuples, one tuple for each end of a duplex communication.  Each tuple is
-    composed of the [rx] and the [tx] objects to use on one end of the
-    communication.
-*)
-type ('x, 'y) duplex_t = ('x rx * 'y tx) * ('y rx * 'x tx)
+type ('x, 'y) plug = 'x rx * 'y tx
+type ('x, 'y) jack = 'y rx * 'x tx
 
 (** Use [duplex >>= f] to get a pair of new wires from the scheduler, construct
     a {i pair} of receive and transmit connector pairs, one for each wire, and
     apply it to the bound function [f].
 *)
-val duplex: ('s, ('x, 'y) duplex_t) t
+val duplex: (('x, 'y) jack * ('x, 'y) plug) t
 
-(** Use [load >>= f] to load the internal state of the current process and
-    apply it to the bound function [f].
-*)
-val load: ('s, 's) t
-
-(** Use [store s] to store the value [s] as the new internal state of the
-    current process.
-*)
-val store: 's -> ('s, unit) t
-
-(** Use [modify f] to load the internal state of the current process, apply it
-    to the function f to obtain a new state, and store the new state in the
-    current process.
-*)
-val modify: ('s -> 's) -> ('s, unit) t
-
-(** Use [wrap a b w] to start a new process that receives events from the
+(** Use [wrap a b w] to start a new gadget that receives events from the
     receive connector [a], sends event to the transmit connector [b], and
     processes the stream of input and output events with the flow [w].
 *)
-val wrap: 'i #rx -> 'o #tx -> ('i, 'o) Cf_flow.t -> ('s, unit) t
+val wrap: 'i #rx -> 'o #tx -> ('i, 'o) Cf_flow.t -> unit t
+
+(** Use [inherit next] to derive a class that implements an intermediate state
+    in a machine.
+*)
+class virtual next:
+    object('self)        
+        (** The guard evaluated by this state of the machine. *)
+        method virtual private guard: unit guard
+
+        (** Use [obj#next] to transition the state of the machine by applying
+            {!Cf_state_gadget.guard} [self#guard].
+        *)
+        method next: 'a. 'a t
+    end
+
+(** Use [inherit start] to derive a class to represent the initial state of a
+    machine.  It's [start] method initiates the machine with the virtual
+    private [guard] method.
+*)
+class virtual start:
+    object('self)
+        (** The first guard evaluationed by the machine after starting. *)
+        method virtual private guard: unit guard
+        
+        (** Starts a new gadget. Defined as [start (guard self#guard)]. *)
+        method start: unit t
+    end
+
+(** Use [create f] to create a duplex channel, and apply [f] to the resulting
+    plug to obtain the initial state of a machine.  The machine is started and
+    the corresponding jack is returned.
+*)
+val create: (('c, 'n) plug -> #start) -> ('c, 'n) jack t
+
+(** Use [createM f] to create a duplex channel, and apply [f] to the resulting
+    plug to obtain a continuation monad that evaluates to the initial state of
+    a machine.  The machine is started and the corresponding jack is returned.
+*)
+val createM: (('c, 'n) plug -> #start t) -> ('c, 'n) jack t
 
 (** The abstract type of the system I/O event multiplexer. *)
-type kernel_t
+type kernel
 
 (** Use [poll e k] to load the I/O event into the polling mux and event queue
     comprising the kernel [k].
 *)
-val poll:
-    [> (unit, unit) t Cf_poll.state_t ] #Cf_poll.event -> kernel_t -> unit
+val poll: unit t #Cf_poll.event -> kernel -> unit
 
 (** Use [run f s] to construct a new I/O event kernel, apply it to the function
     [f] to obtain a top-level process for the event loop, and start the process
     running with the initial state [s].
 *)
-val run: (kernel_t -> ('s, unit) t) -> 's -> unit
+val run: (kernel -> unit t) -> unit
 
 (*--- End of File [ iom_gadget.mli ] ---*)

File iom/iom_layer.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  iom_layer.ml
+
+  Copyright (c) 2006, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(*
+let jout = Cf_journal.stdout
+*)
+
+open Cf_cmonad.Op
+
+class ['c, 'n] core jack plug =
+    let jack = (jack :> ('c, 'n) Iom_gadget.jack) in
+    let plug = (plug :> ('c, 'n) Iom_gadget.plug) in
+    let (_, controlTx : ('c, 'n) Iom_gadget.jack) = jack in
+    let (_, notifyTx : ('c, 'n) Iom_gadget.plug) = plug in
+    object(self)
+        inherit ['c, 'n] Iom_stream.notify jack as notify
+        method virtual private stop: 'a. 'a Iom_gadget.t
+        method virtual private fail: 'a. exn -> 'a Iom_gadget.t
+        method virtual private guard: unit Iom_gadget.guard
+        inherit ['c, 'n] Iom_stream.control plug as control
+        inherit Iom_gadget.next
+        inherit Iom_gadget.start
+
+        method private fail x =
+            controlTx#put `Stop >>= fun () ->
+            control#fail x
+        
+        method private notify n =
+            notifyTx#put n >>= fun () ->
+            match n with
+            | #Iom_stream.failed -> self#stop
+            | _ -> self#next
+        
+        method private control c =
+            control#control c >>= fun () ->
+            controlTx#put c
+        
+        method private guard =
+            control#guard >>= fun () ->
+            notify#guard
+    end
+
+class ['xy, 'c, 'n] simplex transform jack plug =
+    object(self:'self)
+        inherit ['c, 'n] core jack plug as super
+        constraint 'xy = ('x, 'y) #Iom_stream.transform
+
+        val transform_: 'xy = transform
+        
+        method private loop: 'xy -> unit Iom_gadget.t = fun obj ->
+            {< transform_ = obj >}#next
+        
+        method private drain: 'xy -> unit Iom_gadget.t = fun obj ->
+            obj#produce >>= function
+            | Cf_exnopt.X x -> self#fail x
+            | Cf_exnopt.U None -> self#stop
+            | Cf_exnopt.U (Some obj) -> self#loop obj
+        
+        method private ready = transform_#ready >>= self#drain
+        method private wait = transform_#wait >>= self#loop
+        method private block = transform_#block >>= self#loop
+        method private unblock = transform_#unblock >>= self#loop
+        
+        method private push = function
+            | Cf_exnopt.X x -> self#fail x
+            | Cf_exnopt.U obj -> self#drain obj
+    
+        method private guard =
+            super#guard >>= fun () ->
+            transform_#consume self#push
+    end
+
+class ['xy, 'c, 'n] isimplex xy jack plug = object(self)
+    inherit ['xy, 'c, 'n] simplex xy jack plug as super
+    
+    method private control = function
+        | #Iom_stream.ready -> self#ready
+        | #Iom_stream.wait -> self#wait
+        | c -> super#control c
+end
+
+class ['xy, 'c, 'n] osimplex xy jack plug = object(self)
+    inherit ['xy, 'c, 'n] simplex xy jack plug as super
+    
+    method private notify = function
+        | #Iom_stream.ready -> self#ready
+        | #Iom_stream.wait -> self#wait
+        | n -> super#notify n
+end
+
+class ['i, 'o, 'c, 'n] duplex itransform otransform jack plug =
+    object(self:'self)
+        inherit ['c, 'n] core jack plug as super
+        constraint 'i = ('ix, 'iy) #Iom_stream.transform
+        constraint 'o = ('oy, 'ox) #Iom_stream.transform
+        
+        val itransform_: 'i option = Some itransform
+        val otransform_: 'o option = Some otransform
+        
+        method private iloop: 'i option -> unit Iom_gadget.t = fun objOpt ->
+            match otransform_, objOpt with
+            | None, None -> self#stop
+            | _, _ -> {< itransform_ = objOpt >}#next
+        
+        method private idrain: 'i -> unit Iom_gadget.t = fun obj ->
+            obj#produce >>= function
+            | Cf_exnopt.X x -> self#fail x
+            | Cf_exnopt.U objOpt -> self#iloop objOpt
+
+        method private oloop: 'o option -> unit Iom_gadget.t = fun objOpt ->
+            match itransform_, objOpt with
+            | None, None -> self#stop
+            | _, _ -> {< otransform_ = objOpt >}#next
+        
+        method private odrain: 'o -> unit Iom_gadget.t = fun obj ->
+            obj#produce >>= function
+            | Cf_exnopt.X x -> self#fail x
+            | Cf_exnopt.U objOpt -> self#oloop objOpt
+        
+        method private isignal f =
+            match itransform_ with
+            | Some obj -> f obj >>= self#idrain
+            | None -> assert (not true); self#iloop None
+        
+        method private iready = self#isignal (fun obj -> obj#ready)
+        method private iwait = self#isignal (fun obj -> obj#wait)
+        method private iblock = self#isignal (fun obj -> obj#block)
+        method private iunblock = self#isignal (fun obj -> obj#unblock)
+        
+        method private osignal f =
+            match otransform_ with
+            | Some obj -> f obj >>= self#odrain
+            | None -> assert (not true); self#oloop None
+                
+        method private oready = self#osignal (fun obj -> obj#ready)
+        method private owait = self#osignal (fun obj -> obj#wait)
+        method private oblock = self#osignal (fun obj -> obj#block)
+        method private ounblock = self#osignal (fun obj -> obj#unblock)
+        
+        method private ipush = function
+            | Cf_exnopt.U obj -> self#idrain obj
+            | Cf_exnopt.X x -> self#fail x
+        
+        method private opush = function
+            | Cf_exnopt.U obj -> self#odrain obj
+            | Cf_exnopt.X x -> self#fail x
+
+        method private iguard =
+            match itransform_ with
+            | None -> Cf_cmonad.return ()
+            | Some obj -> obj#consume self#ipush
+
+        method private oguard =
+            match otransform_ with
+            | None -> Cf_cmonad.return ()
+            | Some obj -> obj#consume self#opush
+    
+        method private guard =
+            super#guard >>= fun () ->
+            self#iguard >>= fun () ->
+            self#oguard
+        
+        method private control = function
+            | #Iom_stream.ready -> self#oready
+            | #Iom_stream.wait -> self#owait
+            | c -> super#control c
+        
+        method private notify = function
+            | #Iom_stream.ready -> self#iready
+            | #Iom_stream.wait -> self#iwait
+            | n -> super#notify n
+    end
+
+let ingest xJack cons =
+    Iom_gadget.duplex >>= fun (yJack, yPlug) ->
+    Iom_gadget.simplex >>= fun (yRx, yTx) ->
+    let xRx, (_, xCtrl as xJack) = xJack in
+    let xy = (cons xCtrl xRx yTx :> ('x, 'y) Iom_stream.transform) in
+    xy#unblock >>= fun xy ->
+    let m = new isimplex xy xJack yPlug in
+    m#start >>= fun () ->
+    xCtrl#put `Ready >>= fun () ->
+    Cf_cmonad.return (yRx, yJack)
+
+let render xJack cons =
+    Iom_gadget.duplex >>= fun (yJack, yPlug) ->
+    Iom_gadget.simplex >>= fun (yRx, yTx) ->
+    let xTx, xJack = xJack in
+    let _, yNtfy = yPlug in
+    let xy = (cons yNtfy yRx xTx :> ('x, 'y) Iom_stream.transform) in
+    xy#unblock >>= fun xy ->
+    let m = new osimplex xy xJack yPlug in
+    m#start >>= fun () ->
+    yNtfy#put `Ready >>= fun () ->
+    Cf_cmonad.return (yTx, yJack)
+
+let duplex xJack iCons oCons =
+    Iom_gadget.duplex >>= fun (yJack, yPlug) ->
+    Iom_gadget.duplex >>= fun ((yoRx, yiTx), (yiRx, yoTx)) ->
+    let (xiRx, xoTx), (_, xCtrl as xJack) = xJack in
+    let _, yNtfy = yPlug in
+    let ixy = (iCons xCtrl xiRx yiTx :> ('xi, 'yi) Iom_stream.transform) in
+    let oxy = (oCons yNtfy yoRx xoTx :> ('yo, 'xo) Iom_stream.transform) in
+    ixy#unblock >>= fun ixy ->
+    oxy#unblock >>= fun oxy ->
+    let m = new duplex ixy oxy xJack yPlug in
+    m#start >>= fun () ->
+    xCtrl#put `Ready >>= fun () ->
+    yNtfy#put `Ready >>= fun () ->
+    Cf_cmonad.return ((yiRx, yoTx), yJack)
+
+(*--- End of File [ iom_layer.ml ] ---*)

File iom/iom_layer.mli

+(*---------------------------------------------------------------------------*
+  INTERFACE  iom_layer.mli
+
+  Copyright (c) 2006, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+class type ['xy, 'c, 'n] simplex = object('self)
+    inherit ['c, 'n] Iom_stream.notify
+    inherit ['c, 'n] Iom_stream.control
+    inherit Iom_gadget.next
+    inherit Iom_gadget.start
+    constraint 'xy = ('x, 'y) #Iom_stream.transform
+    
+    val transform_: 'xy
+
+    method private block: unit Iom_gadget.t
+    method private unblock: unit Iom_gadget.t
+    method private ready: unit Iom_gadget.t
+    method private wait: unit Iom_gadget.t
+end
+
+class ['xy, 'c, 'n] isimplex:
+    'xy -> ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
+    ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
+    object
+        inherit ['xy, 'c, 'n] simplex
+        constraint 'xy = ('x, 'y) #Iom_stream.transform
+        constraint 'c = [> Iom_stream.flowcontrol ]
+    end
+
+class ['xy, 'c, 'n] osimplex:
+    'xy -> ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
+    ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
+    object
+        inherit ['xy, 'c, 'n] simplex
+        constraint 'xy = ('x, 'y) #Iom_stream.transform
+        constraint 'n = [> Iom_stream.flownotify ]
+    end
+
+class ['ixy, 'oyx, 'c, 'n] duplex:
+    'ixy -> 'oyx ->
+    ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
+    ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
+    object('self)
+        inherit ['c, 'n] Iom_stream.notify
+        inherit ['c, 'n] Iom_stream.control
+        inherit Iom_gadget.next
+        inherit Iom_gadget.start
+        constraint 'c = [> Iom_stream.flowcontrol ]
+        constraint 'n = [> Iom_stream.flownotify ]
+        constraint 'ixy = ('ix, 'iy) #Iom_stream.transform
+        constraint 'oyx = ('oy, 'ox) #Iom_stream.transform
+        
+        val itransform_: 'ixy option
+        val otransform_: 'oyx option
+
+        method private iblock: unit Iom_gadget.t
+        method private iunblock: unit Iom_gadget.t
+        method private iready: unit Iom_gadget.t
+        method private iwait: unit Iom_gadget.t
+
+        method private oblock: unit Iom_gadget.t
+        method private ounblock: unit Iom_gadget.t
+        method private oready: unit Iom_gadget.t
+        method private owait: unit Iom_gadget.t
+    end
+
+val ingest:
+  ('x, [> Iom_stream.flowcontrol ] as 'c, [> Iom_stream.failed ] as 'n)
+    Iom_stream.ijack ->
+  ('c Iom_gadget.tx -> 'x Iom_gadget.rx -> 'y Iom_gadget.tx ->
+    ('x, 'y) #Iom_stream.transform) ->
+  ('y, 'c, 'n) Iom_stream.ijack Iom_gadget.t
+
+val render:
+  ('y, [> Iom_stream.stop ] as 'c, [> Iom_stream.flownotify ] as 'n)
+    Iom_stream.ojack ->
+  ('n Iom_gadget.tx -> 'x Iom_gadget.rx -> 'y Iom_gadget.tx ->
+    ('x, 'y) #Iom_stream.transform) ->
+  ('x, 'c, 'n) Iom_stream.ojack Iom_gadget.t
+
+val duplex:
+    ('xi, 'xo, [> Iom_stream.flowcontrol ] as 'c,
+        [> Iom_stream.flownotify ] as 'n) Iom_stream.iojack ->
+    ('c Iom_gadget.tx -> 'xi Iom_gadget.rx -> 'yi Iom_gadget.tx ->
+        ('xi, 'yi) #Iom_stream.transform) ->
+    ('n Iom_gadget.tx -> 'yo Iom_gadget.rx -> 'xo Iom_gadget.tx ->
+        ('yo, 'xo) #Iom_stream.transform) ->
+    ('yi, 'yo, 'c, 'n) Iom_stream.iojack Iom_gadget.t
+
+(*--- End of File [ iom_layer.mli ] ---*)

File iom/iom_machine.ml

-(*---------------------------------------------------------------------------*
-  IMPLEMENTATION  iom_machine.ml
-
-  Copyright (c) 2005-2006, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-open Cf_cmonad.Op
-
-class virtual substate =
-    object(_:'self)
-        method virtual next: ('self, unit) Iom_gadget.t
-    end
-
-class virtual state =
-    object(self:'self)
-        method private virtual guard: ('self, unit) Iom_gadget.guard_t
-
-        method next: ('self, unit) Iom_gadget.t =
-            Iom_gadget.store self >>= fun () ->
-            Iom_gadget.guard self#guard
-    end
-
-class initial =
-    object(self:'self)
-        inherit state
-
-        method private guard = Cf_cmonad.return ()
-
-        method start: 's. ('s, unit) Iom_gadget.t =
-            Iom_gadget.start (Iom_gadget.guard self#guard) self
-    end
-
-(*--- End of File [ iom_machine.ml ] ---*)

File iom/iom_machine.mli

-(*---------------------------------------------------------------------------*
-  INTERFACE  iom_machine.mli
-
-  Copyright (c) 2005-2006, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-(** Object-oriented framework for monadic composition of complex I/O stream
-    processors.
-*)
-
-(** {6 Overview}
-    
-    A specialization of [Cf_machine] in the {b Cf} library, specifically
-    targeted to processing a stream of I/O events in concurrent,
-    single-threaded network services.  The interface is the same, but the
-    input and output type parameters have been removed.
-*)
-
-(** A virtual class to represent the public interface of object-oriented gadget
-    states.  Use [inherit ['i, 'o] substate] or [constrain 'self = ('i, 'o)
-    #substate] to derive a class that implements part of a state in a machine.
-*)
-class virtual substate:
-    object('self)
-        (** Use [obj#next] to transition the state of the machine to [obj]. *)
-        method virtual next: ('self, unit) Iom_gadget.t
-    end
-
-(** A virtual class to represent the public and private interface of object-
-    oriented gadget states.  Use [inherit ['i, 'o] state] to derive a class
-    that implements a complete state in a state machine.
-*)
-class virtual state:
-    object('self)
-        inherit substate
-
-        (** Use [obj#next] to transition the state of the machine by storing
-            [obj] in the state of the gadget and applying {!Iom_gadget.guard}
-            [self#guard].
-        *)
-        method next: ('self, unit) Iom_gadget.t
-        
-        (** Implement to produce a guard for receiving events on one or more
-            wires.
-        *)
-        method private virtual guard: ('self, unit) Iom_gadget.guard_t
-    end
-
-(** Use [inherit ['i, 'o] initial] to derive a class to represent the
-    complete initial state of a machine.  While this class is not virtual, its
-    [guard] method is only defined as a stub that references no gates.  It is
-    provided in the base class to make the class type more useful.
-*)
-class initial:
-    object('self)
-        inherit state
-
-        (** A stub method that produces a guard on no gates at all.  The intent
-            is for subclasses to override this method as necessary.
-        *)
-        method private guard: ('self, unit) Iom_gadget.guard_t
-        
-        (** Starts a new gadget process.  Defined as {!Iom_gadget.start}
-            [(Iom_gadget.guard self#guard) self].
-        *)
-        method start: 's. ('s, unit) Iom_gadget.t
-    end
-
-(*--- End of File [ iom_machine.mli ] ---*)

File iom/iom_octet_stream.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  iom_octet_stream.ml
+
+  Copyright (c) 2006, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(*
+let jout = Cf_journal.stdout
+*)
+
+exception Buffering
+
+class fragment more data = object
+    method more: Iom_stream.more = more
+    method data: Cf_message.t = data
+end
+
+type limits = { low: int; high: int }
+
+let normalize_limits limits =
+    let low = if limits.low < 1 then 1 else limits.low in
+    let high = if limits.high < low then low else limits.high in
+    if low <> limits.low || high <> limits.high then
+        { low = low; high = high }
+    else
+        limits
+
+open Cf_cmonad.Op
+
+class virtual ['x, 'y] transform ~limits waitTx xRx yTx =
+    let waitTx = (waitTx :> 'wait Iom_gadget.tx) in
+    object
+        inherit ['x, 'y] Iom_stream.transform xRx yTx
+        val buffer_: Cf_message.substring Cf_deque.t = Cf_deque.nil
+        val mark_ = 0
+        val more_ = Iom_stream.More
+        val limits_ = limits
+        
+        method private push_octets data =
+            let length = Cf_message.length data in
+            let mark = mark_ + length in
+            let buffer = Cf_message.push data buffer_ in
+            if consume_ && not (mark < limits_.high) then begin
+                waitTx#put `Wait >>= fun () ->
+                Cf_cmonad.return (false, buffer, mark)
+            end
+            else
+                Cf_cmonad.return (consume_, buffer, mark)
+        
+        method private shift_octets len =
+            let mark = if mark_ - len > 0 then mark_ - len else 0 in
+            let buffer = Cf_message.shiftq ~len buffer_ in
+            if not consume_ && mark < limits_.low then begin
+                waitTx#put `Ready >>= fun () ->
+                Cf_cmonad.return (true, buffer, mark)
+            end
+            else
+                Cf_cmonad.return (consume_, buffer, mark)
+    end
+
+let no_limits_emit_ = { low = 1; high = max_int }
+
+class ['x, 'y] emit ?limits waitTx xRx yTx =
+    let limits =
+        match limits with
+        | None -> no_limits_emit_
+        | Some limits -> normalize_limits limits
+    in
+    object(self)
+        inherit ['x, 'y] transform ~limits waitTx xRx yTx
+        constraint 'y = #fragment
+        
+        method private fragment: 'x -> Iom_stream.more * Cf_message.t =
+            fun _ ->
+            Iom_stream.Last, []
+        
+        method private emit: 'y list * int = [], 0
+        
+        method push x =
+            let more, data = self#fragment x in
+            self#push_octets data >>= fun (consume, buffer, mark) ->
+            Cf_cmonad.return {<
+                consume_ = consume; buffer_ = buffer; mark_ = mark;
+                more_ = more;
+            >}
+        
+        method shift =
+            let s, len = self#emit in
+            self#shift_octets len >>= fun (consume, buffer, mark) ->
+            let obj =
+                match more_ with
+                | Iom_stream.More ->
+                    Some {<
+                        consume_ = consume; buffer_ = buffer; mark_ = mark
+                    >}
+                | Iom_stream.Last ->
+                    None
+            in
+            Cf_cmonad.return (Cf_exnopt.U (s, obj))
+    end
+
+let x_negative_adjust_ = "Iom_octet_stream.parse: negative adjustment!"
+
+let no_limits_parse_ = { low = max_int; high = max_int }
+
+class ['x, 'y] parse ?limits waitTx xRx yTx =
+    let limits =
+        match limits with
+        | None -> no_limits_parse_
+        | Some limits -> normalize_limits limits
+    in
+    object(self)
+        inherit ['x, 'y] transform ~limits waitTx xRx yTx
+        constraint 'x = #fragment
+                
+        method private lex:
+            Cf_message.t -> ('y * int) Cf_lex.t = fun _ -> Cf_parser.nil
+
+        method private parse =
+            let rec loop s n m z =
+                match
+                    try Cf_exnopt.U (self#lex m z) with x -> Cf_exnopt.X x
+                with
+                | Cf_exnopt.U (Some ((v, d), z)) ->
+                    if d < 0 then
+                        invalid_arg x_negative_adjust_
+                    else
+                        let v = v :: s in
+                        if d = 0 && Lazy.force z = Cf_seq.Z then
+                            Some (List.rev v, n)
+                        else
+                            loop v (n + d) (Cf_message.shift ~pos:d m) z
+                | Cf_exnopt.U None
+                | Cf_exnopt.X Buffering ->
+                    if n > 0 then Some (List.rev s, n) else None
+                | Cf_exnopt.X x ->
+                    raise x
+            in
+            let x =
+                match more_ with Iom_stream.More -> Some Buffering | _ -> None
+            in
+            let m = Cf_message.drain buffer_ in
+            loop [] 0 m (Cf_message.to_seq ?x m)
+        
+        method push x =
+            self#push_octets x#data >>= fun (consume, buffer, mark) ->
+            Cf_cmonad.return {<
+                consume_ = consume; buffer_ = buffer; mark_ = mark;
+                more_ = x#more;
+            >}
+        
+        method shift =
+            match
+                try Cf_exnopt.U self#parse with x -> Cf_exnopt.X x
+            with
+            | Cf_exnopt.X x ->
+                Cf_cmonad.return (Cf_exnopt.X x)
+            | Cf_exnopt.U None ->
+                Cf_cmonad.return (Cf_exnopt.U ([], Some self))
+            | Cf_exnopt.U (Some (s, len)) ->
+                self#shift_octets len >>= fun (consume, buffer, mark) ->
+                let obj =
+                    match more_ with
+                    | Iom_stream.Last when mark_ > 0 ->
+                        None
+                    | _ ->
+                        Some {<
+                            consume_ = consume; buffer_ = buffer; mark_ = mark
+                        >}
+                in
+                Cf_cmonad.return (Cf_exnopt.U (s, obj))
+    end
+
+(*--- End of File [ iom_octet_stream.ml ] ---*)

File iom/iom_octet_stream.mli

+(*---------------------------------------------------------------------------*
+  INTERFACE  iom_octet_stream.mli
+
+  Copyright (c) 2006, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+exception Buffering
+
+class fragment: Iom_stream.more -> Cf_message.t -> object
+    inherit Iom_stream.event
+    method data: Cf_message.t
+end
+
+type limits = { low: int; high: int }
+val normalize_limits: limits -> limits
+
+class virtual ['x, 'y] transform:
+    limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
+    object
+        inherit ['x, 'y] Iom_stream.transform
+
+        val buffer_: Cf_message.substring Cf_deque.t
+        val mark_: int
+        val more_: Iom_stream.more
+        val limits_: limits
+        
+        method private push_octets:
+            Cf_message.t ->
+            (bool * Cf_message.substring Cf_deque.t * int) Iom_gadget.t
+        
+        method private shift_octets:
+            int -> (bool * Cf_message.substring Cf_deque.t * int) Iom_gadget.t
+    end
+
+class ['x, 'y] emit:
+    ?limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
+    object('self)
+        inherit ['x, 'y] transform
+        constraint 'y = #fragment
+
+        method private fragment: 'x -> Iom_stream.more * Cf_message.t
+        method private emit: 'y list * int
+
+        method push: 'x -> 'self Iom_gadget.t
+        method shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
+    end
+
</