Commits

Anonymous committed 0c870e0

Major refactoring between v0.1 and v0.2 (see CHANGES).

  • Participants
  • Parent commits 6ab0559

Comments (0)

Files changed (13)

-iom_reactor.cmi: iom_gadget.cmi 
-iom_sock_stream.cmi: iom_gadget.cmi 
+iom_machine.cmi: iom_gadget.cmi 
+iom_reactor.cmi: iom_machine.cmi iom_gadget.cmi 
+iom_file.cmi: iom_reactor.cmi iom_gadget.cmi 
+iom_socket.cmi: iom_machine.cmi iom_gadget.cmi iom_file.cmi 
+iom_sock_stream.cmi: iom_socket.cmi iom_gadget.cmi iom_file.cmi 
 iom_tcp4_socket.cmi: iom_sock_stream.cmi 
 iom_tcp6_socket.cmi: iom_sock_stream.cmi 
 iom_gadget.cmo: iom_gadget.cmi 
 iom_gadget.cmx: iom_gadget.cmi 
-iom_reactor.cmo: iom_gadget.cmi iom_reactor.cmi 
-iom_reactor.cmx: iom_gadget.cmx iom_reactor.cmi 
-iom_sock_stream.cmo: iom_gadget.cmi iom_sock_stream.cmi 
-iom_sock_stream.cmx: iom_gadget.cmx iom_sock_stream.cmi 
+iom_machine.cmo: iom_gadget.cmi iom_machine.cmi 
+iom_machine.cmx: iom_gadget.cmx iom_machine.cmi 
+iom_reactor.cmo: iom_machine.cmi iom_gadget.cmi iom_reactor.cmi 
+iom_reactor.cmx: iom_machine.cmx iom_gadget.cmx iom_reactor.cmi 
+iom_file.cmo: iom_reactor.cmi iom_gadget.cmi iom_file.cmi 
+iom_file.cmx: iom_reactor.cmx iom_gadget.cmx iom_file.cmi 
+iom_socket.cmo: iom_machine.cmi iom_gadget.cmi iom_file.cmi iom_socket.cmi 
+iom_socket.cmx: iom_machine.cmx iom_gadget.cmx iom_file.cmx iom_socket.cmi 
+iom_sock_stream.cmo: iom_socket.cmi iom_gadget.cmi iom_file.cmi \
+    iom_sock_stream.cmi 
+iom_sock_stream.cmx: iom_socket.cmx iom_gadget.cmx iom_file.cmx \
+    iom_sock_stream.cmi 
 iom_tcp4_socket.cmo: iom_sock_stream.cmi iom_tcp4_socket.cmi 
 iom_tcp4_socket.cmx: iom_sock_stream.cmx iom_tcp4_socket.cmi 
 iom_tcp6_socket.cmo: iom_sock_stream.cmi iom_tcp6_socket.cmi 
 iom_tcp6_socket.cmx: iom_sock_stream.cmx iom_tcp6_socket.cmi 
-t/t_iom.cmo: iom_tcp6_socket.cmi iom_tcp4_socket.cmi iom_sock_stream.cmi \
-    iom_gadget.cmi 
-t/t_iom.cmx: iom_tcp6_socket.cmx iom_tcp4_socket.cmx iom_sock_stream.cmx \
-    iom_gadget.cmx 
+t/t_iom.cmo: iom_tcp6_socket.cmi iom_tcp4_socket.cmi iom_socket.cmi \
+    iom_sock_stream.cmi iom_gadget.cmi iom_file.cmi 
+t/t_iom.cmx: iom_tcp6_socket.cmx iom_tcp4_socket.cmx iom_socket.cmx \
+    iom_sock_stream.cmx iom_gadget.cmx iom_file.cmx 
 released versions of the library.
 
 
+===== Version 0.2 =====
+
+This release is a major refactoring of almost all of the interfaces and
+implementations.  Basically, everything north of the underlying [Cf_gadget]
+wrapper has been designed over from scratch.
+
+The main reason for doing this is to leverage the knowledge gained from other
+experimental projects that used the interfaces defined in the previous release.
+The new interface should offer more opportunities for reuse and be more easily
+adapted for new types of I/O requirements, e.g. UDP sockets, AF_LOCAL sockets,
+multicast, etcetera.
+
+Highlights of refactoring this release:
+
++ OCNAE Cf-0.7 is now required (for additions to [Cf_gadget] and other things).
++ Add new module [Iom_machine] to match the [Cf_machine] new with Cf-0.7.
++ Replace the [Iom_gadget.poll] function with a virtual subclass of
+  [Iom_machine.state].
++ Redesigned interface to core reactors in [Iom_reactor] module.
++ New modules [Iom_file] and [Iom_socket] containing virtual classes for state
+  machines that process more generalized I/O events.
++ Redesigned interface to TCP connection/listening reactors and SOCK_STREAM
+  endpoint reactors.
+
+Additional interfaces for UDP sockets, AF_LOCAL sockets and multicast, etc. are
+deferred until the next minor release at the earliest.
+
 ===== Version 0.1 =====
 
 Highlights of this release:
 OCAMLDEP     = ocamldep
 OCAMLLEX     = ocamllex
 OCAMLYACC    = ocamlyacc
+OCAMLDOC     = ocamlfind ocamldoc $(OCAMLFINDOPT)
 
 CC_OPT       = -ccopt -fPIC -ccopt -O2 -ccopt -Wall -ccopt -Wno-unused-variable
 CMI_OPT      = $(DEBUG_OPT)
 IOM_YACC_MLI_FILES = $(IOM_YACC_MODULES:%=%.mli)
 
 IOM_MODULES = \
-    gadget reactor sock_stream tcp4_socket tcp6_socket
+    gadget machine reactor file socket sock_stream tcp4_socket tcp6_socket
 
 IOM_PRIMITIVES = 
 
 
 ###############################################################################
 
-TEST_MODULES = iom # mirrord mirrorc echo perf
+TEST_MODULES = iom # perf
 
 TEST_PROGRAMS = $(TEST_MODULES:%=t.%)
 TEST_OPT_PROGRAMS = $(TEST_MODULES:%=t-opt.%)
 
 doc::
 	@mkdir -p doc
-	ocamldoc.opt -v -d doc -html -colorize-code -m A \
-	  -I $(DOC_INCLUDE_CF) $(DOC_SOURCES)
+	$(OCAMLDOC) -v -d doc -html -colorize-code -m A $(DOC_SOURCES)
+
+#	ocamldoc.opt -v -d doc -html -colorize-code -m A \
+#	  -I $(DOC_INCLUDE_CF) $(DOC_SOURCES)
 
 ###############################################################################
 

iom/iom_gadget.ml

 class type connector = Cf_gadget.connector
 class ['a] rx e = ['a, input_t, output_t] Cf_gadget.rx e
 class ['a] tx e = ['a, input_t, output_t] Cf_gadget.tx e
+type ('x, 'y) duplex_t = ('x rx * 'y tx) * ('y rx * 'x tx)
 
 let wire = Cf_gadget.wire
 let null = Cf_gadget.null
 type queue_t = <get: (unit, unit) t> Queue.t
 type kernel_t = { k_poll_: Cf_poll.t; k_queue_: queue_t }
 
-type control_t = C_final | C_load | C_unload
+let poll e k =
+    let e = (e :> [> (unit, unit) t Cf_poll.state_t ] Cf_poll.event) in
+    e#load ~q:k.k_queue_ k.k_poll_
 
 let run =
-    let rec ingest1_ (q : queue_t) () =
+    let rec ingest1_ q =
         if Queue.is_empty q then
             Cf_cmonad.return ()
         else
             let e = Queue.take q in
-            start e#get () >>= ingest1_ q
-    and ingest0_ q () =
+            e#get >>= fun () ->
+            ingest1_ q
+    and ingest0_ q =
         Cf_gadget.read >>= function
         | Cf_poll.More ->
-            ingest1_ q () >>= ingest0_ q
+            ingest1_ q >>= fun () ->
+            ingest0_ q
         | Cf_poll.Last ->
             Cf_cmonad.return ()
     in
     fun f s0 ->
         let p = Cf_poll.create () and q = Queue.create () in
         let k = { k_poll_ = p; k_queue_ = q; } in
-        let m = f k >>= Cf_gadget.start (ingest0_ k.k_queue_ ()) in
+        let m = f k >>= Cf_gadget.start (ingest0_ k.k_queue_) in
         let w = Cf_gadget.eval m s0 in
         let save = Unix.sigprocmask Unix.SIG_SETMASK [] in
         runloop_ p w;
         ignore (Unix.sigprocmask Unix.SIG_SETMASK save)
 
-let poll ?c:copt ~e k =
-    let e = (e :> (unit, unit) t Cf_poll.event) in
-    let q = k.k_queue_ and p = k.k_poll_ in
-    match copt with
-    | None ->
-        e#load ~q p; Cf_cmonad.return ()
-    | Some c ->
-        let c = (c :> control_t rx) in
-        let rec loop () =
-            guard begin
-                c#get begin function
-                    | C_final -> Cf_cmonad.return ()
-                    | C_load -> e#load ~q p; loop ()
-                    | C_unload -> e#unload; loop ()
-                end
-            end
-        in
-        Cf_gadget.start (loop ()) ()
-
 (*--- End of File [ iom_gadget.ml ] ---*)

iom/iom_gadget.mli

 
     A specialization of [Cf_gadget] in the {b Cf} library, specifically
     targeted to processing a stream of I/O events in concurrent,
-    single-threaded network services.  Evaluating an I/O gadget monad results
-    in the execution of an event loop (using the [Cf_poll] module).  The
-    underlying [Cf_flow] object conceptually reads I/O events from the system
-    and writes commands to wait for more.
+    single-threaded network services.  The interface is the same, except the
+    input and output type parameters are removed, and there are no explicit
+    [read] and [write] functions.
+    
+    Evaluating an I/O gadget monad results in the execution of an event loop
+    (using the [Cf_poll] module).  The underlying [Cf_flow] object conceptually
+    reads I/O events from the system and writes commands to wait for more.
     
     An I/O gadget monad is a continuation monad initialized with an abstract
     unit of work for the scheduler.  Monadic functions are defined for the
 type 's gate_t
 
 (** The abstract type of an event wire. *)
-type 's wire_t
+type 'a wire_t
 
 (** A continuation monad for composing logical sequences of scheduling work
     within a specific process.  The scheduler may interleave units of work from
 *)
 val simplex: ('s, 'a rx * 'a tx) t
 
+(** A convenience type combining two wires into a pair of [rx] and [tx]
+    tuples, one tuple for each end of a duplex communication.  Each tuple is
+    composed of the [rx] and the [tx] objects to use on one end of the
+    communication.
+*)
+type ('x, 'y) duplex_t = ('x rx * 'y tx) * ('y rx * 'x tx)
+
 (** Use [duplex >>= f] to get a pair of new wires from the scheduler, construct
     a {i pair} of receive and transmit connector pairs, one for each wire, and
     apply it to the bound function [f].
 *)
-val duplex: ('s, ('a rx * 'b tx) * ('b rx * 'a tx)) t
+val duplex: ('s, ('x, 'y) duplex_t) t
 
 (** Use [load >>= f] to load the internal state of the current process and
     apply it to the bound function [f].
 (** The abstract type of the system I/O event multiplexer. *)
 type kernel_t
 
+(** Use [poll e k] to load the I/O event into the polling mux and event queue
+    comprising the kernel [k].
+*)
+val poll:
+    [> (unit, unit) t Cf_poll.state_t ] #Cf_poll.event -> kernel_t -> unit
+
 (** Use [run f s] to construct a new I/O event kernel, apply it to the function
     [f] to obtain a top-level process for the event loop, and start the process
     running with the initial state [s].
 *)
 val run: (kernel_t -> ('s, unit) t) -> 's -> unit
 
-(** The type of events used for controlling I/O polling event reactors. *)
-type control_t =
-    | C_final   (** Terminate the reactor. *)
-    | C_load    (** Begin waiting for the event. *)
-    | C_unload  (** Stop waiting for the event. *)
-
-(** A reactor that waits for I/O events.  Use [poll ~e k] to construct a
-    reactor process that waits for the event [e] to occur and starts the
-    associated process (which usually sends an event on a transmit connector).
-    Use [poll ~c ~e k] to construct a reactor process that waits for
-    {!control_t} events on the receiver [c], and loads/unloads the event [e]
-    in the kernel [k] in a loop until it receives the [C_final] event.
-*)
-val poll:
-    ?c:control_t #rx -> e:(unit, unit) t #Cf_poll.event ->
-    kernel_t -> ('s, unit) t
-
 (*--- End of File [ iom_gadget.mli ] ---*)

iom/iom_reactor.ml

 (*---------------------------------------------------------------------------*
   IMPLEMENTATION  iom_reactor.ml
 
-  Copyright (c) 2002-2005, James H. Woodyatt
+  Copyright (c) 2005, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-(*
-let xprintf fmt =
-    print_string "[Iom_reactor] ";
-    Printf.printf fmt 
-*)
+open Cf_cmonad.Op
+module G = Iom_gadget
 
-open Iom_gadget
+class virtual ['control, 'state] core ?c k =
+    object(self:'self)
+        inherit Iom_machine.initial as super
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+        constraint 'self = 'state #Cf_poll.event
 
-type e_signal_t = [ `Signal of int ]
-type e_time_t = [ `Time of Cf_tai64n.t ]
-type e_idle_t = [ `Idle of Cf_tai64n.t ]
+        val mutable started_ = false
+                
+        method private virtual control: 'control -> ('self, unit) Iom_gadget.t
 
-class signal r n =
-    let r = (r :> e_signal_t tx) in
-    let m = `Final (r#put (`Signal n)) in
-    object
-        inherit [(unit, unit) t] Cf_poll.signal n
-        method private service_ _ = m
+        method private guard =
+            match c with
+            | None ->
+                Cf_cmonad.return ()
+            | Some ctrlRx ->
+                let ctrlRx = (ctrlRx :> 'control G.rx) in
+                ctrlRx#get self#control
+        
+        method start =
+            if started_ then
+                failwith "Iom_reactor.core#start: already started!";
+            started_ <- true;
+            if c = None then Iom_gadget.poll self k;
+            super#start
     end
 
-let signal ?c ~r ~n k = poll ?c ~e:(new signal r n) k
+type control_t = [ `Load | `Unload | `Cancel ]
+type signal_t = [ `Signal of int ]
+type time_t = [ `Time of Cf_tai64n.t ]
+type idle_t = [ `Idle of Cf_tai64n.t ]
+type 'rwx file_t = [ `File of Unix.file_descr * 'rwx ]
+    constraint 'rwx = [< Cf_poll.rwx_t ]
 
-class time r t =
-    let r = (r :> e_time_t tx) in
-    let m = `Final (r#put (`Time t)) in
-    object
-        inherit [(unit, unit) t] Cf_poll.time t
-        method private service_ _ = m
+class virtual ['control, 'state] auxcore ?c ?s k =
+    object(self:'self)
+        inherit ['control, 'state] core ?c k
+        constraint 'control = [> control_t ]
+        
+        method private service p : 'state =
+            match s with
+            | None -> `Loaded p
+            | Some m -> `Final m
+        
+        method private control = function
+            | `Cancel ->
+                self#unload;
+                Cf_cmonad.return ()
+            | `Load ->
+                Iom_gadget.poll self k;
+                self#next
+            | `Unload ->
+                self#unload;
+                self#next
+            | (_ : 'ctrl) ->
+                assert (not true);
+                self#next
     end
 
-let time ?c ~r ~t k = poll ?c ~e:(new time r t) k
+class ['control, 'rwx] filerwx ?c r rwx fd k =
+    let r = (r :> 'out G.tx) in
+    let s = r#put (`File (fd, rwx)) in
+    object(self:'self)
+        inherit ['state] Cf_poll.file rwx fd
+        inherit ['control, 'state] auxcore ?c ~s k
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+        constraint 'rwx = [< Cf_poll.rwx_t ]
+    end
 
-class idle r =
-    let r = (r :> e_idle_t tx) in
+class ['control] signal ?c r n k =
+    let r = (r :> 'out G.tx) in
+    let s = r#put (`Signal n) in
     object
-        inherit [(unit, unit) t] Cf_poll.idle
+        inherit ['state] Cf_poll.signal n
+        inherit ['control, 'state] auxcore ?c ~s k
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+    end
 
-        method private service_ _ =
+class ['control] time ?c r t k =
+    let r = (r :> 'out G.tx) in
+    let s = r#put (`Time t) in
+    object
+        inherit ['state] Cf_poll.time t
+        inherit ['control, 'state] auxcore ?c ~s k
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+    end
+
+class ['control] idle ?c r k =
+    let r = (r :> 'out G.tx) in
+    object
+        inherit ['state] Cf_poll.idle
+        inherit ['control, 'state] auxcore ?c k
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+
+        method private service _ =
             let now =
                 match epoch_ with
                 | Some now -> now
             `Final (r#put (`Idle now))
     end
 
-let idle ?c ~r k = poll ?c ~e:(new idle r) k
+let filerwx ?c rwx fd k =
+    G.simplex >>= fun (eRx, eTx) ->
+    let p = new filerwx ?c eTx rwx fd k in
+    p#start >>= fun () ->
+    Cf_cmonad.return eRx
+
+let signal ?c n k =
+    G.simplex >>= fun (eRx, eTx) ->
+    let p = new signal ?c eTx n k in
+    p#start >>= fun () ->
+    Cf_cmonad.return eRx
+
+let time ?c t k =
+    G.simplex >>= fun (eRx, eTx) ->
+    let p = new time ?c eTx t k in
+    p#start >>= fun () ->
+    Cf_cmonad.return eRx
+
+let idle ?c k =
+    G.simplex >>= fun (eRx, eTx) ->
+    let p = new idle ?c eTx k in
+    p#start >>= fun () ->
+    Cf_cmonad.return eRx
 
 (*--- End of File [ iom_reactor.ml ] ---*)

iom/iom_reactor.mli

 (*---------------------------------------------------------------------------*
   INTERFACE  iom_reactor.mli
 
-  Copyright (c) 2002-2005, James H. Woodyatt
+  Copyright (c) 2005, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-(** Miscellaneous I/O reactors (signals, timeouts and idle). *)
+(** Primitive I/O event multiplexers.  Reactors to produce primitive events
+    associated with file descriptors, process signals, interval timers and
+    work-loop idle conditions.
+*)
 
 (** {6 Overview}
 
-    This module contains low-level I/O reactor functions useful for reacting
-    to timer events, system process signals, and event loop idle conditions.
+    This module implements primitive reactors for multiplexing I/O events at
+    the boundary between application and the operating system.  All the gadgets
+    defined here use subclasses of {!Cf_poll.event} as their internal state,
+    and they require a {!Iom_gadget.kernel_t} value in their constructors.
     
-    These functions are all specializations of {!Iom_gadget.poll} for each
-    subclass of {!Cf_poll.event}.
+    Note: because {!Cf_poll.event} is implemented in the imperative style and
+    used as the state parameter in a functional style programming framework,
+    care must be taken when inheriting from the classes in this module to
+    insure that order of evaluation is strictly maintained.
 *)
 
-(** {6 Types} *)
+(** {6 Core } *)
 
-(** The type of system signal events.  The integer is the signal number. *)
-type e_signal_t = [ `Signal of int ]
+(**
+    The core state machinery of all reactor primitives.  Use [inherit
+    ['control, 'state] core ?c k] to derive a reactor subclass that guards
+    the optional control event receiver [?c] and behaves as a {!Cf_poll.event}
+    object in all other respects.  (Use multiple inheritance to include the
+    required methods and values in the subclass.)  If the optional control
+    event receiver is not provided, then the object is conveniently pre-loaded
+    into the polling mux encapsulated within the kernel [k].
+    
+    Note: The [start] method inherited from {!Iom_machine.initial} may only be
+    invoked once for each object.  Subsequence invocations raise [Failure].
+*)
+class virtual ['control, 'state] core:
+    ?c:'control #Iom_gadget.rx ->
+    Iom_gadget.kernel_t ->
+    object('self)
+        (** Subclass of gadget state machine. *)
+        inherit Iom_machine.initial
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+        constraint 'self = 'state #Cf_poll.event
 
-(** The type of timer events.  The timestamp is the time of the event. *)
-type e_time_t = [ `Time of Cf_tai64n.t ]
+        val mutable started_: bool  (** Set [true] in the [start] method. *)
 
-(** The type of event loop idle events.  The timestamp is the time when the
-    idle condition occured.
+        (** Implement to handle incoming control events. *)
+        method private virtual control: 'control -> ('self, unit) Iom_gadget.t
+    end
+
+(** {6 Classes}
+
+    Use the object-oriented interface described in this section for deriving
+    subclasses of the primitive reactors.
 *)
-type e_idle_t = [ `Idle of Cf_tai64n.t ]
 
-(** {6 Functions} *)
+(** Control events.  Send to reactors to control their operation. *)
+type control_t = [
+    | `Load         (** Load the event into the polling mux. *)
+    | `Unload       (** Unload the event from the polling mux. *)
+    | `Cancel       (** Unload the event and exit the reactor loop. *)
+]
 
-(** Use [signal ?c ~r ~n k] to compose an I/O reactor that waits with the
-    kernel [k] for the system signal number [n] and sends [`Signal n] on the
-    transmitter [r].  The optional [?c] argument is passed unchanged to the
-    underlying call to {!Iom_gadget.poll}.
+(** System signal event.  Sent by signal reactors to indicate the reception of
+    a system signal.
+*)
+type signal_t = [ `Signal of int ]
+
+(** Timer event.  Sent by time reactors to indicate the arrival of a system
+    time epoch.
+*)
+type time_t = [ `Time of Cf_tai64n.t ]
+
+(** Idle event.  Sent by idle reactors to indicate the arrival of an idle
+    opportunity, when the work loop will block waiting for the next I/O event
+    unless another reactor schedules additional operations to perform.
+*)
+type idle_t = [ `Idle of Cf_tai64n.t ]
+
+(** File descriptor event.  Sent by file reactors to indicate the arrival of a
+    read/write/exception I/O condition on a {!Unix.file_descr}.
+*)
+type 'rwx file_t = [ `File of Unix.file_descr * 'rwx ]
+    constraint 'rwx = [< Cf_poll.rwx_t ]
+
+(** A convenience class type common to the primitive reactor classes defined
+    below.
+*)
+class type virtual ['control, 'state] auxcore =
+    object('self)
+        (** Subclass of I/O reactor core. *)
+        inherit ['control, 'state] core
+        constraint 'control = [> control_t ]
+
+        (** Implements method required by {!Cf_poll.event} for servicing I/O
+            polling mux events.
+        *)
+        method private service: Cf_poll.t -> 'state
+        
+        (** Implements method for handling control events. *)
+        method private control: 'control -> ('self, unit) Iom_gadget.t
+    end
+
+(** Use [new filerwx ?c tx rwx fd k] to create a state machine object for
+    primitive file events of type [rwx] in the polling mux kernel [k] for the
+    file descriptor [fd].  When events arrive, they are sent with the
+    transmitter [tx].  The optional control receiver [?c] can be specified to
+    control the loading and unloading into and out from the polling mux.  If
+    [?c] is not specified, then the event is automatically loaded when the
+    reactor is started, and unloaded when the first event arrives.
+*)
+class ['control, 'rwx] filerwx:
+    ?c:'control #Iom_gadget.rx -> [> 'rwx file_t ] #Iom_gadget.tx -> 'rwx ->
+    Unix.file_descr -> Iom_gadget.kernel_t ->
+    object('self)
+        (** Subclass of the primitive I/O reactor. *)
+        inherit ['control, 'state] auxcore
+        (** Subclass of core foundation file descriptor polling mux event. *)
+        inherit ['state] Cf_poll.file
+        constraint 'rwx = [< Cf_poll.rwx_t ]
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.file_state_t ]
+    end
+
+(** Use [new signal ?c tx n k] to create a state machine object for catching
+    the system signal [n] in the polling mux kernel [k].  When signals are
+    caught, the corresponding events are sent with the transmitter [tx].  The
+    optional control receiver [?c] can be specified to control the loading and
+    unloading into and out from the polling mux.  If [?c] is not specified,
+    then the event is automatically loaded when the reactor is started, and
+    unloaded when the first event arrives.
+*)
+class ['control] signal:
+    ?c:'control #Iom_gadget.rx -> [> signal_t ] #Iom_gadget.tx -> int ->
+    Iom_gadget.kernel_t ->
+    object('self)
+        (** Subclass of the primitive I/O reactor. *)
+        inherit ['control, 'state] auxcore
+        (** Subclass of core foundation system signal polling mux event. *)
+        inherit ['state] Cf_poll.signal
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+    end
+
+(** Use [new time ?c tx t k] to create a state machine object for waiting until
+    the system time [t] arrives in the polling mux kernel [k].  When the epoch
+    occurs, the corresponding event is sent with the transmitter [tx].  The
+    optional control receiver [?c] can be specified to control the loading and
+    unloading into and out from the polling mux.  If [?c] is not specified,
+    then the event is automatically loaded when the reactor is started, and
+    unloaded when the event arrives.
+*)
+class ['control] time:
+    ?c:'control #Iom_gadget.rx -> [> time_t ] #Iom_gadget.tx -> Cf_tai64n.t ->
+    Iom_gadget.kernel_t ->
+    object('self)
+        (** Subclass of the primitive I/O reactor. *)
+        inherit ['control, 'state] auxcore
+        (** Subclass of core foundation system timer polling mux event. *)
+        inherit ['state] Cf_poll.time
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+    end
+
+
+(** Use [new idle ?c tx k] to create a state machine object for waiting until
+    the next idle opportunity arrives in the polling mux kernel [k].  When the
+    opportunity occurs, the corresponding event is sent with the transmitter
+    [tx].  The optional control receiver [?c] can be specified to control the
+    loading and unloading into and out from the polling mux.  If [?c] is not
+    specified, then the event is automatically loaded when the reactor is
+    started, and unloaded when the event arrives.
+*)
+class ['control] idle:
+    ?c:'control #Iom_gadget.rx -> [> idle_t ] #Iom_gadget.tx ->
+    Iom_gadget.kernel_t ->
+    object('self)
+        inherit ['control, 'state] auxcore
+        inherit ['state] Cf_poll.idle
+        constraint 'state = [> (unit, unit) Iom_gadget.t Cf_poll.state_t ]
+    end
+
+(** {6 Functions}
+
+    Use the functions described in this section for convenient access to the
+    base class reactors.
+*)
+
+(** Use [filerwx ?c rwx fd k >>= fun rx -> ...] to start a file I/O reactor in
+    the kernel [k] on the file descriptor [fd] for reacting to events of the
+    type [rwx].  The optional control channel [?c] can be used to control the
+    loading and unloading into and out from the polling mux in the kernel.  If
+    [?c] is not specified, then the event is automatically loaded when the
+    reactor is started, and unloaded when the event arrives.
+*)
+val filerwx:
+    ?c:control_t #Iom_gadget.rx -> 'rwx -> Unix.file_descr ->
+    Iom_gadget.kernel_t -> ('s, 'rwx file_t Iom_gadget.rx) Iom_gadget.t
+
+(** Use [signal ?c n k >>= fun rx -> ...] to start a signal I/O reactor in
+    the kernel [k] for catching the system signal [n].  The optional control
+    channel [?c] can be used to control the loading and unloading into and out
+    from the polling mux in the kernel.  If [?c] is not specified, then the
+    event is automatically loaded when the reactor is started, and unloaded
+    when the event arrives.
 *)
 val signal:
-    ?c:Iom_gadget.control_t #Iom_gadget.rx ->
-    r:[> e_signal_t ] #Iom_gadget.tx -> n:int -> Iom_gadget.kernel_t ->
-    ('s, unit) Iom_gadget.t
+    ?c:control_t #Iom_gadget.rx -> int -> Iom_gadget.kernel_t ->
+    ('s, signal_t Iom_gadget.rx) Iom_gadget.t
 
-(** Use [time ?c ~r ~t k] to compose an I/O reactor that waits with the kernel
-    [k] for the system time [t] to arrive, and sends [`Time t] on the
-    transmitter [r].  The optional [?c] argument is passed unchanged to the
-    underlying call to {!Iom_gadget.poll}.
+(** Use [time ?c t k >>= fun rx -> ...] to start a timer I/O reactor in
+    the kernel [k] for waiting until the system epoch [t].  The optional
+    control channel [?c] can be used to control the loading and unloading into
+    out from the polling mux in the kernel.  If [?c] is not specified, then the
+    event is automatically loaded when the reactor is started, and unloaded
+    when the epoch occurs.
 *)
 val time:
-    ?c:Iom_gadget.control_t #Iom_gadget.rx -> r:[> e_time_t ] #Iom_gadget.tx ->
-    t:Cf_tai64n.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
+    ?c:control_t #Iom_gadget.rx -> Cf_tai64n.t -> Iom_gadget.kernel_t ->
+    ('s, time_t Iom_gadget.rx) Iom_gadget.t
 
-(** Use [idle ?c ~r k] to compose an I/O reactor that waits for the event loop
-    in the kernel [k] to become idle (causing the program to block for some
-    non-zero length of time for system events to occur), and sends [`Idle t] on
-    the transmitter [r], where [t] is the time at which the idle condition
-    occurs.  The optional [?c] argument is passed unchanged to the underlying
-    call to {!Iom_gadget.poll}.
+(** Use [idle ?c k >>= fun rx -> ...] to start an idle I/O reactor in the
+    kernel [k] for waiting until the system epoch [t].  The optional control
+    channel [?c] can be used to control the loading and unloading into out from
+    the polling mux in the kernel.  If [?c] is not specified, then the event is
+    automatically loaded when the reactor is started, and unloaded when the
+    opportunity occurs.
 *)
 val idle:
-    ?c:Iom_gadget.control_t #Iom_gadget.rx -> r:[> e_idle_t ] #Iom_gadget.tx ->
-    Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
+    ?c:control_t #Iom_gadget.rx -> Iom_gadget.kernel_t ->
+    ('s, idle_t Iom_gadget.rx) Iom_gadget.t
 
 (*--- End of File [ iom_reactor.mli ] ---*)

iom/iom_sock_stream.ml

 (*---------------------------------------------------------------------------*
   IMPLEMENTATION  iom_sock_stream.ml
 
-  Copyright (c) 2003-2005, James H. Woodyatt
+  Copyright (c) 2005, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
 
 (**)
 let jout = Cf_journal.stdout
-(* let () = jout#setlimit `None *)
 (**)
 
-type endpoint_tx_t =
-    | IO_tx_data of Cf_message.t	(* send data to peer *)
-    | IO_tx_release                 (* initiate orderly release *)
-    | IO_tx_extend of int           (* extend application receive window *)
-    | IO_tx_window of int           (* set application receive window *)
-    | IO_tx_reset                   (* abortive close *)
-    | IO_tx_unlink                  (* flush output and stop input *)
-
-type endpoint_rx_t =
-    | IO_rx_error of exn            (* socket closed on error condition *)
-    | IO_rx_data of Cf_message.t	(* data received from peer *)
-    | IO_rx_release                 (* peer initiated orderly release *)
-    | IO_rx_blocked                 (* transmit flow control *)
-    | IO_rx_ready                   (* ready for transmit *)
-    | IO_rx_closed                  (* orderly close complete *)
-    | IO_rx_unlinked                (* output flushed and input stopped *)
-
 module type T = sig
     module S: Cf_sock_stream.T
     
-    type connection_t = S.t * S.address_t * S.address_t
+    class connection:
+        Iom_file.more_t -> S.t ->
+        object
+            inherit Iom_file.event
+            method socket: S.t
+        end
+    
+    class connector:
+        ?sock:S.t -> ?src:S.address_t -> dst:S.address_t ->
+        ?c:Iom_file.control_t #Iom_gadget.rx -> 
+        s:Iom_file.signal_t #Iom_gadget.tx -> tx:connection #Iom_gadget.tx ->
+        Iom_gadget.kernel_t ->
+        object('self)
+            inherit [Iom_file.control_t, Iom_file.signal_t, 'state]
+                Iom_socket.core
+                        
+            method private service: Cf_poll.t -> 'state
+        end
 
-    type initiator_rx_t =
-        | I_connect of connection_t
-        | I_error of exn
+    type listener_signal_t = [ Iom_file.signal_t | `Bound of S.address_t ]
+
+    class listener:
+        ?sock:S.t -> ?src:S.address_t -> e:int -> ?d:int ->
+        ?c:Iom_file.reader_control_t #Iom_gadget.rx ->
+        s:listener_signal_t #Iom_gadget.tx ->
+        tx:connection #Iom_gadget.tx -> Iom_gadget.kernel_t ->
+        object('self)
+            inherit [Iom_file.reader_control_t, listener_signal_t, 'state]
+                Iom_socket.core
+            
+            method private bound: S.address_t -> ('self, unit) Iom_gadget.t
+            
+            method private pending:
+                Cf_poll.t -> connection list -> int -> 'state
+
+            method private service: Cf_poll.t -> 'state
+        end
     
-    val initiator:
-        c:initiator_rx_t #Iom_gadget.tx -> ?cancel:unit #Iom_gadget.rx ->
-        ?sock:S.t -> ?src:S.address_t -> S.address_t -> Iom_gadget.kernel_t ->
-        ('s, unit) Iom_gadget.t
-    
-    type listener_tx_t =
-        | L_limit of int
-        | L_extend of int
-        | L_close
-    
-    type listener_rx_t =
-        | L_bind of S.address_t
-        | L_connect of connection_t
-        | L_error of exn
-        
-    val listener:
-        c:(listener_tx_t #Iom_gadget.rx * listener_rx_t #Iom_gadget.tx) ->
-        ?sock:S.t -> ?qlen:int -> S.address_t -> Iom_gadget.kernel_t ->
-        ('s, unit) Iom_gadget.t
-    
-    val endpoint:
-        c:(endpoint_tx_t #Iom_gadget.rx * endpoint_rx_t #Iom_gadget.tx) -> 
-        S.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
+    val connect:
+        ?sock:S.t -> ?src:S.address_t -> dst:S.address_t ->
+        ?c:Iom_file.control_t #Iom_gadget.rx -> Iom_gadget.kernel_t ->
+        ('s, Iom_file.signal_t Iom_gadget.rx * connection Iom_gadget.rx)
+            Iom_gadget.t
+
+    val listen:
+        ?sock:S.t -> ?src:S.address_t -> e:int -> ?d:int -> 
+        ?c:Iom_file.reader_control_t #Iom_gadget.rx -> Iom_gadget.kernel_t ->
+        ('s, listener_signal_t Iom_gadget.rx * connection Iom_gadget.rx)
+            Iom_gadget.t
 end
 
-let is_blocked_ x = List.memq x [ Unix.EAGAIN; Unix.EWOULDBLOCK ]
-let is_blocked_connect_ x = List.memq x [ Unix.EALREADY; Unix.EINPROGRESS ]
+module Create(S: Cf_sock_stream.T) : (T with module S = S) = struct
+    open Cf_cmonad.Op
 
-module Create(S: Cf_sock_stream.T): (T with module S = S) = struct
-    module G = Iom_gadget
-    open Cf_cmonad.Op
-    
     module S = S
     
-    type 's service_aux_t =
-        | Aux_load
-        | Aux_unload
-        | Aux_working of ('s, unit) G.t
-        | Aux_final of ('s, unit) G.t
+    class connection more socket =
+        object(_:#Iom_file.event)
+            method more = more
+            method socket: S.t = socket
+        end
+
+    let connection_ more socket tx =
+        let tx = (tx :> connection Iom_gadget.tx) in
+        tx#put (new connection more socket)
     
-    class virtual file_aux ctrlTx rwx e =
-        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
-        let e = (e :> S.basic) in
+    let unspecified_ = S.P.AF.of_sockaddr S.P.AF.unspecified
+
+    class connector
+      ?(sock = S.create ()) ?(src = unspecified_) ~dst ?c ~s ~tx:outTx k =
+        let outTx = (outTx :> connection Iom_gadget.tx) in
+        let src = S.P.AF.to_sockaddr src in
+        let dst = S.P.AF.to_sockaddr dst in
         object(self:'self)
-            inherit [(unit, unit) G.t] Cf_poll.file rwx e#fd
+            inherit [Iom_file.control_t, Iom_file.signal_t, 'state]
+                Iom_socket.core ?c ~s ~rwx:`W sock k as super
+            
+            method private service _ =
+                `Final begin
+                    match
+                        try
+                            Cf_socket.connect sock dst;
+                            None
+                        with
+                        | Unix.Unix_error (Unix.EISCONN, _, _) -> None
+                        | x -> Some x
+                    with
+                    | None -> connection_ Iom_file.Last sock outTx
+                    | Some x -> Iom_gadget.start (self#error x) self
+                end
+                        
+            method start =
+                try
+                    Cf_socket.connect sock dst;
+                    connection_ Iom_file.Last sock outTx
+                with
+                | Unix.Unix_error (Unix.EINPROGRESS, _, _) ->
+                    Iom_gadget.poll self k;
+                    super#start
+                | x ->
+                    Iom_gadget.start (self#error x) self
+            
+            initializer
+                Cf_socket.bind sock src
+        end
 
-            val mutable limit_ = 0
-            val mutable finalized_ = false
+    type listener_signal_t = [ Iom_file.signal_t | `Bound of S.address_t ]
+
+    let seqput_ acc tx =
+        let tx = (tx :> connection Iom_gadget.tx) in
+        Cf_seq.C.sequence (Cf_seq.map tx#put (Cf_seq.of_list (List.rev acc)))
+    
+    type listen_result_t = R_socket of S.t | R_again | R_error of exn
+
+    class listener
+      ?(sock = S.create ()) ?(src = unspecified_) ~e ?(d = e) ?c ~s:sigTx
+      ~tx:outTx k =
+        let outTx = (outTx :> connection Iom_gadget.tx) in
+        let src = S.P.AF.to_sockaddr src in
+        object(self)
+            inherit [Iom_file.reader_control_t, listener_signal_t, 'state]
+                Iom_socket.core ?c ~s:sigTx ~rwx:`R sock k as super
             
-            method virtual private service_aux_: 's. 's service_aux_t
+            method private pending p acc limit =
+                if limit > 0 then begin
+                    match
+                        try
+                            let sock, _ = Cf_socket.accept sock in
+                            R_socket sock
+                        with
+                        | Unix.Unix_error (code, _, _) when self#again code ->
+                            R_again
+                        | x ->
+                            R_error x
+                    with
+                    | R_socket sock ->
+                        let v = new connection Iom_file.More sock in
+                        self#pending p (v :: acc) (pred limit)
+                    | R_again ->
+                        `Working (p, seqput_ acc outTx)
+                    | R_error x ->
+                        `Final begin
+                            seqput_ acc outTx >>= fun () ->
+                            Iom_gadget.start (self#error x) self
+                        end
+                end
+                else
+                    `Working (p, seqput_ acc outTx)
 
-            method private service_ p =
-                match self#service_aux_ with
-                | Aux_final v -> `Final v
-                | Aux_working v -> `Working (p, v)
-                | _ -> assert (not true); state_
+            method private service p = self#pending p [] d
+            
+            method private bound addr =
+                let sigTx = (sigTx :> listener_signal_t Iom_gadget.tx) in
+                sigTx#put (`Bound addr)
+            
+            method private control = function
+                | `Wait ->
+                    self#unload;
+                    self#next
+                | `Ready ->
+                    Iom_gadget.poll self k;
+                    self#next
+                | c ->
+                    super#control c
 
-            method limit: 's. int -> ('s, unit) G.t = fun n ->
-                if finalized_ then
-                    Cf_cmonad.return ()
-                else begin
-                    let n0 = limit_ in
-                    limit_ <- n;
-                    if n <= 0 && n0 > 0 then
-                        ctrlTx#put G.C_unload
-                    else if n > 0 && n0 <= 0 then begin
-                        match self#service_aux_ with
-                        | Aux_load ->
-                            ctrlTx#put G.C_load
-                        | Aux_unload ->
-                            ctrlTx#put G.C_unload
-                        | Aux_working v
-                        | Aux_final v ->
-                            if n <= 0 then
-                                v
-                            else
-                                v >>= fun () ->
-                                ctrlTx#put G.C_load
-                    end
-                    else
-                        Cf_cmonad.return ()
-                end
+            method start =
+                try
+                    Cf_socket.bind sock src;
+                    Cf_socket.listen sock e;
+                    let addr = Cf_socket.getsockname sock in
+                    let addr = S.P.AF.of_sockaddr addr in
+                    Iom_gadget.start (self#bound addr) self >>= fun () ->
+                    super#start
+                with
+                | x ->
+                    Iom_gadget.start (self#error x) self
             
-            method extend: 's. int -> ('s, unit) G.t = fun n ->
-                self#limit (limit_ + n)
-            
-            method finalize: 's. unit -> ('s, unit) G.t = fun () ->
-                finalized_ <- true;
-                ctrlTx#put G.C_unload >>= fun () ->
-                ctrlTx#put G.C_final
-            
-            method is_finished = finalized_
-        end
-    
-    type connection_t = S.t * S.address_t * S.address_t
-
-    type initiator_rx_t =
-        | I_connect of connection_t
-        | I_error of exn
-
-    type listener_tx_t =
-        | L_limit of int
-        | L_extend of int
-        | L_close
-    
-    type listener_rx_t =
-        | L_bind of S.address_t
-        | L_connect of connection_t
-        | L_error of exn
-    
-    type try_signaling_t =
-        | Try_sig_connect of connection_t
-        | Try_sig_error of exn
-        | Try_sig_pending
-    
-    let i_cons_connection_ ~s =
-        let laddr = s#getsockname in
-        let sock = s#socket in
-        let raddr = S.P.AF.of_sockaddr (Cf_socket.getpeername sock) in
-        Try_sig_connect (sock, laddr, raddr)
-
-    let i_try_connect_ ~s =
-        try
-            s#connect;
-            i_cons_connection_ ~s
-        with
-        | Unix.Unix_error (code, _, _) when is_blocked_connect_ code ->
-            let error = Unix.error_message code in
-            Try_sig_pending
-        | error ->
-            Try_sig_error error
-    
-    let i_try_connect_0_ ~s =
-        match i_try_connect_ ~s with
-        | Try_sig_error (Unix.Unix_error (Unix.EISCONN, _, _)) ->
-            (try i_cons_connection_ ~s with x -> Try_sig_error x)
-        | v ->
-            v
-    
-    class i_poll ~ic s =
-        let ic = (ic :> initiator_rx_t G.tx) in
-        let s = (s :> S.initiator) in
-        object
-            inherit [(unit, unit) G.t] Cf_poll.file `W s#fd
-                                    
-            method private service_ _ =
-                match i_try_connect_0_ ~s with
-                | Try_sig_connect connection ->
-                    `Final (ic#put (I_connect connection))
-                | Try_sig_error error ->
-                    `Final (ic#put (I_error error))
-                | Try_sig_pending ->
-                    state_
-        end
-    
-    let i_cancel_rx_ c () =
-        let c = (c :> G.control_t G.tx) in
-        c#put G.C_unload >>= fun () ->
-        c#put G.C_final
-
-    let i_cancel_ ~cancel ~c =
-        let cancel = (cancel :> unit G.rx) in
-        let c = (c :> G.control_t G.tx) in
-        G.start (c#put G.C_load) () >>= fun () ->
-        G.start (G.guard (cancel#get (i_cancel_rx_ c))) ()
-    
-    let initiator ~c ?cancel ?sock ?src addr k =
-        let ic = (c :> initiator_rx_t G.tx) in
-        try
-            let s = new S.initiator ?sock ?src addr in
-            Unix.set_nonblock s#fd;
-            match i_try_connect_ ~s with
-            | Try_sig_error error ->
-                ic#put (I_error error)
-            | Try_sig_connect connection ->
-                ic#put (I_connect connection)
-            | Try_sig_pending ->
-                let e = new i_poll ~ic s in
-                match cancel with
-                | None ->
-                    G.poll ~e k
-                | Some cancel ->
-                    G.simplex >>= fun (rx, tx) ->
-                    G.poll ~c:rx ~e k >>= fun () ->
-                    i_cancel_ ~cancel ~c:tx
-        with
-        | x ->
-            G.start (ic#put (I_error x)) ()
-        
-    let l_try_accept_ ~s =
-        let s = (s :> S.listener) in
-        try
-            let snew, raddr = s#accept in
-            let laddr = S.P.AF.of_sockaddr (Cf_socket.getsockname snew) in
-            Try_sig_connect (snew, laddr, raddr)
-        with
-        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
-            Try_sig_pending
-        | error ->
-            Try_sig_error error
-            
-    class l_poll ~lr ctrlTx s =
-        let lr = (lr :> listener_rx_t G.tx) in
-        let s = (s :> S.listener) in
-        object
-            inherit file_aux ctrlTx `R s
-                        
-            method private service_aux_ =
-                if limit_ <= 0 then
-                    Aux_unload
-                else
-                    match l_try_accept_ ~s with
-                    | Try_sig_pending ->
-                        Aux_load
-                    | Try_sig_error error ->
-                        limit_ <- 0;
-                        Aux_final (lr#put (L_error error))
-                    | Try_sig_connect connection ->
-                        limit_ <- pred limit_;
-                        let m = lr#put (L_connect connection) in
-                        if limit_ > 0 then Aux_working m else Aux_final m
+            initializer
+                if d < 1 then invalid_arg "Iom_sock_stream.listener: d < 1";
+                if e < d then invalid_arg "Iom_sock_stream.listener: e < d"
         end
         
-    let l_control_loop_ ~c ~rc =
-        let lc, _ = c in
-        let lc = (lc :> listener_tx_t G.rx) in
-        let rc = (rc :> G.control_t G.tx) in
-        let rec loop () = G.guard (lc#get cmd_rx)
-        and cmd_rx msg =
-            G.load >>= fun (socket, poll) ->
-            let socket = (socket :> S.listener) in
-            let poll = (poll :> l_poll) in
-            match msg with
-            | L_extend n ->
-                poll#extend n >>= loop
-            | L_limit n ->
-                poll#limit n >>= loop
-            | L_close ->
-                socket#close;
-                rc#put G.C_unload >>= fun () ->
-                rc#put G.C_final
-        in
-        fun s ->
-            G.start (loop ()) s
-        
-    let listener ~c ?sock ?qlen addr k =
-        let _, lr = c in
-        let lr = (lr :> listener_rx_t G.tx) in
-        try
-            let s = new S.listener ?sock addr in
-            begin
-                match qlen, sock with
-                | Some n, _ -> s#listen n
-                | None, None -> s#listen 1
-                | _, _ -> ()
-            end;
-            let address = s#getsockname in
-            Unix.set_nonblock s#fd;
-            G.simplex >>= fun (rcRx, rcTx) ->
-            let e = new l_poll ~lr rcTx s in
-            lr#put (L_bind address) >>= fun () ->
-            l_control_loop_ ~c ~rc:rcTx (s, e) >>= fun () ->
-            G.poll ~c:rcRx ~e k
-        with
-        | error ->
-            G.start (lr#put (L_error error)) ()
-    
-    
-    type e_try_data_t =
-        | Try_data_working of int
-        | Try_data_error of exn
-        | Try_data_pending
-    
-    let e_try_recv_ e buf pos len =
-        let e = (e :> S.endpoint) in
-        try
-            let n = e#recv buf pos len in
-            Try_data_working n
-        with
-        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
-            Try_data_pending
-        | error ->
-            Try_data_error error
-    
-    let rec e_msg_buf_pos_len_ = function
-        | _ :: _ :: _ as m ->
-            assert (not true);
-            e_msg_buf_pos_len_ (Cf_message.copy m)
-        | hd :: [] ->
-            hd
-        | [] ->
-            "", 0, 0
-    
-    let e_try_send_ ~s msg =
-        let s = (s :> S.endpoint) in
-        let buf, pos, len = e_msg_buf_pos_len_ msg in
-        try
-            Try_data_working (s#send buf pos len)
-        with
-        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
-            Try_data_pending
-        | error ->
-            Try_data_error error
-    
-    type e_signal_t =
-        | Ev_error of exn
-        | Ev_rcvrel
-        | Ev_txsync
-    
-    class e_rx_poll ctrlTx sigTx upTx e =
-        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
-        let sigTx = (sigTx :> e_signal_t G.tx) in
-        let upTx = (upTx :> endpoint_rx_t G.tx) in
-        object(self:'self)
-            inherit file_aux ctrlTx `R e as super
+    let connect ?sock ?src ~dst ?c k =
+        Iom_gadget.simplex >>= fun (errRx, errTx) ->
+        Iom_gadget.simplex >>= fun (upRx, upTx) ->
+        let i = new connector ?sock ?src ~dst ?c ~s:errTx ~tx:upTx k in
+        i#start >>= fun () ->
+        Cf_cmonad.return (errRx, upRx)
 
-            val mutable buffer_ = String.create 0
-            
-            method private service_aux_ =
-                if limit_ <= 0 then
-                    Aux_unload
-                else begin
-                    if String.length buffer_ < limit_ then
-                        buffer_ <- String.create limit_;
-                    match e_try_recv_ e buffer_ 0 limit_ with
-                    | Try_data_pending ->
-                        Aux_load
-                    | Try_data_error error ->
-                        Aux_final (sigTx#put (Ev_error error))
-                    | Try_data_working n ->
-                        if n = 0 then begin
-                            buffer_ <- "";
-                            limit_ <- 0;
-                            Aux_final (sigTx#put Ev_rcvrel)
-                        end
-                        else begin
-                            let msg = [ String.sub buffer_ 0 n, 0, n ] in
-                            let v = upTx#put (IO_rx_data msg) in
-                            if n < limit_ then begin
-                                limit_ <- limit_ - n;
-                                Aux_working v
-                            end
-                            else begin
-                                limit_ <- 0;
-                                Aux_final v
-                            end
-                        end
-                end
-        end
-    
-    type e_tx_sync_t = Tx_open | Tx_releasing | Tx_unlinking | Tx_finalized
-    
-    class e_tx_poll ctrlTx sigTx upTx e =
-        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
-        let sigTx = (sigTx :> e_signal_t G.tx) in
-        let upTx = (upTx :> endpoint_rx_t G.tx) in
-        object(self:'self)
-            inherit [(unit, unit) G.t] Cf_poll.file `W e#fd
-
-            val mutable queue_ = Cf_deque.nil
-            val mutable sync_ = Tx_open
-            
-            method private service_aux_: 's. 's service_aux_t =
-                let msg = Cf_deque.fold (fun a s -> s :: a) [] queue_ in
-                queue_ <- Cf_deque.nil;
-                let msg = Cf_message.flatten msg in
-                assert (msg <> []);
-                match e_try_send_ e msg with
-                | Try_data_pending ->
-                    queue_ <- Cf_deque.B.push (List.hd msg) queue_;
-                    Aux_load
-                | Try_data_error error ->
-                    Aux_final (sigTx#put (Ev_error error))
-                | Try_data_working n ->
-                    let msg = Cf_message.shift ~pos:n msg in
-                    let v = Cf_cmonad.return () in
-                    match msg with
-                    | [] ->
-                        Aux_final v
-                    | _ ->
-                        queue_ <- Cf_deque.B.push (List.hd msg) queue_;
-                        Aux_working v
-
-            method private service_ p =
-                match self#service_aux_ with
-                | Aux_working v ->
-                    `Working (p, v)
-                | Aux_final v ->
-                    begin
-                        match sync_ with
-                        | Tx_open ->
-                            `Final (upTx#put IO_rx_ready)
-                        | Tx_unlinking ->
-                            `Final (sigTx#put Ev_txsync)
-                        | _ ->
-                            assert (sync_ <> Tx_finalized);
-                            e#shutdown Unix.SHUTDOWN_SEND;
-                            `Final (sigTx#put Ev_txsync)
-                    end
-                | _ ->
-                    assert (not true);
-                    state_
-                        
-            method push: 's. Cf_message.t -> ('s, bool) G.t = fun m ->
-                if m <> [] || sync_ <> Tx_open then begin
-                    queue_ <-
-                        List.fold_left (fun q s -> Cf_deque.A.push s q)
-                        queue_ m;
-                    if state_ <> `Unloaded then
-                        Cf_cmonad.return true
-                    else begin
-                        match self#service_aux_ with
-                        | Aux_load ->
-                            ctrlTx#put G.C_load >>= fun () ->
-                            Cf_cmonad.return false
-                        | Aux_unload ->
-                            assert (not true);
-                            Cf_cmonad.return true
-                        | Aux_working v ->
-                            (* v >>= fun () -> *)
-                            ctrlTx#put G.C_load >>= fun () ->
-                            Cf_cmonad.return false
-                        | Aux_final v ->
-                            (* v >>= fun () -> *)
-                            Cf_cmonad.return true
-                    end
-                end
-                else
-                    Cf_cmonad.return true
-            
-            method finalize: 's. unit -> ('s, unit) G.t = fun () ->
-                assert (state_ = `Unloaded);
-                sync_ <- Tx_finalized;
-                ctrlTx#put G.C_unload >>= fun () ->
-                ctrlTx#put G.C_final
-            
-            method sync: 's. e_tx_sync_t -> ('s, bool) G.t = fun sync ->
-                if state_ <> `Unloaded then begin
-                    sync_ <- sync;
-                    Cf_cmonad.return false
-                end
-                else begin
-                    if sync = Tx_releasing then e#shutdown Unix.SHUTDOWN_SEND;
-                    self#finalize () >>= fun () ->
-                    Cf_cmonad.return true
-                end
-            
-            method sync_val = sync_
-        end
-    
-    let endpoint ~c s k =
-        let upRx, upTx = c in
-        let upRx = (upRx :> endpoint_tx_t G.rx) in
-        let upTx = (upTx :> endpoint_rx_t G.tx) in
-        let e = new S.endpoint s in
-        Unix.set_nonblock e#fd;
-        let fd: int = Obj.magic e#fd in
-        G.simplex >>= fun (sigRx, sigTx) ->
-        let sigRx = (sigRx :> e_signal_t G.rx) in
-        let sigTx = (sigTx :> e_signal_t G.tx) in
-        G.simplex >>= fun (ctrlRx, ctrlTx) ->
-        let ctrlRx = (ctrlRx :> G.control_t G.rx) in
-        let rxPoll = new e_rx_poll ctrlTx sigTx upTx e in
-        G.poll ~c:ctrlRx ~e:rxPoll k >>= fun () ->
-        G.simplex >>= fun (ctrlRx, ctrlTx) ->
-        let ctrlRx = (ctrlRx :> G.control_t G.rx) in
-        let txPoll = new e_tx_poll ctrlTx sigTx upTx e in
-        G.poll ~c:ctrlRx ~e:txPoll k >>= fun () ->
-        let rec loop () =
-            G.guard begin
-                upRx#get do_upRx_ >>= fun () ->
-                sigRx#get do_sigRx_
-            end
-        and do_upRx_ = function
-            | IO_tx_data msg ->
-                txPoll#push msg >>= fun idle ->
-                if idle then
-                    loop ()
-                else
-                    upTx#put IO_rx_blocked >>= loop
-            | IO_tx_release ->
-                txPoll#sync Tx_releasing >>= fun idle ->
-                if idle && rxPoll#is_finished then begin
-                    e#close;
-                    upTx#put IO_rx_closed
-                end
-                else
-                    loop ()
-            | IO_tx_extend n ->
-                rxPoll#extend n >>= loop
-            | IO_tx_window n ->
-                rxPoll#limit n >>= loop
-            | IO_tx_reset ->
-                rxPoll#finalize () >>= fun () ->
-                txPoll#finalize () >>= fun () ->
-                e#shutdown Unix.SHUTDOWN_ALL;
-                e#close;
-                Cf_cmonad.return ()
-            | IO_tx_unlink ->
-                rxPoll#finalize () >>= fun () ->
-                txPoll#sync Tx_unlinking >>= fun idle ->
-                if idle then begin
-                    txPoll#finalize () >>= fun () ->
-                    upTx#put IO_rx_unlinked
-                end
-                else
-                    loop ()
-        and do_sigRx_ = function
-            | Ev_error x ->
-                rxPoll#finalize () >>= fun () ->
-                txPoll#finalize () >>= fun () ->
-                e#shutdown Unix.SHUTDOWN_ALL;
-                e#close;
-                upTx#put (IO_rx_error x)
-            | Ev_rcvrel ->
-                rxPoll#finalize () >>= fun () ->
-                begin
-                    match txPoll#sync_val with
-                    | Tx_open
-                    | Tx_unlinking ->
-                        upTx#put IO_rx_release >>= loop
-                    | Tx_releasing ->
-                        loop ()
-                    | Tx_finalized ->
-                        e#close;
-                        upTx#put IO_rx_closed
-                end
-            | Ev_txsync ->
-                match txPoll#sync_val with
-                | Tx_unlinking ->
-                    rxPoll#finalize () >>= fun () ->
-                    upTx#put IO_rx_unlinked
-                | sync ->
-                    assert (sync <> Tx_open);
-                    loop ()
-        in
-        G.start (loop ()) ()
+    let listen ?sock ?src ~e ?d ?c k =
+        Iom_gadget.simplex >>= fun (sigRx, sigTx) ->
+        Iom_gadget.simplex >>= fun (upRx, upTx) ->
+        let s = new listener ?sock ?src ~e ?d ?c ~s:sigTx ~tx:upTx k in
+        s#start >>= fun () ->
+        Cf_cmonad.return (sigRx, upRx)
 end
 
 (*--- End of File [ iom_sock_stream.ml ] ---*)

iom/iom_sock_stream.mli

 (*---------------------------------------------------------------------------*
-  INTERFACE  iom_sock_stream.mli
+  INTERFACE  iom_x_sock_stream.mli
 
-  Copyright (c) 2003-2005, James H. Woodyatt
+  Copyright (c) 2005, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-(** Reactor for SOCK_STREAM sockets. *)
+(** Reactors for reading/writing on sockets of the [`SOCK_STREAM] type. *)
 
 (** {6 Overview}
 
-    This module implements a functorial interface for constructing reactors
-    for I/O on [SOCK_STREAM] sockets.  The {!Iom_tcp4_socket} and
-    {!Iom_tcp6_socket} modules are specializations for TCP sockets (IPv4 and
-    IPv6, respectively).
-    
-    A {!Iom_sock_stream.T} module defines three reactor functions:
-    - [initiator]: create an endpoint by connecting to a remote listener;
-    - [listener]: create endpoints by listening for remote connectors;
-    - [endpoint]: input, output and shutdown on established endpoints.
-    
-    The types of events used with the [initiator] and [listener] reactors are
-    specialized for each type of socket.  The types of events used with the
-    [endpoint] reactor are common to all [SOCK_STREAM] reactors.
+    This module implements classes and convenience functions for asynchronously
+    connecting, actively and/or passively, with connection-oriented sockets,
+    e.g. TCP and AF_LOCAL stream sockets.
 *)
 
-(** {6 Common Types}
-
-    The [endpoint_tx_t] and [endpoint_rx_t] types are defined in common for
-    use with all [endpoint] reactors.
-*)
-
-(** The type of events for sending to [endpoint] reactors. *)
-type endpoint_tx_t =
-    | IO_tx_data of Cf_message.t	(** send data to peer *)
-    | IO_tx_release                 (** initiate orderly release *)
-    | IO_tx_extend of int           (** extend application receive window *)
-    | IO_tx_window of int           (** set application receive window *)
-    | IO_tx_reset                   (** abortive close *)
-    | IO_tx_unlink                  (** flush output and stop input *)
-
-(** The type of events for receiving from [endpoint] reactors. *)
-type endpoint_rx_t =
-    | IO_rx_error of exn            (** socket closed on error condition *)
-    | IO_rx_data of Cf_message.t	(** data received from peer *)
-    | IO_rx_release                 (** peer initiated orderly release *)
-    | IO_rx_blocked                 (** transmit flow control *)
-    | IO_rx_ready                   (** ready for transmit *)
-    | IO_rx_closed                  (** orderly close complete *)
-    | IO_rx_unlinked                (** output flushed and input stopped *)
-
-(** {6 Functorial Interface} *)
-
-(** The signature of modules produced by the [Create] functor below.  Modules
-    of this type define the I/O reactors for a specialization of [SOCK_STREAM]
-    sockets.
-*)
+(** The signature of modules produced by the {!Create} functor (see below). *)
 module type T = sig
-
-    (** The specific socket module used as input to the functor. *)
+    (** The signature of the module defining the type and functions of the
+        particular variety of [`SOCK_STREAM] sockets used to construct this
+        reactor module.
+    *)
     module S: Cf_sock_stream.T
     
-    (** The type of connection establishment events is a triple composed of the
-        following three values:
-        - A socket object
-        - The local address of the endpoint
-        - The remote address of the endpoint
+    (** A subclass of {!Iom_file.event} for indicating a successful connection
+        is established.
     *)
-    type connection_t = S.t * S.address_t * S.address_t
+    class connection:
+        Iom_file.more_t -> S.t ->
+        object
+            (** Subclass of core event class. *)
+            inherit Iom_file.event
+            
+            (** The socket for the newly established connection. *)
+            method socket: S.t
+        end
+    
+    (** Use [new connector ?sock ?src ~dst ?c ~s ~tx k] to create a state
+        machine that asynchronously attempts to establish a connection within
+        the context of the kernel [k], optionally using the socket [?sock]
+        bound to the source address [?src], to the destination address [dst].
+        Control and notification events for the reactor are sent and received
+        on the [?c] and [s] wires.  If a connection is successfully
+        established, it will be sent on the [tx] wire.
+        
+        The [start] method attempts initiates the connection before starting
+        the reactor.  The [service] method attempts to complete the connection.
+    *)
+    class connector:
+        ?sock:S.t -> ?src:S.address_t -> dst:S.address_t ->
+        ?c:Iom_file.control_t #Iom_gadget.rx -> 
+        s:Iom_file.signal_t #Iom_gadget.tx -> tx:connection #Iom_gadget.tx ->
+        Iom_gadget.kernel_t ->
+        object('self)
+            (** Subclass from the socket core reactor class. *)
+            inherit [Iom_file.control_t, Iom_file.signal_t, 'state]
+                Iom_socket.core
+            
+            (** The [service] method required by the {!Cf_poll.file} class. *)
+            method private service: Cf_poll.t -> 'state
+        end
 
-    (** The type of events to receive from the [initiator] reactor. *)
-    type initiator_rx_t =
-        | I_connect of connection_t     (** Connection established. *)
-        | I_error of exn                (** Error trying to connect. *)
+    (** Notifications from the listener reactor. *)
+    type listener_signal_t = [
+        | Iom_file.signal_t         (** Ordinary signal events. *)
+        | `Bound of S.address_t     (** The source address of the listener. *)
+    ]
+
+    (** Use [new listener ?sock ?src ~e ?d ?c ~s ~tx k] to create a state
+        machine that listens within the context of the kernel [k] and
+        asynchronously accepts connections.  The listener is bound in the
+        [start] method to an arbitrarily chosen transport address unless the
+        source address is explicitly specified with [?src].  A new socket is
+        created unless it is explicitly provided with [?sock].  The listener
+        backlog is set to [e].  The maximum number of connections that will be
+        drained from the listener at each ready event is [d] (the default is to
+        drain the entire backlog).  Control and notification events are sent
+        and received on the [?c] and [s] wires.  Accepted connections are sent
+        on the [tx] wire.
+    *)
+    class listener:
+        ?sock:S.t -> ?src:S.address_t -> e:int -> ?d:int ->
+        ?c:Iom_file.reader_control_t #Iom_gadget.rx ->
+        s:listener_signal_t #Iom_gadget.tx ->
+        tx:connection #Iom_gadget.tx -> Iom_gadget.kernel_t ->
+        object('self)
+            (** Subclass from the socket core reactor class. *)
+            inherit [Iom_file.reader_control_t, listener_signal_t, 'state]
+                Iom_socket.core
+
+            (** This method is invoked in the [start] method to send [`Bound
+                addr] on the wire [s] when the listener socket is successfully
+                bound to a source address.
+            *)
+            method private bound: S.address_t -> ('self, unit) Iom_gadget.t
+            
+            (** A recursive method invoked in the [service] method to drain the
+                listener socket of pending connections and return the
+                appropriate state to the {!Cf_poll.event} base class.
+            *)
+            method private pending:
+                Cf_poll.t -> connection list -> int -> 'state
+            
+            (** The [service] method required by the {!Cf_poll.file} class. *)
+            method private service: Cf_poll.t -> 'state
+        end
     
-    (** Use [initiator ~c ?cancel ?sock ?src addr k] to start a reactor that
-        will attempt to connect to the address [addr] with the kernel [k] and
-        send its results on the control transmitter [c].  If the [?cancel]
-        receiver is specified, then the connection can be aborted by sending a
-        unit value on the corresponding transmitter.  The reactor uses the
-        socket [?sock] if provided, otherwise it constructs a new socket
-        implicitly.  The socket will be bound to the address [?src] if
-        provided, otherwise the socket is bound to the unspecified address.
-        
-        The reactor terminates after sending an event or after receiving a
-        cancel event.  The socket is closed on termination of the reactor if
-        the socket was implicitly created when the reactors was started and a
-        connection was not successfully established.
+    (** Use [connect ?sock ?src ~dst ?c k >>= fun (sigRx, connRx) -> ...] to
+        attempt a connection in the context of the kernel [k] to the
+        destination address [dst].  The [?sock] and [?src] parameters can be
+        used to specify a specific socket to use and source address to bind.
+        Notifications should be received on the [sigRx] wire.  On a successful
+        connection, the event will be received on the [connRx] wire.
     *)
-    val initiator:
-        c:initiator_rx_t #Iom_gadget.tx -> ?cancel:unit #Iom_gadget.rx ->
-        ?sock:S.t -> ?src:S.address_t -> S.address_t -> Iom_gadget.kernel_t ->
-        ('s, unit) Iom_gadget.t
-    
-    (** The type of events to send to the [listener] reactor. *)
-    type listener_tx_t =
-        | L_limit of int    (** Set the maximum of accepted connections. *)
-        | L_extend of int   (** Extend the maximum of accepted connections. *)
-        | L_close           (** Close the listener socket. *)
-    
-    (** The type of events to receive from the [listener] reactor. *)
-    type listener_rx_t =
-        | L_bind of S.address_t         (** Notification of local address. *)
-        | L_connect of connection_t     (** Connection established. *)
-        | L_error of exn                (** Error listening for connections. *)
-    
-    (** Use [listener ~c ?sock ?qlen addr k] to start a reactor that binds to
-        the local address [addr] and uses the kernel [k] to listen for and
-        accept new connections from remote initiators.  The duplex [c] is a
-        coupling of a control receiver and a control transmitter.  The reactor
-        uses the socket [?sock] if provided, otherwise it constructs a new
-        socket implicitly.  The backlog is optionally specified with the
-        [?qlen] parameter.
-        
-        The reactor closes the socket and terminates after receiving the
-        [L_close] event or after transmitting the [L_error] event.  An [L_bind]
-        event is always (and only) the first event transmitted by the reactor.
-    *)
-    val listener:
-        c:(listener_tx_t #Iom_gadget.rx * listener_rx_t #Iom_gadget.tx) ->
-        ?sock:S.t -> ?qlen:int -> S.address_t -> Iom_gadget.kernel_t ->
-        ('s, unit) Iom_gadget.t
-    
-    (** Use [endpoint ~c sock k] to start a reactor that performs raw I/O on
-        the socket [sock] with the kernel [k], communicating on the control
-        duplex [c] with its controller.
-        
-        The reactor terminates after sending an [IO_rx_error], [IO_rx_closed]
-        or [IO_rx_unlinked] event, or after receiving an [IO_tx_reset] event.
-        The socket is closed with the reactors terminates, except when the
-        closing event is [IO_rx_unlinked].
-    *)
-    val endpoint:
-        c:(endpoint_tx_t #Iom_gadget.rx * endpoint_rx_t #Iom_gadget.tx) -> 
-        S.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
+    val connect:
+        ?sock:S.t -> ?src:S.address_t -> dst:S.address_t ->
+        ?c:Iom_file.control_t #Iom_gadget.rx -> Iom_gadget.kernel_t ->
+        ('s, Iom_file.signal_t Iom_gadget.rx * connection Iom_gadget.rx)
+            Iom_gadget.t
+
+    (** Use [listen ~e ?d ?sock ?src *)
+    val listen:
+        ?sock:S.t -> ?src:S.address_t -> e:int -> ?d:int -> 
+        ?c:Iom_file.reader_control_t #Iom_gadget.rx -> Iom_gadget.kernel_t ->
+        ('s, listener_signal_t Iom_gadget.rx * connection Iom_gadget.rx)
+            Iom_gadget.t
 end
 
-(** The functor that produces a module of type [T]. *)
+(** Use [Create(S)] to specialize a socket reactor for a specific type of
+    [`SOCK_STREAM] module [S].
+*)
 module Create(S: Cf_sock_stream.T): T with module S = S
 
-(*--- End of File [ iom_sock_stream.mli ] ---*)
+(*--- End of File [ iom_x_sock_stream.mli ] ---*)

iom/iom_tcp4_socket.ml

 (*---------------------------------------------------------------------------*
   IMPLEMENTATION  iom_tcp4_socket.ml
 
-  Copyright (c) 2003-2005, James H. Woodyatt
+  Copyright (c) 2005, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without

iom/iom_tcp4_socket.mli

 (*---------------------------------------------------------------------------*
   INTERFACE  iom_tcp4_socket.mli
 
-  Copyright (c) 2003-2005, James H. Woodyatt
+  Copyright (c) 2005, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
 end
 *)
 
-module T1 = struct
+module F1(R: Iom_sock_stream.T) = struct
     module G = Iom_gadget
     open Cf_cmonad.Op
 
-    module SR = Iom_sock_stream
-    module TCP_s = Cf_tcp4_socket
-    module TCP_r = Iom_tcp4_socket
-    module INADDR = Cf_ip4_addr
-    let any_address = Cf_ip4_addr.any
-    
-    (*
-    let xtag = "[T1.test] "
-    
-    let xprintf fmt =
-        print_string xtag;
-        Printf.printf fmt
-    
-    let xsprintf (fmt : ('a, unit, string) format) =
-        Printf.sprintf (Obj.magic (xtag ^ (Obj.magic fmt)))
-    *)
-    
     type top_event_t = [
         | `Error of exn
-        | `Listen of TCP_s.address_t
-        | `Connect of TCP_s.t
+        | `Listen of R.S.address_t
+        | `Connect of R.S.t
         | `Receive
     ]
         
     let initiator_ ~top addr k =
         let top = (top :> top_event_t G.tx) in
-        G.simplex >>= fun (rx, tx) ->
-        TCP_r.initiator ~c:tx addr k >>= fun () ->
-        let rx = (rx :> TCP_r.initiator_rx_t G.rx) in
+        R.connect ~dst:addr k >>= fun (sigRx, eventRx) ->
+        let sigRx = (sigRx :> Iom_file.signal_t G.rx) in
+        let eventRx = (eventRx :> R.connection G.rx) in
         let loop () =
             G.guard begin
-                rx#get begin function
-                | TCP_r.I_connect (socket, _, _) -> top#put (`Connect socket)
-                | TCP_r.I_error error -> top#put (`Error error)
+                eventRx#get begin fun c ->
+                    top#put (`Connect c#socket)
+                end >>= fun () ->
+                sigRx#get begin fun (`Error x) ->
+                    top#put (`Error x)
                 end
             end
         in
     
     let listener_ ~top k =
         let top = (top :> top_event_t G.tx) in
-        let address = (any_address, 0 :> TCP_s.address_t) in
-        G.duplex >>= fun ((lr, lc), c) ->
-        TCP_r.listener ~c address k >>= fun () ->
-        let lr = (lr :> TCP_r.listener_rx_t G.rx) in
-        let lc = (lc :> TCP_r.listener_tx_t G.tx) in
+        G.simplex >>= fun (ctrlRx, ctrlTx) ->
+        R.listen ~c:ctrlRx ~e:1 k >>= fun (listenRx, eventRx) ->
+        let listenRx = (listenRx :> R.listener_signal_t G.rx) in
+        let eventRx = (eventRx :> R.connection G.rx) in
+        let ctrlTx = (ctrlTx :> Iom_file.reader_control_t G.tx) in
+        ctrlTx#put `Ready >>= fun () ->
         let rec loop () =
             G.guard begin
-                lr#get begin function
-                | TCP_r.L_error error ->
-                    top#put (`Error error)
-                | TCP_r.L_connect (socket, _, _) ->
-                    lc#put TCP_r.L_close >>= fun () ->
-                    top#put (`Connect socket)
-                | TCP_r.L_bind address ->
-                    lc#put (TCP_r.L_limit 1) >>= fun () ->
-                    top#put (`Listen address) >>= loop
-                end
+                eventRx#get do_eventRx_ >>= fun () ->
+                listenRx#get do_listenRx_
             end
+        and do_eventRx_ c =
+            top#put (`Connect c#socket) >>= fun () ->
+            ctrlTx#put `Close
+        and do_listenRx_ = function
+            | `Error x ->
+                top#put (`Error x)
+            | `Bound addr ->
+                top#put (`Listen addr) >>= loop
         in
         G.start (loop ()) ()
     
-    let exchange_ ~top msg socket k =
+    let exchange_ ~top msg sock k =
         let top = (top :> top_event_t G.tx) in
-        G.duplex >>= fun ((er, ec), c) ->
-        TCP_r.endpoint ~c socket k >>= fun () ->
-        let ec = (ec :> SR.endpoint_tx_t G.tx) in
         let rxmsg, txmsg = msg in
-        let initM =
-            ec#put (SR.IO_tx_extend (succ (String.length rxmsg))) >>= fun () ->
-            ec#put (SR.IO_tx_data (Cf_message.create txmsg))
+        Iom_socket.endpoint ~e:(succ (String.length rxmsg)) sock k >>= fun a ->
+        let (wCtrlRx, rCtrlTx), (inRx, outTx) = a in
+        let wCtrlRx = (wCtrlRx :> Iom_file.writer_signal_t G.rx) in
+        let rCtrlTx = (rCtrlTx :> Iom_file.reader_control_t G.tx) in
+        let inRx = (inRx :> Iom_file.octets G.rx) in
+        let outTx = (outTx :> Iom_file.octets G.tx) in
+        let txm = Cf_message.create txmsg in
+        rCtrlTx#put `Ready >>= fun () ->
+        outTx#put (new Iom_file.octets Iom_file.More txm) >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                inRx#get do_inRx_ >>= fun () ->
+                wCtrlRx#get do_wCtrlRx_
+            end
+        and do_inRx_ rxe =
+            let str = Cf_message.contents rxe#octets in
+            match rxe#more with
+            | Iom_file.More ->
+                if str <> rxmsg then failwith "Unexpected message received.";
+                outTx#put (new Iom_file.octets Iom_file.Last []) >>= loop
+            | Iom_file.Last ->
+                top#put `Receive
+        and do_wCtrlRx_ v =
+            match v with
+            | `Error x ->
+                top#put (`Error x)
+            | `Ready
+            | `Wait ->
+                loop ()
+            | `Final ->
+                assert (not true);
+                Cf_cmonad.return ()
         in
-        G.start initM () >>= fun () ->
-        let er = (er :> SR.endpoint_rx_t G.rx) in
-        let rec loop () = G.guard (er#get rx_endpoint)
-        and rx_endpoint = function
-            | SR.IO_rx_data m ->
-                let str = Cf_message.contents m in
-                if str <> rxmsg then
-                    failwith "Unexpected message received."
-                else
-                    ec#put SR.IO_tx_release >>= loop
-            | SR.IO_rx_error error ->
-                top#put (`Error error)
-            | SR.IO_rx_closed ->
-                top#put `Receive
-            | _ ->
-                loop ()
-        in
-        G.start (loop ()) 0
+        G.start (loop ()) ()
     
     type state_t =
         | S_init
         listener_ ~top:topTx k >>= fun () ->
         let rec loop () = G.guard begin
             topRx#get begin function
-            | `Error error ->
-                raise error
+            | `Error x ->
+                raise x
             | `Listen addr ->
                 initiator_ ~top:topTx addr k >>= loop
             | `Connect socket ->
             raise x
 end
 
-module T2 = struct
-    module G = Iom_gadget
-    open Cf_cmonad.Op
-
-    module SR = Iom_sock_stream
-    module TCP_s = Cf_tcp6_socket
-    module TCP_r = Iom_tcp6_socket
-    module INADDR = Cf_ip6_addr
-    let any_address = Cf_ip6_addr.unspecified
-    
-    (*
-    let xtag = "[T2.test] "
-    
-    let xprintf fmt =
-        print_string xtag;
-        Printf.printf fmt
-    
-    let xsprintf (fmt : ('a, unit, string) format) =
-        Printf.sprintf (Obj.magic (xtag ^ (Obj.magic fmt)))
-    *)
-    
-    type top_event_t = [
-        | `Error of exn
-        | `Listen of TCP_s.address_t
-        | `Connect of TCP_s.t
-        | `Receive
-    ]
-        
-    let initiator_ ~top addr k =
-        let top = (top :> top_event_t G.tx) in
-        G.simplex >>= fun (rx, tx) ->
-        TCP_r.initiator ~c:tx addr k >>= fun () ->
-        let rx = (rx :> TCP_r.initiator_rx_t G.rx) in
-        let loop () = G.guard begin
-            rx#get begin function
-            | TCP_r.I_connect (socket, _, _) -> top#put (`Connect socket)
-            | TCP_r.I_error error -> top#put (`Error error)
-            end
-        end in
-        G.start (loop ()) ()
-    
-    let listener_ ~top k =
-        let top = (top :> top_event_t G.tx) in
-        let address = (any_address, 0, 0l :> TCP_s.address_t) in
-        G.duplex >>= fun ((lr, lc), c) ->
-        TCP_r.listener ~c address k >>= fun () ->
-        let lr = (lr :> TCP_r.listener_rx_t G.rx) in
-        let lc = (lc :> TCP_r.listener_tx_t G.tx) in
-        let rec loop () = G.guard begin
-            lr#get begin function
-            | TCP_r.L_error error ->
-                top#put (`Error error)
-            | TCP_r.L_connect (socket, _, _) ->
-                lc#put TCP_r.L_close >>= fun () ->
-                top#put (`Connect socket)
-            | TCP_r.L_bind address ->
-                lc#put (TCP_r.L_limit 1) >>= fun () ->
-                top#put (`Listen address) >>= loop
-            end
-        end in
-        G.start (loop ()) ()
-    
-    let exchange_ ~top msg socket k =
-        let top = (top :> top_event_t G.tx) in
-        G.duplex >>= fun ((er, ec), c) ->
-        TCP_r.endpoint ~c socket k >>= fun () ->
-        let ec = (ec :> SR.endpoint_tx_t G.tx) in
-        let rxmsg, txmsg = msg in
-        let initM =
-            ec#put (SR.IO_tx_extend (succ (String.length rxmsg))) >>= fun () ->
-            ec#put (SR.IO_tx_data (Cf_message.create txmsg))
-        in
-        G.start initM () >>= fun () ->
-        let er = (er :> SR.endpoint_rx_t G.rx) in
-        let rec loop () = G.guard (er#get rx_endpoint)
-        and rx_endpoint = function
-            | SR.IO_rx_data m ->
-                let str = Cf_message.contents m in
-                if str <> rxmsg then
-                    failwith "Unexpected message received."
-                else
-                    ec#put SR.IO_tx_release >>= loop
-            | SR.IO_rx_error error ->
-                top#put (`Error error)
-            | SR.IO_rx_closed ->
-                top#put `Receive
-            | _ ->
-                loop ()
-        in
-        G.start (loop ()) 0
-    
-    type state_t =
-        | S_init
-        | S_connecting
-        | S_receiving of int
-        | S_done
-    
-    let okay = ref false
-    
-    let reactor_ k =
-        G.wire >>= fun top ->
-        let topRx = new G.rx top in
-        let topTx = new G.tx top in
-        listener_ ~top:topTx k >>= fun () ->
-        let rec loop () = G.guard begin
-            topRx#get begin function
-            | `Error error ->
-                raise error
-            | `Listen addr ->
-                initiator_ ~top:topTx addr k >>= loop
-            | `Connect socket ->
-                G.load >>= begin function
-                | S_init ->
-                    G.store S_connecting >>= fun () ->
-                    exchange_ ~top:topTx ("foo", "bar") socket k >>= loop
-                | S_connecting ->
-                    G.store (S_receiving 2) >>= fun () ->
-                    exchange_ ~top:topTx ("bar", "foo") socket k >>= loop
-                | _ ->
-                    failwith "toploop_: `Connect state error"
-                end
-            | `Receive ->
-                G.load >>= begin function
-                | S_receiving n when n > 0 ->
-                    let n = pred n in
-                    let s =
-                        if n > 0 then
-                            S_receiving n
-                        else begin
-                            okay := true;
-                            S_done
-                        end
-                    in
-                    G.store s >>= loop
-                | _ ->
-                    failwith "toploop_: `Receive state error"
-                end
-            end
-        end in
-        G.start (loop ()) S_init
-    
-    let test () =
-        try
-            G.run reactor_ ();
-            if not !okay then failwith "reactor: processing completed early."
-        with
-        | Unix.Unix_error (error, fname, arg) ->
-            let msg =
-                let error = Unix.error_message error in
-                Printf.sprintf "Unix error \"%s\" in %s(%s).\n" error fname arg
-            in
-            failwith msg
-        | x ->
-            raise x
-end
+module T1 = F1(Iom_tcp4_socket)
+module T2 = F1(Iom_tcp6_socket)
 
 let main () =
     let tests = [
     open Cf_cmonad.Op
 
     module G = Iom_gadget
-    module SR = Iom_sock_stream
     module TCP = Iom_tcp6_socket
     module IP6 = Cf_ip6_addr
     module IP4 = Cf_ip6_addr
         Printf.sprintf "%s%s/%u" hostname ifname port
 
     let interrupt_ k =
-        G.simplex >>= fun (sigRx, sigTx) ->
-        let sigTx = (sigTx :> [ `Signal of int ] G.tx) in
         G.simplex >>= fun (ctrlRx, ctrlTx) ->