Commits

jhwoodyatt  committed d673174

Updated with documentation in preparation for 0.3 release.

  • Participants
  • Parent commits 11ae637
  • Tags iom-0_3

Comments (0)

Files changed (16)

 all new again and coded from scratch-- this time with more extensive, but still
 far from exhaustive, testing and regression.
 
+Previous release were rough drafts, and this new one is every bit as much of a
+rough draft as those were, but I feel more confident than before that no major
+overhauls of this design will be required.  Minor design flaws may require
+corrections, and bugs are very likely to be exposed over time with usage, but
+future changes in the 0.xxx version sequence should be incremental, rather than
+total.
+
+All along, I have been aiming for a framework that appears to be as adaptable
+as the venerable STREAMS framework from Unix SVR4.  With this release, I feel
+like I have finally succeeded in that effort.  Only time will tell, however, if
+I am right.
+
 Highlights of the new design:
 
 + Uses new stateless [Cf_gadget] module (from Cf-0.8) with state classes
 + Generalized support for simplex and duplex octet stream processsing layers,
   e.g. framing discipline, packet multiplexing, etc.
 
+See t/t-relay.ml for an undocument example of how to use the new Iom modules.
+
 ===== Version 0.2 =====
 
 This release is a major refactoring of almost all of the interfaces and
 Open issues in development:
 
++ (unspecified): A GODI package would be nice to promote community adoption.
+
 + (unspecified): Support needed for more types of sockets.  Right now, we only
     have support for [AF_INET] and [AF_INET6] address families, and the
     [SOCK_STREAM] type, i.e. TCP for IPv4 and IPv6.  Need support for UDP and
     for [AF_LOCAL] address families.
 
-+ (Iom_reactor_fork): A [fork/wait] reactor would be cool.  While we're
-    dreaming, it would also be nice to have pipes and socketpairs, complete
-    with file descriptor passing.  Oh, and [popen] is quite desirable.
++ (unspecified): A [fork/wait] reactor would be cool.  While we're dreaming, it
+    would also be nice to socketpairs, complete with file descriptor passing.
+    Oh, and [popen] is quite desirable.
 
-+ (t.mirrord): We don't seem to be handling SIGPIPE properly, and we're not
-    sure why we are even getting an EPIPE error condition.
-
-+ (t.mirrorc): Stress testing!
++ (unspecified): It might be helpful to define a core abstraction for switching
+    gadgets, e.g. for multiplexing streams, etc.  More research is in order.
 
 # End of open issues
 
 This library requires the following external components:
 
-- Objective Caml (v3.08.3 or newer)
+- Objective Caml (v3.09 or newer)
 - Findlib (tested with v1.0.4)
-- OCaml NAE Core Foundation (cf-0.7)
+- OCaml NAE Core Foundation (cf-0.8)
 
-Principal development was on Mac OS X 10.3 and Mac OS X 10.4 w/ XCode 2.1 using
-GCC 4.0.  The final version of this library also compiled successfully and
-passed all self-tests without warnings on Suse Linux 9.2 for x86-32.  Other
-platforms with POSIX-like environments should require only a minimal porting
-effort.
+Principal development was on Mac OS X 10.4 w/ XCode 2.3 using GCC 4.0.  No
+other platforms were tested for interoperability.
 
 ===== Building =====
 

File iom/iom_gadget.mli

         method virtual private guard: unit guard
 
         (** Use [obj#next] to transition the state of the machine by applying
-            {!Cf_state_gadget.guard} [self#guard].
+            {!Cf_gadget.guard} [self#guard].
         *)
         method next: 'a. 'a t
     end

File iom/iom_layer.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Stream commutation layers.  *)
+
+(** {6 Overview}
+
+    Classes and functions for commutating I/O event stream gadgets are defined.
+    Both simplex (input and output) and duplex streams are supported.  In
+    addition, convenience functions are provided for creating commutationg
+    layer gadgets from initial state objects.
+*)
+
+(** {6 State Classes} *)
+
+(** The class type describing the common features of both input and output
+    simplex stream states.
+*)
 class type ['xy, 'c, 'n] simplex = object('self)
     inherit ['c, 'n] Iom_stream.notify
     inherit ['c, 'n] Iom_stream.control
     inherit Iom_gadget.start
     constraint 'xy = ('x, 'y) #Iom_stream.transform
     
-    val transform_: 'xy
+    val transform_: 'xy (** A delegated stream transform object. *)
 
+    (** A delegator for the [block] method on the transform object. *)
     method private block: unit Iom_gadget.t
+
+    (** A delegator for the [unblock] method on the transform object. *)
     method private unblock: unit Iom_gadget.t
+
+    (** A delegator for the [ready] method on the transform object. *)
     method private ready: unit Iom_gadget.t
+
+    (** A delegator for the [wait] method on the transform object. *)
     method private wait: unit Iom_gadget.t
 end
 
+(** The class of stream gadget state for input stream commutators.  Control
+    events are constrained to include flow control signals.  Use [inherit
+    \['xy, 'c, 'n\] isimplex xy jack plug] to derive a state class object that
+    delegates stream event flow control to the transform [xy], receives
+    upstream control and sends upstream notifications on [plug], and also sends
+    downstream control and receives downstream notifications on [jack].
+*)
 class ['xy, 'c, 'n] isimplex:
     'xy -> ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
     ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
-    object
-        inherit ['xy, 'c, 'n] simplex
-        constraint 'xy = ('x, 'y) #Iom_stream.transform
-        constraint 'c = [> Iom_stream.flowcontrol ]
-    end
+    ['xy, [> Iom_stream.flowcontrol ] as 'c, 'n] simplex
 
+(** The class of stream gadget state for output stream commutators.
+    Notification events are constrained to include flow control signals.  Use
+    [inherit \['xy, 'c, 'n\] osimplex xy jack plug] to derive a state class
+    object that delegates stream event flow control to the transform [xy],
+    receives upstream control and sends upstream notifications on [plug], and
+    also sends downstream control and receives downstream notifications on
+    [jack].
+*)
 class ['xy, 'c, 'n] osimplex:
     'xy -> ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
     ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
-    object
-        inherit ['xy, 'c, 'n] simplex
-        constraint 'xy = ('x, 'y) #Iom_stream.transform
-        constraint 'n = [> Iom_stream.flownotify ]
-    end
+    ['xy, 'c, [> Iom_stream.flownotify ] as 'n] simplex
 
+(** The class of stream gadget state for full duplex stream commutators.
+    Both control and notification events are constrained to include flow
+    control signals.  Use [inherit \['ixy, 'oxy, 'c, 'n\] duplex ixy oxy jack
+    plug] to derive a state class object that delegates stream event flow
+    control to the input transform [ixy] and the output transform [oxy],
+    receives upstream control and sends upstream notifications on [plug], and
+    also sends downstream control and receives downstream notifications on
+    [jack].
+*)
 class ['ixy, 'oyx, 'c, 'n] duplex:
     'ixy -> 'oyx ->
     ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
         constraint 'ixy = ('ix, 'iy) #Iom_stream.transform
         constraint 'oyx = ('oy, 'ox) #Iom_stream.transform
         
+        (** A delegated input stream transform object. *)
         val itransform_: 'ixy option
+
+        (** A delegated output stream transform object. *)
         val otransform_: 'oyx option
 
+        (** A delegator for [iblock] on the input transform object. *)
         method private iblock: unit Iom_gadget.t
+
+        (** A delegator for [iunblock] on the input transform object. *)
         method private iunblock: unit Iom_gadget.t
+
+        (** A delegator for [iready] on the input transform object. *)
         method private iready: unit Iom_gadget.t
+
+        (** A delegator for [iwait] on the input transform object. *)
         method private iwait: unit Iom_gadget.t
 
+        (** A delegator for [oblock] on the output transform object. *)
         method private oblock: unit Iom_gadget.t
+
+        (** A delegator for [ounblock] on the output transform object. *)
         method private ounblock: unit Iom_gadget.t
+
+        (** A delegator for [oready] on the output transform object. *)
         method private oready: unit Iom_gadget.t
+
+        (** A delegator for [owait] on the output transform object. *)
         method private owait: unit Iom_gadget.t
     end
 
+(** Use [ingest jack cons] to contruct a harness of wires with a new plug and
+    jack for input streams on its respective ends, apply the appropriate wire
+    ends (a control event transmitter, an input receiver and an output
+    transmitter) to [cons], obtain a stream transform object, and use it to
+    construct an [isimplex] object and start the input stream gadget.  The new
+    jack is returned.
+*)
 val ingest:
   ('x, [> Iom_stream.flowcontrol ] as 'c, [> Iom_stream.failed ] as 'n)
     Iom_stream.ijack ->
     ('x, 'y) #Iom_stream.transform) ->
   ('y, 'c, 'n) Iom_stream.ijack Iom_gadget.t
 
+(** Use [render jack cons] to contruct a harness of wires with a new plug and
+    jack for output streams on its respective ends, apply the appropriate wire
+    ends (a notification event transmitter, an input receiver and an output
+    transmitter) to [cons], obtain a stream transform object, and use it to
+    construct an [osimplex] object and start the output stream gadget.  The new
+    jack is returned.
+*)
 val render:
   ('y, [> Iom_stream.stop ] as 'c, [> Iom_stream.flownotify ] as 'n)
     Iom_stream.ojack ->
     ('x, 'y) #Iom_stream.transform) ->
   ('x, 'c, 'n) Iom_stream.ojack Iom_gadget.t
 
+(** Use [duplex jack icons ocons] to contruct a harness of wires with a new
+    plug and jack for output streams on its respective ends, apply the
+    appropriate wire ends to [icons] and [ocons], obtain a pair of stream
+    transform objects, and use them to construct an [duplex] object and start
+    the output stream gadget.  The new jack is returned.
+*)
 val duplex:
     ('xi, 'xo, [> Iom_stream.flowcontrol ] as 'c,
         [> Iom_stream.flownotify ] as 'n) Iom_stream.iojack ->

File iom/iom_octet_stream.ml

             >}
         
         method shift =
-            let s, len = self#emit in
-            self#shift_octets len >>= fun (consume, buffer, mark) ->
-            let obj =
-                match more_ with
-                | Iom_stream.More ->
-                    Some {<
-                        consume_ = consume; buffer_ = buffer; mark_ = mark
-                    >}
-                | Iom_stream.Last ->
-                    None
-            in
-            Cf_cmonad.return (Cf_exnopt.U (s, obj))
+            match
+                try Cf_exnopt.U self#emit with x -> Cf_exnopt.X x
+            with
+            | Cf_exnopt.X _ as x ->
+                Cf_cmonad.return x
+            | Cf_exnopt.U (s, len) ->
+                self#shift_octets len >>= fun (consume, buffer, mark) ->
+                let obj =
+                    match more_ with
+                    | Iom_stream.More ->
+                        Some {<
+                            consume_ = consume; buffer_ = buffer; mark_ = mark
+                        >}
+                    | Iom_stream.Last ->
+                        None
+                in
+                Cf_cmonad.return (Cf_exnopt.U (s, obj))
     end
 
 let x_negative_adjust_ = "Iom_octet_stream.parse: negative adjustment!"

File iom/iom_octet_stream.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Core stream commutation for pipes and stream sockets. *)
+
+(** {6 Overview}
+
+    Pipes and stream sockets are system services that communicate in streams
+    of octets, usually with orderly close.  Pipes are simplex communication
+    channels and sockets are full duplex.  The classes and functions defined
+    here are extensions of the basis defined in {!Iom_stream} for handling
+    such octet streams.
+*)
+
+(** An exception to indicate that a stream transform requires more input before
+    output will be produced.
+*)
 exception Buffering
 
+(** Use [new fragment more message] to create an event object comprising a
+    fragment of the octet stream and an indication of whether the end of the
+    octet stream has been reached yet.
+*)
 class fragment: Iom_stream.more -> Cf_message.t -> object
     inherit Iom_stream.event
+    
+    (** Use [fragment#data] to obtain a message containing the current fragment
+        of the octet stream.
+    *)
     method data: Cf_message.t
 end
 
+(** A record type used for specifying the low and high limit marks for octet
+    stream transforms.  In general, octet stream transforms send [`Wait] when
+    they have buffered more than [high] octets, and send [`Ready] when enough
+    octets are drained that less than [low] octets remain.
+*)
 type limits = { low: int; high: int }
+
+(** Use [normalize_limits lim] to obtain a new limits record with [low]
+    constrained to be a positive integer, and [high] constrained to be greater
+    than or equal to [low].
+*)
 val normalize_limits: limits -> limits
 
+(** Use [inherit \['x, 'y\] transform ~limits flowTx inRx outTx] to define a
+    subclass for transforming streams of ['x] events received at [inRx] into
+    streams of ['y] events transmitted at [outTx].  Conventionally, both ['x]
+    and ['y] are subtypes of [fragment], but this is not required.
+*)
 class virtual ['x, 'y] transform:
     limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
     object
         inherit ['x, 'y] Iom_stream.transform
 
+        (** The pending octet buffer. *)
         val buffer_: Cf_message.substring Cf_deque.t
+        
+        (** The number of octets in the buffer. *)
         val mark_: int
+        
+        (** A flag to specify whether the input stream has ended. *)
         val more_: Iom_stream.more
+        
+        (** The limits record for this transform. *)
         val limits_: limits
         
+        (** Evaluate the result of [self#push_octets message] to obtain the new
+            [consume_] flag, the new [buffer_] and the new [mark_] after
+            pushing the octets in [message].
+        *)
         method private push_octets:
             Cf_message.t ->
             (bool * Cf_message.substring Cf_deque.t * int) Iom_gadget.t
         
+        (** Evaluate the result of [self#shift_octets length] to obtain the new
+            [consume_] flag, the new [buffer_] and the new [mark_] after
+            shifting a maximum of [length] octets from the buffer.
+        *)
         method private shift_octets:
             int -> (bool * Cf_message.substring Cf_deque.t * int) Iom_gadget.t
     end
 
+(** Use [inherit emit ?limits flowTx inRx outTx] to derive an octet stream
+    transform class (for use with {!Iom_layer.osimplex} or
+    {!Iom_layer.duplex}).  Override the [self#fragment] and [self#push] methods
+    in the subtype to implement the discipline for producing events to output.
+*)
 class ['x, 'y] emit:
     ?limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
         inherit ['x, 'y] transform
         constraint 'y = #fragment
 
+        (** Override [self#fragment inEvent] to return the octet stream
+            fragment components of the input event to the basis implementation
+            of [self#push].
+        *)
         method private fragment: 'x -> Iom_stream.more * Cf_message.t
+        
+        (** Override [self#emit] to examine the contents of the current buffer,
+            identify all the output events that can be shifted, and the length
+            of the octets in the buffer to shift.
+        *)
         method private emit: 'y list * int
 
+        (** This method is invoked by [self#consume] to push the input event.
+            It calls [self#fragment] to obtain the [more_] flag and a message
+            fragment, which it applies to [self#push_octets] and returns a
+            new transform state.
+        *)
         method push: 'x -> 'self Iom_gadget.t
+        
+        (** This method is invoked by [self#produce] to shift zero or more
+            output events from the buffer.  It calls [self#emit], then calls
+            [self#shift_octets] with the length obtained, and returns a new
+            transform state.
+        *)
         method shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
     end
 
+(** Use [inherit parse ?limits flowTx inRx outTx] to derive an octet stream
+    transform class (for use with {!Iom_layer.isimplex} or
+    {!Iom_layer.duplex}).  Override the [self#lex] and/or [self#parse] methods
+    in the subtype to implement the discipline for producing events to output.
+*)
 class ['x, 'y] parse:
     ?limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
         inherit ['x, 'y] transform
         constraint 'x = #fragment
 
+        (** Override [self#lex message] to provide a functional parser to
+            [self#parse] that will be applied to the character sequence in
+            [message] to obtain an output event and the number of octets to
+            shift.  The basis implementation always returns the [nil] parser.
+        *)
         method private lex: Cf_message.t -> ('y * int) Cf_lex.t
+        
+        (** Parse the octet buffer by calling [self#lex] in a loop until no
+            more output objects are parsed.
+        *)
         method private parse: ('y list * int) option
 
+        (** This method is invoked by [self#consume] to push the input event.
+            It decomposes the input fragment into its [more_] flag and its
+            octet fragment, which it applies to [self#push_octets] and returns
+            a new transform state.
+        *)
         method push: 'x -> 'self Iom_gadget.t
+        
+        (** This method is invoked by [self#produce] to shift zero or more
+            output events from the buffer.  It calls [self#parse], and if some
+            events were returned, then calls [self#shift_octets] with the
+            length obtained, and returns a new transform state.
+        *)
         method shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
     end
 

File iom/iom_pipe.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Pipes, reading and writing. *)
+
+(** {6 Overview}
+
+    I/O stream gadgets are defined for reading and writing to file descriptors
+    created with {!Unix.pipe}.
+*)
+
+(** A gadget for reading from a pipe. *)
 class reader:
     k:Iom_gadget.kernel -> fd:Unix.file_descr ->
     limits:Iom_octet_stream.limits ->
         Iom_stream.iplug ->
     [Iom_stream.flowcontrol, Iom_stream.failed] Iom_reactor.reader
 
+(** A gadget for writing to a pipe. *)
 class writer:
     k:Iom_gadget.kernel -> fd:Unix.file_descr ->
     (Iom_octet_stream.fragment, Iom_stream.stop, Iom_reactor.notify)
     [Iom_octet_stream.fragment, Iom_stream.stop, Iom_reactor.notify]
         Iom_reactor.writer
 
+(** Evaluate [create ~limits k] to obtain the reader and writer jacks for the
+    I/O stream gadgets connected to each end of a new pipe.
+*)
 val create:
     limits:Iom_octet_stream.limits -> Iom_gadget.kernel ->
     (Iom_reactor.reader_jack * Iom_reactor.writer_jack) Iom_gadget.t

File iom/iom_reactor.ml

         method virtual private source: 'source option
 
         method private service p =
-            match self#source with
-            | None ->
+            let eopt = self#source in
+            match eopt, source_ with
+            | None, true ->
                 Cf_poll.Loaded p
-            | Some event ->
+            | None, false ->
+                Cf_poll.Unloaded
+            | Some event, true ->
                 Cf_poll.Working (p, Iom_gadget.start (sourceTx#put event))
+            | Some event, false ->
+                Cf_poll.Final (Iom_gadget.start (sourceTx#put event))
 
         method private stop =
             if source_ then self#unload;
         | None -> None
 end
 
-class ['control, 'time] timer ~k ~dt plug = object
+class ['control, 'time] timer ~k ?t0 ~dt plug = object
     inherit ['control, 'signal] source ~k plug as super
-    inherit [unit Iom_gadget.t] Cf_poll.time dt
+    inherit [unit Iom_gadget.t] Cf_poll.time ?t0 dt
     constraint 'time = [> time ]
     
     method private source = Some (`Time epoch_)
 
 let signaler ~n k = Iom_gadget.create (new signaler ~k ~n)
 let idler k = Iom_gadget.create (new idler ~k)
-let timer ~dt k = Iom_gadget.create (new timer ~k ~dt)
+let timer ?t0 ~dt k = Iom_gadget.create (new timer ~k ?t0 ~dt)
 
 let retry_needed = function
     | Unix.EAGAIN 

File iom/iom_reactor.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Core I/O reactors. *)
+
+(** {6 Overview}
+
+    The {!Cf_poll} imperative event loop mechanism is lifted into a suite of
+    I/O gadgets.  Compositions may use them to respond to process signals, loop
+    idling conditions, timer events and file descriptor read/write/exception
+    events discoverable through {!Unix.select}.
+*)
+
+(** {6 Event Types} *)
+
+(** A process signal event. *)
 type signal = [ `Signal of int ]
+
+(** Loop idle event. *)
 type idle = [ `Idle of Cf_tai64n.t ]
+
+(** Timer event. *)
 type time = [ `Time of Cf_tai64n.t ]
 
+(** Writer sent end of octet stream. *)
 type complete = [ `Complete ]
+
+(** Writer notifications. *)
 type notify = [ Iom_stream.flownotify | complete ]
 
+(** {6 Jacks} *)
+
+(** The jack for process signal reactors. *)
 type ('c, 'n) signaler_jack = ('c, 'n) Iom_gadget.jack
     constraint 'c = [> Iom_stream.flowcontrol ]
     constraint 'n = [> signal ]
 
+(** The jack for loop idle reactors. *)
 type ('c, 'n) idler_jack = ('c, 'n) Iom_gadget.jack
     constraint 'c = [> Iom_stream.flowcontrol ]
     constraint 'n = [> idle ]
 
+(** The jack for timer event reactors. *)
 type ('c, 'n) timer_jack = ('c, 'n) Iom_gadget.jack
     constraint 'c = [> Iom_stream.flowcontrol ]
     constraint 'n = [> time ]
 
+(** A jack for reader reactors. *)
 type reader_jack =
     (Iom_octet_stream.fragment, Iom_stream.flowcontrol, Iom_stream.failed)
     Iom_stream.ijack
 
+(** A jack for writer reactors. *)
 type writer_jack =
     (Iom_octet_stream.fragment, Iom_stream.stop, notify) Iom_stream.ojack
 
+(** {6 Functions}
+
+    A simplified functional interface to the reactors.
+*)
+
+(** Evaluate [signaler ~n k] to obtain the signaler jack for a new reactor on
+    the process signal [n] in the kernel context [k].
+**)
 val signaler: n:int -> Iom_gadget.kernel -> ('c, 'n) signaler_jack Iom_gadget.t
+
+(** Evaluate [idler k] to obtain the idler jack for a new reactor on loop idle
+    events in the kernel context [k].
+*)
 val idler: Iom_gadget.kernel -> ('c, 'n) idler_jack Iom_gadget.t
-val timer: dt:float -> Iom_gadget.kernel -> ('c, 'n) timer_jack Iom_gadget.t
 
+(** Evaluate [timer ?t0 ~dt k] to obtain the timer jack for a new reactor on
+    timer events in the kernel context [k] that fire every [dt] seconds.
+    Use the optional [t0] parameter to specify the time of the first event.
+*)
+val timer:
+    ?t0:Cf_tai64n.t -> dt:float -> Iom_gadget.kernel ->
+    ('c, 'n) timer_jack Iom_gadget.t
+
+(** Evaluate [reader ~fd ~limits k] to obtain the reader jack for a new reactor
+    on read events for the file descriptor [fd] in the kernel context [k] with
+    [limits] on the buffering parameters.
+*)
 val reader:
     fd:Unix.file_descr -> limits:Iom_octet_stream.limits -> 
     Iom_gadget.kernel -> reader_jack Iom_gadget.t
 
+(** Evaluate [writer ~fd k] to obtain the writer jack for a new reactor on
+    write events for the file descriptor [fd] in the kernel context [k].
+*)
 val writer: fd:Unix.file_descr -> Iom_gadget.kernel -> writer_jack Iom_gadget.t
 
+(** Use [retry_needed error] to test whether [error] is either [Unix.EAGAIN] or
+    [Unix.EWOULDBLOCK].
+*)
 val retry_needed: Unix.error -> bool
 
-(**/**)
+(** {6 Classes}
 
+    These classes are used to implement the simplified functions above.  They
+    are provided to enable adaptations and extensions beyond the scope of the
+    simplified interface.
+*)
+
+(** The core gadget for I/O reactors, comprising the basic control interface
+    for stream gadgets and a type constraint on subtypes requiring the
+    inheritance of the {!Cf_poll.event} class.  {b Note well:} these gadgets state
+    objects are mutable, so care must be taken when writing derived classes to
+    consider the side effects of mutation and not violate the requirements of
+    the I/O gadget monad.
+*)
 class virtual ['control] core:
     'control Iom_gadget.rx ->
     object('self)
     	constraint 'control = [> Iom_stream.stop ]
         constraint 'self = unit Iom_gadget.t #Cf_poll.event
 
+        (** Set [true] when the gadget is started.  Because reactor state is
+            mutable, attempts to start more than one gadget with the same
+            initial state object raises [Failure].
+        *)
         val mutable started_: bool
         
+        (** Use [self#stop] to terminate the gadget.  In the base class, this
+            just calls [Iom_gadget.abort].  Override to implement additional
+            processing.
+        *)
         method private stop: 'a. 'a Iom_gadget.t
+        
+        (** Override [self#control event] to implement control event processing
+            for the reactor.  In the base class, all control events are ignored.
+        *)
     	method private control: 'control -> unit Iom_gadget.t
+        
+        (** Override [self#guard] to guard on all the receive wires in the
+            gadget.  In the base class, just the control wire is guarded and
+            the events are dispatched with [self#control].
+        *)
     	method private guard: unit Iom_gadget.guard
     end
 
+(** A subclass of [core] that sends messages on a wire in response to polling
+    events and flow control.
+*)
 class virtual ['control, 'source] source:
     k:Iom_gadget.kernel -> ('control, 'source) Iom_gadget.plug ->
     object('self)
         inherit ['control] core
         constraint 'control = [> Iom_stream.flowcontrol ]
 
+        (** Set [true] when the flow controller is ready to receive events. *)
         val mutable source_: bool
         
+        (** Define this method to produce a source message when the polling
+            event [self#service] method is called.  Return [None] if no message
+            is ready to send in response to this polling event.
+        *)
         method virtual private source: 'source option
 
+        (** Call [self#source], and if [source_] is still set afterward, then
+            reload the polling event and send the message (if available) on the
+            source wire.
+        *)
         method private service: Cf_poll.t -> unit Iom_gadget.t Cf_poll.state
+        
+        (** Load the polling event and set [source_] to [true]. *)
     	method private ready: unit Iom_gadget.t
+
+        (** Unload the polling event and set [source_] to [false]. *)
     	method private wait: unit Iom_gadget.t
     end
 
+(** A stream gadget that responds to process signals. *)
 class ['control, 'signal] signaler:
     k:Iom_gadget.kernel -> n:int -> ('control, 'signal) Iom_gadget.plug ->
     object
         inherit ['control, 'signal] source
         inherit [unit Iom_gadget.t] Cf_poll.signal
         constraint 'signal = [> signal ]
+        
+        (** Returns [Some (`Signal n)]. *)
         method private source: 'signal option
     end
 
+(** A stream gadget that responds to loop idle events. *)
 class ['control, 'idle] idler:
     k:Iom_gadget.kernel -> ('control, 'idle) Iom_gadget.plug ->
     object
         inherit ['control, 'idle] source
         inherit [unit Iom_gadget.t] Cf_poll.idle
         constraint 'idle = [> idle ]
+        
+        (** Returns [Some (`Idle epoch)] if [epoch_] is not [None], otherwise
+            returns [None].
+        *)
         method private source: 'idle option
     end
 
+(** A stream gadget that responds to timer events. *)
 class ['control, 'time] timer:
-    k:Iom_gadget.kernel -> dt:float -> ('control, 'time) Iom_gadget.plug ->
+    k:Iom_gadget.kernel -> ?t0:Cf_tai64n.t -> dt:float ->
+    ('control, 'time) Iom_gadget.plug ->
     object
         inherit ['control, 'time] source
         inherit [unit Iom_gadget.t] Cf_poll.time
         constraint 'time = [> time ]
+        
+        (** Returns [Some (`Time epoch_)]. *)
         method private source: 'time option
     end
 
+(** A subclass of [source] that handles reading from a file descriptor when its
+    polling event indicates it is ready.
+*)
 class virtual ['fragment, 'control, 'notify] reader_aux:
     k:Iom_gadget.kernel ->  ?rwx:Cf_poll.rwx -> fd:Unix.file_descr ->
     limits:Iom_octet_stream.limits ->
         constraint 'fragment = #Iom_octet_stream.fragment
         constraint 'notify = [> Iom_stream.failed ]
 
+        (** The octet buffer used for reading the file. *)
         val mutable buffer_: string
+        
+        (** The current offet into the buffer where octets will be stored. *)
         val mutable cursor_: int
+        
+        (** Indicates whether the end of the octet stream has been received. *)
         val mutable more_: Iom_stream.more
+        
+        (** The stack of buffers pending for delivery. *)
         val mutable stack_: (string * int * int) list
+        
+        (** The number of octets left to read before the high read limit. *)
         val mutable free_: int
 
+        (** Define this method, called by [self#source] to construct an octet
+            stream fragment object.
+        *)
         method virtual private fragment: 'fragment
 
+        (** Use [self#fail x] to abort the reactor with the exception [x].
+            Sends [`Failed x] as a notification and calls [Iom_gadget.abort].
+        *)
         method private fail: 'a. exn -> 'a Iom_gadget.t
+        
+        (** Invoked by [self#source] to read octets from the file descriptor
+            into the buffer.  Defined in the base class as [(fun length ->
+            Unix.read fd buffer_ cursor_ length)] and has the same semantics.
+        *)
+        method private octets: int -> int
+
+        (** Enforces read limits and calls [self#octets] to read from the file
+            descriptor, then calls itself recursively until the end of the
+            octet stream is found, a failure exception arises, or the file
+            descriptor blocks.
+        *)
         method private source: 'fragment option
-        method private octets: int -> int
     end
 
+(** A stream gadget that reads octet stream fragments with [Unix.read]. *)
 class ['control, 'notify] reader:
     k:Iom_gadget.kernel -> ?rwx:Cf_poll.rwx -> fd:Unix.file_descr ->
     limits:Iom_octet_stream.limits ->
     (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.iplug ->
     object
         inherit [Iom_octet_stream.fragment, 'control, 'notify] reader_aux
+        
+        (** Returns a concrete octet stream fragment object. *)
         method private fragment: 'fragment
     end
 
+(** A stream gadget that writes octet stream fragments with [Unix.write]. *)
 class ['fragment, 'control, 'notify] writer:
     k:Iom_gadget.kernel -> fd:Unix.file_descr ->
     ('fragment, 'control, 'notify) Iom_stream.oplug ->
         constraint 'notify = [> notify ]
         constraint 'fragment = #Iom_octet_stream.fragment
         
+        (** The queue of substrings pending for write. *)
         val mutable buffer_: Cf_message.substring Cf_deque.t
+        
+        (** Flag whether the end of the octet stream has been queued. *)
         val mutable more_: Iom_stream.more
         
+        (** Use [self#fail x] to abort the reactor with the exception [x].
+            Sends [`Failed x] as a notification and calls [Iom_gadget.abort].
+        *)
         method private fail: 'a. exn -> 'a Iom_gadget.t
+        
+        (** Use [self#complete] to send the [`Complete] notification.  Derived
+            classes may need to override.
+        *)
         method private complete: unit Iom_gadget.t
+        
+        (** Service the polling event when the file descriptor is ready for
+            writing.  Drains the pending buffer queue until empty, a failure
+            exception arises or the file descriptor blocks.  Notifies [`Ready]
+            if the queue drains completely.
+        *)
         method private service: Cf_poll.t -> unit Iom_gadget.t Cf_poll.state
+        
+        (** Receive an octet stream fragment.  If the buffer queue is currently
+            empty, the fragment is pushed into the buffer and [self#consume] is
+            called to drain the queue.  If the queue is not emptied, then
+            notifies [`Wait] and loads the polling event.
+        *)
         method private receive: 'fragment -> unit Iom_gadget.t
+        
+        (** Invoked by [self#consume] to write octets to the file descriptor.
+            Defined in the base class as identical to [Unix.write] and has the
+            same semantics.
+        *)
         method private octets: string -> int -> int -> int
+        
+        (** Drains the buffer queue.  Called in [self#service] when the polling
+            event arises, and also called in [self#receive] when fragments
+            arrive while the file descriptor is not blocked.
+        *)
         method private consume: unit
     end
 

File iom/iom_sock_stream.ml

     method socket: ('af, [ `SOCK_STREAM ]) Cf_socket.t = socket
 end
 
-type 'address listen = [ `Listen of 'address ]
-type 'address listener_notify = [ 'address listen | Iom_stream.failed ]
-
-class ['control, 'notify] receiver ~k ~s:socket ~limits plug =
-    let _ = (socket : ('af, [ `SOCK_STREAM ]) Cf_socket.t) in
-    let fd = Cf_socket.to_unix_file_descr socket in
-    let fragmentTx, (_, notifyTx) = plug in
-    object(self)
-        inherit ['control, 'notify] Iom_reactor.reader ~k ~fd ~limits plug
-
-        method private octets n =
-            let v =
-                Cf_socket.recv socket buffer_ cursor_ n
-                    Cf_socket.msg_flags_none
-            in
-            if v = 0 then raise End_of_file;
-            v
-
-        method private service p =
-            assert source_;
-            try
-                match self#source with
-                | None ->
-                    Cf_poll.Loaded p
-                | Some fragment ->
-                    let more = fragment#more in
-                    let out = fragmentTx#put fragment in
-                    match more with
-                    | Iom_stream.More ->
-                        Cf_poll.Working (p, out)
-                    | Iom_stream.Last ->
-                        let out = out >>= fun () -> notifyTx#put `Complete in
-                        Cf_poll.Final out
-            with
-            | Unix.Unix_error (code, _, _)
-              when Iom_reactor.retry_needed code ->
-                Cf_poll.Loaded p
-            | x ->
-                Cf_poll.Final (notifyTx#put (`Failed x))
-    end
-
-class ['control, 'notify] sender ~k ~s:socket plug =
-    let _ = (socket : ('af, [ `SOCK_STREAM ]) Cf_socket.t) in
-    let fd = Cf_socket.to_unix_file_descr socket in
-    object
-        inherit [Iom_octet_stream.fragment, 'control, 'notify]
-            Iom_reactor.writer ~k ~fd plug as super
-        
-        method private octets buf pos len =
-            Cf_socket.send socket buf pos len Cf_socket.msg_flags_none
-
-        method private complete =
-            Unix.shutdown fd Unix.SHUTDOWN_SEND;
-            super#complete
-    end
-
 class virtual ['control, 'notify, 'connect] connector ~k ~s:socket ~rwx plug =
     let _ = (socket : 'socket) in
     let fd = Cf_socket.to_unix_file_descr socket in
                 | x -> self#fail x
     end
 
+type 'address listen = [ `Listen of 'address ]
+type 'address listener_notify = [ 'address listen | Iom_stream.failed ]
+
 let seqput_ acc tx =
     let tx = (tx :> 'a Iom_gadget.tx) in
     Cf_seq.C.sequence (Cf_seq.map tx#put (Cf_seq.of_list (List.rev acc)))
                     self#fail x
     end
 
+class ['control, 'notify] receiver ~k ~s:socket ~limits plug =
+    let _ = (socket : ('af, [ `SOCK_STREAM ]) Cf_socket.t) in
+    let fd = Cf_socket.to_unix_file_descr socket in
+    let fragmentTx, (_, notifyTx) = plug in
+    object(self)
+        inherit ['control, 'notify] Iom_reactor.reader ~k ~fd ~limits plug
+
+        method private octets n =
+            let v =
+                Cf_socket.recv socket buffer_ cursor_ n
+                    Cf_socket.msg_flags_none
+            in
+            if v = 0 then raise End_of_file;
+            v
+
+        method private service p =
+            assert source_;
+            try
+                match self#source with
+                | None ->
+                    Cf_poll.Loaded p
+                | Some fragment ->
+                    let more = fragment#more in
+                    let out = fragmentTx#put fragment in
+                    match more with
+                    | Iom_stream.More ->
+                        Cf_poll.Working (p, out)
+                    | Iom_stream.Last ->
+                        let out = out >>= fun () -> notifyTx#put `Complete in
+                        Cf_poll.Final out
+            with
+            | Unix.Unix_error (code, _, _)
+              when Iom_reactor.retry_needed code ->
+                Cf_poll.Loaded p
+            | x ->
+                Cf_poll.Final (notifyTx#put (`Failed x))
+    end
+
+class ['control, 'notify] sender ~k ~s:socket plug =
+    let _ = (socket : ('af, [ `SOCK_STREAM ]) Cf_socket.t) in
+    let fd = Cf_socket.to_unix_file_descr socket in
+    object
+        inherit [Iom_octet_stream.fragment, 'control, 'notify]
+            Iom_reactor.writer ~k ~fd plug as super
+        
+        method private octets buf pos len =
+            Cf_socket.send socket buf pos len Cf_socket.msg_flags_none
+
+        method private complete =
+            Unix.shutdown fd Unix.SHUTDOWN_SEND;
+            super#complete
+    end
+
 type ('ci, 'co, 'ni, 'no, 'x, 'cx, 'nx) state = [
     | ('ci, 'co, 'ni, 'no) Iom_socket.state
     | `S_receiving of ('ci, 'ni) Iom_gadget.jack

File iom/iom_sock_stream.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Connection-oriented, orderly close (SOCK_STREAM) sockets. *)
+
+(** {6 Overview}
+
+    TCP and other SOCK_STREAM sockets are connection-oriented communications
+    channels with reliable delivery of octet streams and orderly close
+    semantics.  Classes and functors are defined to extend the core socket
+    functions in {!Iom_socket} to add these features.
+*)
+
+(** An I/O stream event representing a connection event. *)
 class ['af] connection:
     Iom_stream.more -> ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
     object
+        (** Flag to indicate whether more connections are expected. *)
         method more: Iom_stream.more
+        
+        (** The socket descriptor. *)
         method socket: ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t
     end
 
+(** A listener has started at the specified address. *)
 type 'address listen = [ `Listen of 'address ]
+
+(** The notifications from listeners. *)
 type 'address listener_notify = [ 'address listen | Iom_stream.failed ]
 
-class ['control, 'notify] receiver:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    limits:Iom_octet_stream.limits ->
-    (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.iplug ->
-    object
-        inherit ['control, 'notify] Iom_reactor.reader
-        constraint 'notify = [> Iom_reactor.complete ]
-    end
+(** {6 Simplified Functional Interface} *)
 
-class ['control, 'notify] sender:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.oplug ->
-    [Iom_octet_stream.fragment, 'control, 'notify] Iom_reactor.writer
+(** The jack for controlling a socket. *)
+type endpoint_jack =
+    (Iom_octet_stream.fragment, Iom_octet_stream.fragment,
+        Iom_stream.flowcontrol, Iom_reactor.notify) Iom_stream.iojack
 
+(** The jack for controlling a connection initiator. *)
+type 'af initiator_jack =
+    ('af connection, Iom_stream.flowcontrol, Iom_stream.failed)
+        Iom_stream.ijack
+
+(** The jack for controlling a listener. *)
+type 'af listener_jack =
+    ('af connection, Iom_stream.flowcontrol, 'af Cf_socket.sockaddr
+    listener_notify) Iom_stream.ijack
+
+(** Evaluate [initiator ~s ~src ~dst k] to obtain the initiator jack for
+    binding the socket [s] to the local address [src] and connecting it in the
+    kernel context [k] to the destination address [dst].
+*)
+val initiator:
+    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    src:'af Cf_socket.sockaddr -> dst:'af Cf_socket.sockaddr ->
+    Iom_gadget.kernel -> 'af initiator_jack Iom_gadget.t
+
+(** Evaluate [listener ~s ~src ~backlog k] to obtain the listener jack for
+    binding the socket [s] to the local address [src], applying a listen queue
+    depth of [backlog] and accepting connections in the kernel context [k].
+*)
+val listener:
+    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 
+    src:'af Cf_socket.sockaddr -> backlog:int -> Iom_gadget.kernel ->
+    'af listener_jack Iom_gadget.t
+
+(** Evaluate [endpoint ~s ?addrs ~limits k] to obtain the endpoint jack for
+    the socket [s].  If [~addrs] is provided, then bind the socket to the
+    first address and connect to the second address before notifying ready to
+    send.
+*)
+val endpoint:
+    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    ?addrs:('af Cf_socket.sockaddr * 'af Cf_socket.sockaddr) ->
+    limits:Iom_octet_stream.limits -> Iom_gadget.kernel ->
+    endpoint_jack Iom_gadget.t
+
+(** {6 Signaling Connection} *)
+
+(** Use [inherit connector ~k ~s ~rwx plug] to derive a subclass gadget that
+    responds to connection events on the socket [s] in the kernel context [k].
+    The [rwx] parameter identifies the sort of {!Cf_poll.file} required.  The
+    base class inherits from {!Iom_reactor.source} and {!Cf_poll.file}.
+*)
 class virtual ['control, 'notify, 'connect] connector:
     k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
     rwx:[< Cf_poll.rwx ] -> ('connect, 'control, 'notify) Iom_stream.iplug ->
         constraint 'notify = [> Iom_stream.failed ]
         constraint 'connect = 'af #connection
 
+        (** Set [true] when the gadget is started.  Because reactor state is
+            mutable, attempts to start more than one gadget with the same
+            initial state object raises [Failure].
+        *)
         val mutable initiated_: bool
 
+        (** Method called during [self#start] to bind the socket to its local
+            address.
+        *)
         method virtual private bind: unit
         
+        (** Use [self#fail x] to close the connector socket, notify [`Failed
+            x] and terminate the gadget.
+        *)
         method private fail: 'a. exn -> 'a Iom_gadget.t
     end
 
+(** Use [inherit \['control, 'notify, 'connect\] initiator ~k ~s plug] to
+    derive a subclass for a gadget that initiates a connection to a remote
+    address in the kernel context [k] with the socket [s].
+*)
 class virtual ['control, 'notify, 'connect] initiator:
     k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
     ('connect, 'control, 'notify) Iom_stream.iplug ->
         inherit ['control, 'notify, 'connect] connector
         constraint 'connect = 'af #connection
 
+        (** Method called during [self#source] to construct the connection
+            object.
+        *)
         method virtual private connect: 'connect
+        
+        (** Returns [Some self#connect]. *)
         method private source: 'connect option
     end
 
+(** Use [inherit \['control, 'notify, 'connect\] listener ~k ~s ~backlog plug]
+    to derive a subclass for a gadget that listens for connections at a local
+    address in the kernel context [k] with the socket [s].
+*)
 class virtual ['control, 'notify, 'connect] listener:
     k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
     backlog:int -> ('connect, 'control, 'notify) Iom_stream.iplug ->
         constraint 'connect = 'af #connection
         constraint 'notify = [> 'address listener_notify ]
 
+        (** The list of pending connections accepted but not yet drained. *)
         val mutable stack_: 'connect list
 
+        (** Define [self#getsockname] to return the local address of the
+            listening socket.
+        *)
         method virtual private getsockname: 'address
+        
+        (** Define [self#accept] to accept a new connection and construct the
+            associated [connection] object for it.
+        *)
         method virtual private accept: 'connect
     end
 
+(** {6 Endpoint Control} *)
+
+(** A receiver gadget for stream socket input. *)
+class ['control, 'notify] receiver:
+    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    limits:Iom_octet_stream.limits ->
+    (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.iplug ->
+    ['control, [> Iom_reactor.complete ] as 'notify] Iom_reactor.reader
+
+(** A sender gadget for stream socket output. *)
+class ['control, 'notify] sender:
+    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.oplug ->
+    [Iom_octet_stream.fragment, 'control, 'notify] Iom_reactor.writer
+
+(** The endpoint control state for stream socket gadgets.  Extends the core
+    socket state type to add connection-oriented semantics.
+*)
 type ('ci, 'co, 'ni, 'no, 'x, 'cx, 'nx) state = [
+
+    (** Core states. *)
     | ('ci, 'co, 'ni, 'no) Iom_socket.state
+    
+    (** Socket is receiving only.  Sender has stopped. *)
     | `S_receiving of ('ci, 'ni) Iom_gadget.jack
+    
+    (** Socket is sending only.  Receiver has stopped. *)
     | `S_sending of ('co, 'no) Iom_gadget.jack
+    
+    (** Socket is initiating a connection. *)
     | `S_initiating of ('x, 'cx, 'nx) Iom_stream.ijack * 'ci
 ]
 constraint 'ni = [> Iom_reactor.complete ]
 
+(** Use [inherit \['i, 'o, 'control, 'notify, 'af\] endpoint ~k ~s ?addrs plug]
+    to derive an endpoint controller gadget for stream sockets specialized by
+    the socket address family.
+*)
 class virtual ['i, 'o, 'control, 'notify, 'af] endpoint:
     k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
     ?addrs:('af Cf_socket.sockaddr * 'af Cf_socket.sockaddr) ->
     ('i, 'o, 'control, 'notify) Iom_stream.ioplug ->
     object
-        inherit
-            ['i, 'o, 'control, 'notify, 'state, 'af, 'st] Iom_socket.endpoint
+        inherit ['i, 'o, 'control, 'notify, 'state, 'af, 'st]
+            Iom_socket.endpoint
 
         constraint 'i = #Iom_octet_stream.fragment
         constraint 'o = #Iom_octet_stream.fragment
         constraint 'state = ('ci, 'co, 'ni, 'no, 'x, 'cx, 'nx) state
         constraint 'st = Cf_socket.SOCK_STREAM.tag
 
+        (** Called in [self#state0] (unless [?addrs] is [None]) to construct
+            an initiator gadget for the source and destination address with
+            the plug provided.  The initiator is started and the state is set
+            to [`S_initiating].
+        *)
         method virtual private initiator:
             src:'af Cf_socket.sockaddr -> dst:'af Cf_socket.sockaddr ->
             ('x, 'cx, 'nx) Iom_stream.iplug -> #Iom_gadget.start
     end
 
+(** {6 Specializer Functor}
+
+    Use the interface defined below to specialize the stream socket gadget for
+    a specific address family.
+*)
+
+(** Auxillary class types referenced in the module type [T] below. *)
 module Aux: sig
+    (** A synonym for the core [connection] class type. *)
     class type ['af] connection_aux = ['af] connection
 
-    (*
+    (** A subclass type of the core [initiator] class type that defines a
+        virtual method [aux] to produce a [connection] object.
+    *)
     class type virtual ['control, 'notify, 'connect] initiator_aux = object
         inherit ['control, 'notify, 'connect] initiator
         constraint 'connect = 'af #connection_aux
 
+        (** Called in [self#connect] to produce a [connection] object. *)
         method virtual private aux:
             ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 'connect
-
-        method private bind: unit
-        method private connect: 'connect
     end
 
+    (** A subclass type of the core [listener] class type that defines a
+        virtual method [aux] to produce a [connection] object.
+    *)
     class type virtual ['control, 'notify, 'connect] listener_aux = object
         inherit ['control, 'notify, 'connect] listener
         constraint 'connect = 'af #connection_aux
 
         method virtual private aux:
             ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 'connect
-
-        method private bind: unit
-        method private accept: 'connect
     end
 
+    (** A synonym for the core [endpoint] class. *)
     class type virtual ['i, 'o, 'control, 'notify, 'af] endpoint_aux =
         ['i, 'o, 'control, 'notify, 'af] endpoint
-    *)
 end
 
-type endpoint_jack =
-    (Iom_octet_stream.fragment, Iom_octet_stream.fragment,
-        Iom_stream.flowcontrol, Iom_reactor.notify) Iom_stream.iojack
-
-type 'af initiator_jack =
-    ('af connection, Iom_stream.flowcontrol, Iom_stream.failed)
-        Iom_stream.ijack
-
-type 'af listener_jack =
-    ('af connection, Iom_stream.flowcontrol, 'af Cf_socket.sockaddr
-    listener_notify) Iom_stream.ijack
-
-val initiator:
-    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    src:'af Cf_socket.sockaddr -> dst:'af Cf_socket.sockaddr ->
-    Iom_gadget.kernel -> 'af initiator_jack Iom_gadget.t
-
-val listener:
-    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 
-    src:'af Cf_socket.sockaddr -> backlog:int -> Iom_gadget.kernel ->
-    'af listener_jack Iom_gadget.t
-
-val endpoint:
-    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    ?addrs:('af Cf_socket.sockaddr * 'af Cf_socket.sockaddr) ->
-    limits:Iom_octet_stream.limits -> Iom_gadget.kernel ->
-    endpoint_jack Iom_gadget.t
-
+(** The signature of modules produced by the [Create] functor. *)
 module type T = sig
     include Iom_socket.T with module P.ST = Cf_socket.SOCK_STREAM
-        
+    
+    (** The class of connection objects. *)
     class connection: Iom_stream.more -> t -> object
         inherit [P.AF.tag] Aux.connection_aux
         method local: address
         method remote: address
     end
 
+    (** The jack for controlling a socket connection initator gadget. *)
     type initiator =
         (connection, Iom_stream.flowcontrol, Iom_stream.failed)
         Iom_stream.ijack
 
+    (** The jack for controlling a socket connection listener gadget. *)
     type listener =
         (connection, Iom_stream.flowcontrol, address listener_notify)
         Iom_stream.ijack
     
+    (** The jack for controlling a socket endpoint gadget. *)
     type endpoint = endpoint_jack
     
+    (** Use [socket ()] as convenient constructor for socket descriptors. *)
     val socket: unit -> t
 
+    (** Evaluate [initator ?s ?src ~dst k] to obtain the jack for a connection
+        initiator to the destination [dst] in the kernel context [k].  If [src]
+        is provided, then bind explicity to that local address.  If [s] is
+        not provided, then construct a new socket.
+    *)
     val initiator:
         ?s:t -> ?src:address -> dst:address -> Iom_gadget.kernel ->
         initiator Iom_gadget.t
 
+    (** Evaluate [listener ?s ?src ~backlog k] to obtain the jack for a
+        connection listener in the kernel context [k].  If [src] is not
+        provided, then bind to an unspecified address.  If [s] is not provided,
+        then construct a new socket.
+    *)
     val listener:
         ?s:t -> ?src:address -> backlog:int -> Iom_gadget.kernel ->
         listener Iom_gadget.t
 
+    (** Evaluate [endpoint ?s ?addrs ~limits k] to obtain the jack for an
+        endpoint controller constrained on receive by [limits] in the kernel
+        context [k].  If [addrs] is provided, then bind explicitly to the first
+        address and connect to the second address before notifying ready to
+        send.  If [s] is not provided, then construct a new socket.
+    *)
     val endpoint:
         ?s:t -> ?addrs:(address * address) -> limits:Iom_octet_stream.limits ->
         Iom_gadget.kernel -> endpoint Iom_gadget.t
 end
 
+(** Use [Create(P)] to compose a module of stream socket gadgets specialized
+    for the address family of the socket module [P].
+*)
 module Create(P: Cf_socket.P with module ST = Cf_socket.SOCK_STREAM):
     T with module P = P
 

File iom/iom_socket.ml

             | _, `Failed x ->
                 self#fail x
             | _, _ ->
-                assert (not true);
                 self#next
                 
         method private onotify event =
             | _, `Failed x ->
                 self#fail x
             | _, _ ->
-                assert (not true);
                 self#next
                 
         method private control c =

File iom/iom_socket.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Core definitions for socket I/O gadgets. *)
+
+(** {6 Overview}
+
+    Sockets are full duplex communication mechanisms.  A single file descriptor
+    comprises two independent reader and writer gadgets, and a unifying
+    endpoint gadget for control of opening and closing semantics.  Classes and
+    modules defined here are inherited and included by classes and modules
+    defining specific forms of sockets.
+*)
+
+(** The core polymorphic variant type defining the state of a socket. *)
 type ('ci, 'co, 'ni, 'no) state = [
+        (** The socket has failed and is no longer usable. *)
         | `S_failed of exn
+        
+        (** The socket is established for simultaneous reading and writing. *)
         | `S_established of
             ('ci, 'ni) Iom_gadget.jack * ('co, 'no) Iom_gadget.jack
     ]
     constraint 'ni = [> Iom_stream.failed ]
     constraint 'no = [> Iom_reactor.notify ]
 
+(** Use [inherit endpoint ~k ~s plug] to derive a subclass for the socket
+    endpoint control semantics required by the type of socket [s].
+*)
 class virtual ['i, 'o, 'control, 'notify, 'state, 'af, 'st] endpoint :
     k:Iom_gadget.kernel -> s:('af, 'st) Cf_socket.t ->
     ('i, 'o, 'control, 'notify) Iom_stream.ioplug ->
         constraint 'notify = [> Iom_reactor.notify ]
         constraint 'state = [> ('ci, 'co, 'ni, 'no) state ]
 
+        (** The state of the socket. *)
         val state_: 'state
         
+        (** Called by [self#establish] to start a receiver I/O gadget for the
+            read side of the socket.
+        *)
         method virtual private receiver:
             ('i, 'ci, 'ni) Iom_stream.iplug -> #Iom_gadget.start
-
+        
+        (** Called by [self#establish] to start a sender I/O gadget for the
+            write side of the socket.
+        *)
         method virtual private sender:
             ('o, 'co, 'no) Iom_stream.oplug -> #Iom_gadget.start
 
+        (** Calls [self#receiver] and [self#sender], starts the resulting
+            receiver and sender gadgets, notifies [`Ready], and sets the socket
+            into the [`Established] state.
+        *)
         method private establish: 'state Iom_gadget.t
+        
+        (** Calls [self#establish].  Override in derived classes if connection
+            oriented semantics are required.
+        *)
         method private state0: 'state Iom_gadget.t
+        
+        (** Use [self#close] to close the socket and terminate the gadget.
+            Derived classes may need to override.
+        *)
         method private close: 'a. 'a Iom_gadget.t
+        
+        (** Use [self#reset] to shutdown the socket abortively, i.e. with
+            [Unix.shutdown Unix.SHUTDOWN_ALL] and call [self#close].  Derived
+            classes may need to override.
+        *)
         method private reset: 'a. 'a Iom_gadget.t
+        
+        (** Use [self#fail x] to notify [`Failed x] and call [self#reset]. *)
         method private fail: 'a. exn -> 'a Iom_gadget.t
+        
+        (** Handle notifications from the receiver gadget.  Derived classes may
+            need to override.  The base method catches [`Failed] notifications
+            and stops the sender before calling [self#fail].  Unexpected
+            notifications are ignored.
+        *)
         method private inotify: 'ni -> unit Iom_gadget.t
+        
+        (** Handle notifications from the sender gadget.  Derived classes may
+            need to override.  The base method forwards flow control
+            notifications up the stack, and also catches [`Failed]
+            notifications to stop the receiver before calling [self#fail].
+            Unexpected notifications are ignored.
+        *)
         method private onotify: 'no -> unit Iom_gadget.t
+        
+        (** Handle control messages.  Derived classes may need to override.
+            The base method handles the [`Established] state and ignores
+            all events in other states.  It forwards [`Stop] messages to both
+            the receiver and the sender, and it forwards flow control messages
+            to the receiver.
+        *)
         method private control: 'control -> unit Iom_gadget.t
+        
+        (** Guards on notification events as specified by the socket state.  If
+            the socket state is not [`Established] then no wires are guarded.
+            Derived classes may need to override for handling guards in
+            additional states.
+        *)
         method private stateguard: (Iom_gadget.gates, unit) Cf_cmonad.t
+        
+        (** Guards on the control wire, dispatching events with [self#control],
+            and calls [self#stateguard] for the rest of the guard.
+        *)
         method private guard: unit Iom_gadget.guard
     end
 
+(** The signature of modules produced by the [Create] functor. *)
 module type T = sig
+
+    (** The socket module. *)
     module P: Cf_socket.P
+    
+    (** The type of socket descriptors. *)
     type t = (P.AF.tag, P.ST.tag) Cf_socket.t
+    
+    (** The type of socket addresses. *)
     type address = P.AF.address
+    
+    (** The "unspecified" address for the socket. *)
     val unspecified_address: address
 end
 
+(** Use [Create(P)] to compose a module for including in modules for specific
+    socket types that comprises the core definitions for socket I/O gadgets.
+*)
 module Create(P: Cf_socket.P): T with module P = P
 
 (*--- End of File [ iom_socket.mli ] ---*)

File iom/iom_stream.ml

 open Cf_cmonad.Op
 
 type stop = [ `Stop ]
+type failed = [ `Failed of exn ]
 type ready = [ `Ready ]
 type wait = [ `Wait ]
-type failed = [ `Failed of exn ]
 
 type readywait = [ ready | wait ]
 type flowcontrol = [ stop | readywait ]

File iom/iom_stream.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Core I/O event driver framework.  *)
+
+(** {6 Overview}
+
+    The core types, functions and classes for composing I/O event driven
+    application services using the [Iom_gadget] monad are defined.
+    
+    Gadgets derived from [Iom_stream] process sequences of typed events with
+    flow control signaling.  Simplex streams are either {i consumers}, i.e.
+    they receive an event sequence while they are ready, or they are
+    {i producers}, i.e. they send an event sequence unless they are blocked.
+    Duplex streams both send and receive event sequences.
+    
+    All stream gadgets receive control events and send notification events.
+    Control and notification events are polymorphic variant types.  By
+    convention, stream gadgets do not process control events they don't
+    recognize.  Stream gadgets respond to the [`Stop] control event by
+    immediately (perhaps abortively) releasing all contained resources and
+    terminating its processing.  Stream gadgets send [`Failed of exn]
+    notification events to indicate they have unexpectedly terminated all
+    processing.  Flow control events, i.e. [`Ready] and [`Wait] are sent and/or
+    received as either control or notification events, as appropriate.
+    
+    Stream gadget may process events of any type, but [class event] is provided
+    as a convenient base class for streams of events that may terminate in an
+    orderly way, e.g. octet streams connected to pipes, files or TCP sockets.
+*)
+
+(** {6 Event Types} *)
+
+(** A control event that stops a gadget, abortively if necessary. *)
 type stop = [ `Stop ]
-type ready = [ `Ready ]
-type wait = [ `Wait ]
+
+(** A notification event that indicates unexpected termination. *)
 type failed = [ `Failed of exn ]
 
+(** A control or notification event signaling readiness to process further
+    events in the stream.
+*)
+type ready = [ `Ready ]
+
+(** A control or notification event signaling that further events in the stream
+    will be queued for later processing.
+*)
+type wait = [ `Wait ]
+
+(** A convenient type that sums the flow control events. *)
 type readywait = [ ready | wait ]
+
+(** A convenience type to represent the core control events of producer stream
+    gadgets.
+*)
 type flowcontrol = [ stop | readywait ]
+
+(** A convenient type to represent the core notification events of consumer
+    stream gadgets.
+*)
 type flownotify = [ failed | readywait ]
 
+(** A variant type that indicates whether an event is the final event before
+    the sequence orderly terminates.
+*)
 type more = Last | More
-class type event = object method more: more end
 
+(** A convenient class type for objects representing events in sequences that
+    may terminate in an orderly way, e.g. octet streams connected to pipes,
+    files or TCP sockets.
+*)
+class type event = object
+
+    (** Returns [Last] if this is the final event before orderly termination of
+        the event sequence.  Otherwise, returns [More].
+    *)
+    method more: more
+end
+
+(** {6 Plugs and Jacks} *)
+
+(** A plug type for all stream gadgets. *)
 type ('x, 'c, 'n) plug = 'x * ('c, 'n) Iom_gadget.plug
     constraint 'c = [> stop ]
     constraint 'n = [> failed ]
 
+(** A jack type for all stream gadgets. *)
 type ('x, 'c, 'n) jack = 'x * ('c, 'n) Iom_gadget.jack
     constraint 'c = [> stop ]
     constraint 'n = [> failed ]
 
+(** A specialization of the plug type for simplex producer stream gadgets. *)
 type ('i, 'c, 'n) iplug = ('i Iom_gadget.tx, 'c, 'n) plug
     constraint 'c = [> readywait ]
     
+(** A specialization of the jack type for simplex producer stream gadgets. *)
 type ('i, 'c, 'n) ijack = ('i Iom_gadget.rx, 'c, 'n) jack
     constraint 'c = [> readywait ]
 
+(** A specialization of the plug type for simplex consumer stream gadgets. *)
 type ('o, 'c, 'n) oplug = ('o Iom_gadget.rx, 'c, 'n) plug
     constraint 'n = [> readywait ]
 
+(** A specialization of the jack type for simplex consumer stream gadgets. *)
 type ('o, 'c, 'n) ojack = ('o Iom_gadget.tx, 'c, 'n) jack
     constraint 'n = [> readywait ]
 
+(** A specialization of the plug type for duplex stream gadgets. *)
 type ('i, 'o, 'c, 'n) ioplug = (('o, 'i) Iom_gadget.plug, 'c, 'n) plug
     constraint 'c = [> readywait ]
     constraint 'n = [> readywait ]
 
+(** A specialization of the jack type for duplex stream gadgets. *)
 type ('i, 'o, 'c, 'n) iojack = (('o, 'i) Iom_gadget.jack, 'c, 'n) jack
     constraint 'c = [> readywait ]
     constraint 'n = [> readywait ]
 
+(** Use [ijack f] to create a harness of wires, with an [iplug] at one end and
+    a matching [ijack] at the other.  The plug is applied to [f] to obtain a
+    gadget object, its start method is applied, and the [ijack] is returned.
+*)
 val ijack:
   (('i, 'c, 'n) iplug -> #Iom_gadget.start) -> ('i, 'c, 'n) ijack Iom_gadget.t
 
+(** Use [ojack f] to create a harness of wires, with an [oplug] at one end and
+    a matching [ojack] at the other.  The plug is applied to [f] to obtain a
+    gadget object, its start method is applied, and the [ojack] is returned.
+*)
 val ojack:
   (('o, 'c, 'n) oplug -> #Iom_gadget.start) -> ('o, 'c, 'n) ojack Iom_gadget.t
 
+(** Use [iojack f] to create a harness of wires, with an [ioplug] at one end
+    and a matching [iojack] at the other.  The plug is applied to [f] to obtain
+    a gadget object, its start method is applied, and the [iojack] is returned.
+*)
 val iojack:
   (('i, 'o, 'c, 'n) ioplug -> #Iom_gadget.start) ->
   ('i, 'o, 'c, 'n) iojack Iom_gadget.t
 
+(** {6 State Compositors}
+
+    The virtual classes defined below of useful for combining into concrete
+    classes to serve as the state of a stream gadget.
+*)
+
+(** The substate of a stream gadget concerned with consuming sequences of
+    events.  Use [inherit consume rx] to derive a state object that can consume
+    events of the type carried by the wire connected to the [rx] receiver.
+*)
 class virtual ['x] consume:
+    (** The receiver for events in the sequence. *)
     'x #Iom_gadget.rx ->
     object('self)
-        val consume_: bool
+        val consume_: bool  (** [true] if ready for consuming events. *)
 
+        (** The result of [self#push event] is applied in the [consume] method
+            to obtain the substate object after [event] is processed.
+        *)
         method virtual push: 'x -> 'self Iom_gadget.t
 
+        (** If the stream is ready for consuming events, then [self#consume f]
+            guards on the receiver, dispatching received events through
+            [self#push] and applying [f] to the resulting substate object.  If
+            [push] raises an exception, then [f] is applied to the exception
+            instead.  If the stream is not ready for consuming events, then
+            the receiver is not guarded.
+        *)
         method consume:
             ('self Cf_exnopt.t -> unit Iom_gadget.t) -> unit Iom_gadget.guard
 
+        (** Returns a copy of the substate with [consume_ = false]. *)
         method block: 'self Iom_gadget.t
+
+        (** Returns a copy of the substate with [consume_ = true]. *)
         method unblock: 'self Iom_gadget.t
     end
 
+(** The substate of a stream gadget concerned with producing sequences of
+    events.  Use [inherit produce tx] to derive a state object that can produce
+    events of the type carried by the wire connected to the [tx] transmitter.
+*)
 class virtual ['y] produce:
     'y #Iom_gadget.tx ->
     object('self)
-        val produce_: bool
+        val produce_: bool  (** [true] if ready for producing events. *)
 
+        (** The result of [self#shift] is evaluated to obtain a list of events
+            to produce and the optional substate to return in the [produce]
+            method.
+        *)
         method virtual shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
 
+        (** If the stream is ready for producing events, then [self#produce]
+            returns an optional substate indicating that further processing is
+            expected, returns [None] to indicate that processing is finished,
+            or returns an exception.  If the stream is not ready for producing
+            events, then the current substate is returned with no other effect.
+        *)
         method produce: 'self option Cf_exnopt.t Iom_gadget.t
+
+        (** Returns a copy of the substate with [produce_ = true]. *)
         method ready: 'self Iom_gadget.t
+
+        (** Returns a copy of the substate with [produce_ = false]. *)
         method wait: 'self Iom_gadget.t
     end
 
+(** The substate of a stream gadget concerned with both producing and consuming
+    sequences of events.  Use [inherit transform rx tx] to derive a state
+    object that can consume events of the type carried by [rx] and produce
+    events of the type carried by [tx].
+*)
 class virtual ['x, 'y] transform:
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
     object('self)
         inherit ['y] produce
     end
 
+(** The virtual base class of all stream gadget states. *)
 class virtual ['c, 'n] substate: object('self)
     constraint 'self = #Iom_gadget.next
     constraint 'c = [> stop ]
     method private stop: 'a. 'a Iom_gadget.t
 end
 
+(** Use [inherit control plug] to derive a stream gadget state that receives
+    events on the control wire in [plug].
+*)
 class virtual ['c, 'n] control:
     ('c #Iom_gadget.rx * 'n #Iom_gadget.tx) ->
     object
         inherit ['c, 'n] substate
+        
+        (** When events are received on the control wire in [plug], they are
+            dispatched to [self#control event] and the returned action is
+            evaluated.
+        *)
         method private control: 'c -> unit Iom_gadget.t
+        
+        (** Invoke the [guard] method on the inherited base class to guard on
+            the control events for [plug].
+        *)
         method private guard: unit Iom_gadget.guard
     end
 
+(** Use [inherit control plug] to derive a stream gadget state that receives
+    events on the notification wire in [jack].
+*)
 class virtual ['c, 'n] notify:
     ('n #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
     object
         inherit ['c, 'n] substate
+        
+        (** When events are received on the notification wire in [jack], they
+            are dispatched to [self#notify event] and the returned action is
+            evaluated.
+        *)
         method private notify: 'n -> unit Iom_gadget.t
+        
+        (** Invoke the [guard] method on the inherited base class to guard on
+            the notification events for [jack].
+        *)
         method private guard: unit Iom_gadget.guard
     end