Commits

Anonymous committed 0a2b036

Migrate to using Nx library.

Comments (0)

Files changed (18)

 released versions of the library.
 
 
+===== Version 0.4 =====
+
+Highlights of this release:
+
++ Migrate to using new "Network Extensions" (Nx) library.
+
 ===== Version 0.3 =====
 
 This release is yet another major overhaul.  The programming interfaces are
 # The following META file is a guess by ocamlfind:
 name="iom"
-version="0.3"
+version="0.4"
 description="OCaml NAE Reactive I/O Monads"
 requires="unix cf"
 archive(byte)="iom.cma"
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 # OF THE POSSIBILITY OF SUCH DAMAGE. 
 
-REQUIRE = cf
+REQUIRE = cf nx
 PREDICATES =
 
 ###############################################################################
 opt:: $(TEST_OPT_PROGRAMS)
 
 t.% : t/t_%.ml $(TEST_LIBS:%=%.cma)
-	$(OCAMLC) -o $@ $(CMO_OPT) $(DEBUG_OPT) $(TEST_LINKOPT) \
-        $(TEST_LIBS:%=%.cma) $<
+	$(OCAMLC) -o $@ $(CMO_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cma) $<
 
 t-opt.% : t/t_%.ml $(TEST_LIBS:%=%.cmxa)
 	$(OCAMLOPT) -o $@ $(CMX_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cmxa) $<
 DOC_SOURCES = $(IOM_MLI_FILES) $(IOM_ML_FILES)
 
 DOC_INCLUDE_CF = `ocamlfind query cf`
+DOC_INCLUDE_NX = `ocamlfind query nx`
+DOC_INCLUDES = $(DOC_INCLUDE_CF) $(DOC_INCLUDE_NX)
 
 doc::
 	@mkdir -p doc
-	$(OCAMLDOC) -v -d doc -html -colorize-code -m A $(DOC_SOURCES)
-
-#	ocamldoc.opt -v -d doc -html -colorize-code -m A \
-#	  -I $(DOC_INCLUDE_CF) $(DOC_SOURCES)
+	$(OCAMLDOC) -v -d doc -html -colorize-code -m A $(DOC_SOURCES) \
+	  -I $(DOC_INCLUDES) $(DOC_SOURCES)
 
 ###############################################################################
 

iom/iom_gadget.ml

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-type input = Cf_poll.more and output = unit
+type input = Nx_poll.more and output = unit
 type work = (input, output) Cf_gadget.work
 type gates = (input, output) Cf_gadget.gates
 type 'a guard = (input, output, 'a) Cf_gadget.guard
 open Cf_cmonad.Op
 
 type queue = <get: unit t> Queue.t
-type kernel = { k_poll_: Cf_poll.t; k_queue_: queue }
+type kernel = { k_poll_: Nx_poll.t; k_queue_: queue }
 
 let poll e k =
-    let e = (e :> unit t Cf_poll.event) in
+    let e = (e :> unit t Nx_poll.event) in
     e#load ~q:k.k_queue_ k.k_poll_
 
 let run =
             ingest1_ q
     and ingest0_ q =
         Cf_gadget.read >>= function
-        | Cf_poll.More ->
+        | Nx_poll.More ->
             ingest1_ q >>= fun () ->
             ingest0_ q
-        | Cf_poll.Last ->
+        | Nx_poll.Last ->
             Cf_cmonad.nil 
     in
     let rec runloop_ p w =
         match Lazy.force w with
         | Cf_flow.Z -> ()
         | Cf_flow.P ((), tl) -> assert (not true); runloop_ p tl
-        | Cf_flow.Q f -> runloop_ p (lazy (f (Cf_poll.cycle p)))
+        | Cf_flow.Q f -> runloop_ p (lazy (f (Nx_poll.cycle p)))
     in
     fun f ->
-        let p = Cf_poll.create () and q = Queue.create () in
+        let p = Nx_poll.create () and q = Queue.create () in
         let k = { k_poll_ = p; k_queue_ = q; } in
         let m = f k >>= fun () -> Cf_gadget.start (ingest0_ k.k_queue_) in
         let w = Cf_gadget.eval m in

iom/iom_gadget.mli

     [read] and [write] functions.
     
     Evaluating an I/O gadget monad results in the execution of an event loop
-    (using the [Cf_poll] module).  The underlying [Cf_flow] object conceptually
+    (using the [Nx_poll] module).  The underlying [Cf_flow] object conceptually
     reads I/O events from the system and writes commands to wait for more.
     
     An I/O gadget monad is a continuation monad initialized with an abstract
 (** Use [poll e k] to load the I/O event into the polling mux and event queue
     comprising the kernel [k].
 *)
-val poll: unit t #Cf_poll.event -> kernel -> unit
+val poll: unit t #Nx_poll.event -> kernel -> unit
 
 (** Use [run f s] to construct a new I/O event kernel, apply it to the function
     [f] to obtain a top-level process for the event loop, and start the process

iom/iom_octet_stream.ml

                 Cf_cmonad.return (consume_, buffer, mark)
     end
 
-let no_limits_emit_ = { low = 1; high = max_int }
+let no_limits_emitter_ = { low = 1; high = max_int }
 
-class ['x, 'y] emit ?limits waitTx xRx yTx =
+class ['x, 'y] emitter ?limits waitTx xRx yTx =
     let limits =
         match limits with
-        | None -> no_limits_emit_
+        | None -> no_limits_emitter_
         | Some limits -> normalize_limits limits
     in
     object(self)
                 Cf_cmonad.return (Cf_exnopt.U (s, obj))
     end
 
-let x_negative_adjust_ = "Iom_octet_stream.parse: negative adjustment!"
+let x_negative_adjust_ = "Iom_octet_stream.scanner: negative adjustment!"
 
-let no_limits_parse_ = { low = max_int; high = max_int }
+let no_limits_scanner_ = { low = max_int; high = max_int }
 
-class ['x, 'y] parse ?limits waitTx xRx yTx =
+class ['x, 'y] scanner ?limits waitTx xRx yTx =
     let limits =
         match limits with
-        | None -> no_limits_parse_
+        | None -> no_limits_scanner_
         | Some limits -> normalize_limits limits
     in
     object(self)
             match
                 try Cf_exnopt.U self#parse with x -> Cf_exnopt.X x
             with
-            | Cf_exnopt.X x ->
-                Cf_cmonad.return (Cf_exnopt.X x)
+            | Cf_exnopt.X _ as v ->
+                Cf_cmonad.return v
             | Cf_exnopt.U None ->
                 Cf_cmonad.return (Cf_exnopt.U ([], Some self))
             | Cf_exnopt.U (Some (s, len)) ->

iom/iom_octet_stream.mli

 (** Use [inherit \['x, 'y\] transform ~limits flowTx inRx outTx] to define a
     subclass for transforming streams of ['x] events received at [inRx] into
     streams of ['y] events transmitted at [outTx].  Conventionally, both ['x]
-    and ['y] are subtypes of [fragment], but this is not required.
+    and ['y] are subtypes of [fragment], but this is not required.  Logically,
+    events are represented by stream fragments, represented as a blocks of
+    octets and an optional sentinel indicating the end of the stream.
 *)
 class virtual ['x, 'y] transform:
     limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
             int -> (bool * Cf_message.substring Cf_deque.t * int) Iom_gadget.t
     end
 
-(** Use [inherit emit ?limits flowTx inRx outTx] to derive an octet stream
+(** Use [inherit emitter ?limits flowTx inRx outTx] to derive an octet stream
     transform class (for use with {!Iom_layer.osimplex} or
-    {!Iom_layer.duplex}).  Override the [self#fragment] and [self#push] methods
+    {!Iom_layer.duplex}).  Override the [self#fragment] and [self#emit] methods
     in the subtype to implement the discipline for producing events to output.
 *)
-class ['x, 'y] emit:
+class ['x, 'y] emitter:
     ?limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
     object('self)
 
         (** Override [self#fragment inEvent] to return the octet stream
             fragment components of the input event to the basis implementation
-            of [self#push].
+            of [self#push].  Basis method returns [Iom_stream.Last] and an
+            empty message fragment.
         *)
         method private fragment: 'x -> Iom_stream.more * Cf_message.t
         
         (** Override [self#emit] to examine the contents of the current buffer,
             identify all the output events that can be shifted, and the length
-            of the octets in the buffer to shift.
+            of the octets in the buffer to shift.  Basis method return an empty
+            list of events and zero length for the shift.
         *)
         method private emit: 'y list * int
 
         method shift: ('y list * 'self option) Cf_exnopt.t Iom_gadget.t
     end
 
-(** Use [inherit parse ?limits flowTx inRx outTx] to derive an octet stream
+(** Use [inherit scanner ?limits flowTx inRx outTx] to derive an octet stream
     transform class (for use with {!Iom_layer.isimplex} or
     {!Iom_layer.duplex}).  Override the [self#lex] and/or [self#parse] methods
     in the subtype to implement the discipline for producing events to output.
 *)
-class ['x, 'y] parse:
+class ['x, 'y] scanner:
     ?limits:limits -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
     'x #Iom_gadget.rx -> 'y #Iom_gadget.tx ->
     object('self)
         method private lex: Cf_message.t -> ('y * int) Cf_lex.t
         
         (** Parse the octet buffer by calling [self#lex] in a loop until no
-            more output objects are parsed.  Returns [None] if no output is
+            more output objects are scanned.  Returns [None] if no output is
             recognized, otherwise returns [Some (output, length)] where
             [output] is a list of events of type ['y] and [length] is the
             number octets to shift with [self#shift_octets] in the [shift]

iom/iom_reactor.ml

         inherit Iom_gadget.start as super
         inherit Iom_gadget.next
     	constraint 'control = [> Iom_stream.stop ]
-        constraint 'self = unit Iom_gadget.t #Cf_poll.event
+        constraint 'self = unit Iom_gadget.t #Nx_poll.event
 
         val mutable started_ = false
 
             let eopt = self#source in
             match eopt, source_ with
             | None, true ->
-                Cf_poll.Loaded p
+                Nx_poll.Loaded p
             | None, false ->
-                Cf_poll.Unloaded
+                Nx_poll.Unloaded
             | Some event, true ->
-                Cf_poll.Working (p, Iom_gadget.start (sourceTx#put event))
+                Nx_poll.Working (p, Iom_gadget.start (sourceTx#put event))
             | Some event, false ->
-                Cf_poll.Final (Iom_gadget.start (sourceTx#put event))
+                Nx_poll.Final (Iom_gadget.start (sourceTx#put event))
 
         method private stop =
             if source_ then self#unload;
 
 class ['control, 'signal] signaler ~k ~n plug = object
     inherit ['control, 'signal] source ~k plug
-    inherit [unit Iom_gadget.t] Cf_poll.signal n
+    inherit [unit Iom_gadget.t] Nx_poll.signal n
     constraint 'signal = [> signal ]
     method private source = Some (`Signal n)
 end
 
 class ['control, 'idle] idler ~k plug = object
     inherit ['control, 'idle] source ~k plug
-    inherit [unit Iom_gadget.t] Cf_poll.idle
+    inherit [unit Iom_gadget.t] Nx_poll.idle
     constraint 'idle = [> idle ]
     
     method private source =
 
 class ['control, 'time] timer ~k ?t0 ~dt plug = object
     inherit ['control, 'signal] source ~k plug as super
-    inherit [unit Iom_gadget.t] Cf_poll.time ?t0 dt
+    inherit [unit Iom_gadget.t] Nx_poll.time ?t0 dt
     constraint 'time = [> time ]
     
     method private source = Some (`Time epoch_)
         false
 
 class virtual ['fragment, 'control, 'notify] reader_aux
-    ~k ?(rwx: Cf_poll.rwx = `R) ~fd ~limits plug =
+    ~k ?(rwx: Nx_poll.rwx = `R) ~fd ~limits plug =
     let limits = Iom_octet_stream.normalize_limits limits in
     let fragmentTx, (controlRx, notifyTx) = plug in
     let notifyTx = (notifyTx : 'notify Iom_gadget.tx) in
     let plug = (controlRx, fragmentTx) in
     object(self:'self)
         inherit ['control, 'fragment] source ~k plug as source
-        inherit [unit Iom_gadget.t] Cf_poll.file rwx fd as poll
+        inherit [unit Iom_gadget.t] Nx_poll.file rwx fd as poll
 
         constraint 'fragment = #Iom_octet_stream.fragment
         constraint 'notify = [> Iom_stream.failed ]
             try
                 match self#source with
                 | None ->
-                    Cf_poll.Loaded p
+                    Nx_poll.Loaded p
                 | Some fragment ->
                     let more = fragment#more in
                     let out = fragmentTx#put fragment in
                     match more with
-                    | Iom_stream.More -> Cf_poll.Working (p, out)
-                    | Iom_stream.Last -> Cf_poll.Final out
+                    | Iom_stream.More -> Nx_poll.Working (p, out)
+                    | Iom_stream.Last -> Nx_poll.Final out
             with
             | Unix.Unix_error (code, _, _) when retry_needed code ->
-                Cf_poll.Loaded p
+                Nx_poll.Loaded p
             | x ->
-                Cf_poll.Final (self#fail x)
+                Nx_poll.Final (self#fail x)
         
         method start =
             try
     let notifyTx = (notifyTx : 'notify Iom_gadget.tx) in
     object(self:'self)
         inherit ['control] core controlRx as core
-        inherit [unit Iom_gadget.t] Cf_poll.file `W fd as poll
+        inherit [unit Iom_gadget.t] Nx_poll.file `W fd as poll
 
         constraint 'fragment = #Iom_octet_stream.fragment
         constraint 'notify = [> notify ]
             try
                 self#consume;
                 if Cf_deque.empty buffer_ then begin
-                    Cf_poll.Final begin
+                    Nx_poll.Final begin
                         notifyTx#put `Ready >>= fun () ->
                         if more_ = Iom_stream.Last then
                             self#complete
                     end
                 end
                 else
-                    Cf_poll.Loaded p
+                    Nx_poll.Loaded p
             with
             | x ->
-                Cf_poll.Final (self#fail x)
+                Nx_poll.Final (self#fail x)
     	
     	method private guard =
     		controlRx#get self#control >>= fun ()  ->

iom/iom_reactor.mli

 
 (** {6 Overview}
 
-    The {!Cf_poll} imperative event loop mechanism is lifted into a suite of
+    The {!Nx_poll} imperative event loop mechanism is lifted into a suite of
     I/O gadgets.  Compositions may use them to respond to process signals, loop
     idling conditions, timer events and file descriptor read/write/exception
     events discoverable through {!Unix.select}.
 
 (** The core gadget for I/O reactors, comprising the basic control interface
     for stream gadgets and a type constraint on subtypes requiring the
-    inheritance of the {!Cf_poll.event} class.  {b Note well:} these gadgets
+    inheritance of the {!Nx_poll.event} class.  {b Note well:} these gadgets
     state objects are mutable, so care must be taken when writing derived
     classes to consider the side effects of mutation and not violate the
     requirements of the I/O gadget monad.
         inherit Iom_gadget.start
         inherit Iom_gadget.next
     	constraint 'control = [> Iom_stream.stop ]
-        constraint 'self = unit Iom_gadget.t #Cf_poll.event
+        constraint 'self = unit Iom_gadget.t #Nx_poll.event
 
         (** Set [true] when the gadget is started.  Because reactor state is
             mutable, attempts to start more than one gadget with the same
             reload the polling event and send the message (if available) on the
             source wire.
         *)
-        method private service: Cf_poll.t -> unit Iom_gadget.t Cf_poll.state
+        method private service: Nx_poll.t -> unit Iom_gadget.t Nx_poll.state
         
         (** Load the polling event and set [source_] to [true]. *)
     	method private ready: unit Iom_gadget.t
     k:Iom_gadget.kernel -> n:int -> ('control, 'signal) Iom_gadget.plug ->
     object
         inherit ['control, 'signal] source
-        inherit [unit Iom_gadget.t] Cf_poll.signal
+        inherit [unit Iom_gadget.t] Nx_poll.signal
         constraint 'signal = [> signal ]
         
         (** Returns [Some (`Signal n)]. *)
     k:Iom_gadget.kernel -> ('control, 'idle) Iom_gadget.plug ->
     object
         inherit ['control, 'idle] source
-        inherit [unit Iom_gadget.t] Cf_poll.idle
+        inherit [unit Iom_gadget.t] Nx_poll.idle
         constraint 'idle = [> idle ]
         
         (** Returns [Some (`Idle epoch)] if [epoch_] is not [None], otherwise
     ('control, 'time) Iom_gadget.plug ->
     object
         inherit ['control, 'time] source
-        inherit [unit Iom_gadget.t] Cf_poll.time
+        inherit [unit Iom_gadget.t] Nx_poll.time
         constraint 'time = [> time ]
         
         (** Returns [Some (`Time epoch_)]. *)
     polling event indicates it is ready.
 *)
 class virtual ['fragment, 'control, 'notify] reader_aux:
-    k:Iom_gadget.kernel ->  ?rwx:Cf_poll.rwx -> fd:Unix.file_descr ->
+    k:Iom_gadget.kernel ->  ?rwx:Nx_poll.rwx -> fd:Unix.file_descr ->
     limits:Iom_octet_stream.limits ->
     ('fragment, 'control, 'notify) Iom_stream.iplug ->
     object('self)
         inherit ['control, 'fragment] source
-        inherit [unit Iom_gadget.t] Cf_poll.file
+        inherit [unit Iom_gadget.t] Nx_poll.file
 
         constraint 'fragment = #Iom_octet_stream.fragment
         constraint 'notify = [> Iom_stream.failed ]
 
 (** A stream gadget that reads octet stream fragments with [Unix.read]. *)
 class ['control, 'notify] reader:
-    k:Iom_gadget.kernel -> ?rwx:Cf_poll.rwx -> fd:Unix.file_descr ->
+    k:Iom_gadget.kernel -> ?rwx:Nx_poll.rwx -> fd:Unix.file_descr ->
     limits:Iom_octet_stream.limits ->
     (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.iplug ->
     object
     ('fragment, 'control, 'notify) Iom_stream.oplug ->
     object('self)
         inherit ['control] core
-        inherit [unit Iom_gadget.t] Cf_poll.file
+        inherit [unit Iom_gadget.t] Nx_poll.file
 
         constraint 'notify = [> notify ]
         constraint 'fragment = #Iom_octet_stream.fragment
             exception arises or the file descriptor blocks.  Notifies [`Ready]
             if the queue drains completely.
         *)
-        method private service: Cf_poll.t -> unit Iom_gadget.t Cf_poll.state
+        method private service: Nx_poll.t -> unit Iom_gadget.t Nx_poll.state
         
         (** Receive an octet stream fragment.  If the buffer queue is currently
             empty, the fragment is pushed into the buffer and [self#consume] is

iom/iom_sock_stream.ml

 
 class ['af] connection more socket = object
     method more: Iom_stream.more = more
-    method socket: ('af, [ `SOCK_STREAM ]) Cf_socket.t = socket
+    method socket: ('af, [ `SOCK_STREAM ]) Nx_socket.t = socket
 end
 
 class virtual ['control, 'notify, 'connect] connector ~k ~s:socket ~rwx plug =
     let _ = (socket : 'socket) in
-    let fd = Cf_socket.to_unix_file_descr socket in
+    let fd = Nx_socket.to_unix_file_descr socket in
     let connectTx, (controlRx, notifyTx) = plug in
     let notifyTx = (notifyTx : 'notify Iom_gadget.tx) in
     let plug = (controlRx, connectTx) in
     object(self:'self)
         inherit ['control, 'connect] Iom_reactor.source ~k plug as super
-        inherit [unit Iom_gadget.t] Cf_poll.file rwx fd as poll
+        inherit [unit Iom_gadget.t] Nx_poll.file rwx fd as poll
 
-        constraint 'socket = ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t
+        constraint 'socket = ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t
         constraint 'connect = 'af #connection
         
         val mutable initiated_ = false
         method virtual private bind: unit
                 
         method private stop =
-            Cf_socket.close socket;
+            Nx_socket.close socket;
             super#stop
 
         method private fail: 'a. exn -> 'a Iom_gadget.t = fun x ->
         method private service p =
             try
                 match super#service p with
-                | Cf_poll.Working (_, out) -> Cf_poll.Final out
+                | Nx_poll.Working (_, out) -> Nx_poll.Final out
                 | s -> s
             with
             | x ->
-                Cf_poll.Final (self#fail x)
+                Nx_poll.Final (self#fail x)
         
         method private ready =
             match initiated_ with
             try
                 ignore self#source;
                 assert (not true);
-                Cf_poll.Unloaded
+                Nx_poll.Unloaded
             with
             | Unix.Unix_error (code, _, _)
               when Iom_reactor.retry_needed code ->
                 let out = seqput_ stack_ connectTx in
                 stack_ <- [];
-                Cf_poll.Working (p, out)
+                Nx_poll.Working (p, out)
             | x ->
                 let out =
                     seqput_ stack_ connectTx >>= fun () ->
                 stack_ <- [];
                 match x with
                 | Unix.Unix_error ((Unix.EMFILE | Unix.ENFILE), _, _) ->
-                    Cf_poll.Working (p, out)
+                    Nx_poll.Working (p, out)
                 | _ ->
-                    Cf_poll.Final out
+                    Nx_poll.Final out
 
         method private ready =
             match initiated_ with
                 super#ready
             | false ->
                 try
-                    Cf_socket.listen s backlog;
+                    Nx_socket.listen s backlog;
                     let address = self#getsockname in
                     notifyTx#put (`Listen address) >>= fun () ->
                     super#ready
     end
 
 class ['control, 'notify] receiver ~k ~s:socket ~limits plug =
-    let _ = (socket : ('af, [ `SOCK_STREAM ]) Cf_socket.t) in
-    let fd = Cf_socket.to_unix_file_descr socket in
+    let _ = (socket : ('af, [ `SOCK_STREAM ]) Nx_socket.t) in
+    let fd = Nx_socket.to_unix_file_descr socket in
     let fragmentTx, (_, notifyTx) = plug in
     object(self)
         inherit ['control, 'notify] Iom_reactor.reader ~k ~fd ~limits plug
 
         method private octets n =
             let v =
-                Cf_socket.recv socket buffer_ cursor_ n
-                    Cf_socket.msg_flags_none
+                Nx_socket.recv socket buffer_ cursor_ n
+                    Nx_socket.msg_flags_none
             in
             if v = 0 then raise End_of_file;
             v
             try
                 match self#source with
                 | None ->
-                    Cf_poll.Loaded p
+                    Nx_poll.Loaded p
                 | Some fragment ->
                     let more = fragment#more in
                     let out = fragmentTx#put fragment in
                     match more with
                     | Iom_stream.More ->
-                        Cf_poll.Working (p, out)
+                        Nx_poll.Working (p, out)
                     | Iom_stream.Last ->
                         let out = out >>= fun () -> notifyTx#put `Complete in
-                        Cf_poll.Final out
+                        Nx_poll.Final out
             with
             | Unix.Unix_error (code, _, _)
               when Iom_reactor.retry_needed code ->
-                Cf_poll.Loaded p
+                Nx_poll.Loaded p
             | x ->
-                Cf_poll.Final (notifyTx#put (`Failed x))
+                Nx_poll.Final (notifyTx#put (`Failed x))
     end
 
 class ['control, 'notify] sender ~k ~s:socket plug =
-    let _ = (socket : ('af, [ `SOCK_STREAM ]) Cf_socket.t) in
-    let fd = Cf_socket.to_unix_file_descr socket in
+    let _ = (socket : ('af, [ `SOCK_STREAM ]) Nx_socket.t) in
+    let fd = Nx_socket.to_unix_file_descr socket in
     object
         inherit [Iom_octet_stream.fragment, 'control, 'notify]
             Iom_reactor.writer ~k ~fd plug as super
         
         method private octets buf pos len =
-            Cf_socket.send socket buf pos len Cf_socket.msg_flags_none
+            Nx_socket.send socket buf pos len Nx_socket.msg_flags_none
 
         method private complete =
             Unix.shutdown fd Unix.SHUTDOWN_SEND;
         constraint 'i = #Iom_octet_stream.fragment
         constraint 'o = #Iom_octet_stream.fragment
         constraint 'state = ('ci, 'co, 'ni, 'no, 'x, 'cx, 'nx) state
-        constraint 'st = Cf_socket.SOCK_STREAM.tag
+        constraint 'st = Nx_socket.SOCK_STREAM.tag
 
         method virtual private initiator:
-            src:'af Cf_socket.sockaddr -> dst:'af Cf_socket.sockaddr ->
+            src:'af Nx_socket.sockaddr -> dst:'af Nx_socket.sockaddr ->
             ('x, 'cx, 'nx) Iom_stream.iplug -> #Iom_gadget.start
         
         method private state0 =
             constraint 'connect = 'af #connection_aux
         
             method virtual private aux:
-                ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 'connect
+                ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t -> 'connect
         
-            method private bind = Cf_socket.bind s src
+            method private bind = Nx_socket.bind s src
             
             method private connect =
                 begin
-                    try Cf_socket.connect s dst
+                    try Nx_socket.connect s dst
                     with Unix.Unix_error (Unix.EISCONN, _, _) -> ()
                 end;
                 self#aux s
             constraint 'connect = 'af #connection_aux
 
             method virtual private aux:
-                ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 'connect
+                ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t -> 'connect
 
-            method private bind = Cf_socket.bind s src
+            method private bind = Nx_socket.bind s src
 
             method private accept =
-                let s, _ = Cf_socket.accept s in
+                let s, _ = Nx_socket.accept s in
                 self#aux s
         end
 
     Iom_stream.ijack
 
 type 'af listener_jack =
-    ('af connection, Iom_stream.flowcontrol, 'af Cf_socket.sockaddr
+    ('af connection, Iom_stream.flowcontrol, 'af Nx_socket.sockaddr
     listener_notify) Iom_stream.ijack
 
 class ['af] initiator0 ~k ~s ~src ~dst plug = object
     Iom_stream.ijack (new initiator0 ~k ~s ~src ~dst)
 
 class ['af] listener0 ~k ~s ~src ~backlog plug = object
-    inherit [Iom_stream.flowcontrol, 'af Cf_socket.sockaddr listener_notify,
+    inherit [Iom_stream.flowcontrol, 'af Nx_socket.sockaddr listener_notify,
         'af connection] Aux.listener_aux ~k ~s ~backlog ~src plug
         
-    method getsockname = Cf_socket.getsockname s
+    method getsockname = Nx_socket.getsockname s
     method private aux s = new connection Iom_stream.More s
 end
 
     Iom_stream.iojack (new endpoint0 ~k ~s ?addrs ~limits)
 
 module type T = sig
-    include Iom_socket.T with module P.ST = Cf_socket.SOCK_STREAM
+    include Iom_socket.T with module P.ST = Nx_socket.SOCK_STREAM
         
     class connection: Iom_stream.more -> t -> object
         inherit [P.AF.tag] Aux.connection_aux
         Iom_gadget.kernel -> endpoint Iom_gadget.t
 end
 
-module Create(P: Cf_socket.P with module ST = Cf_socket.SOCK_STREAM) = struct
+module Create(P: Nx_socket.P with module ST = Nx_socket.SOCK_STREAM) = struct
     include Iom_socket.Create(P)
         
     class connection more socket =
-        let local = P.AF.of_sockaddr (Cf_socket.getsockname socket) in
-        let remote = P.AF.of_sockaddr (Cf_socket.getpeername socket) in
+        let local = P.AF.of_sockaddr (Nx_socket.getsockname socket) in
+        let remote = P.AF.of_sockaddr (Nx_socket.getpeername socket) in
         object
             inherit [P.AF.tag] Aux.connection_aux more socket
             
     type endpoint = endpoint_jack
     
     let socket () = 
-        (Cf_socket.create P.AF.domain P.ST.socktype P.protocol : t)
+        (Nx_socket.create P.AF.domain P.ST.socktype P.protocol : t)
     
     let addrsoptmap_ = function
         | None ->
             inherit [Iom_stream.flowcontrol, 'address listener_notify,
                 connection] Aux.listener_aux ~k ~s ~backlog ~src plug
                 
-            method getsockname = P.AF.of_sockaddr (Cf_socket.getsockname s)
+            method getsockname = P.AF.of_sockaddr (Nx_socket.getsockname s)
             method private aux s = new connection Iom_stream.More s
         end
 

iom/iom_sock_stream.mli

 
 (** An I/O stream event representing a connection event. *)
 class ['af] connection:
-    Iom_stream.more -> ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    Iom_stream.more -> ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
     object
         (** Flag to indicate whether more connections are expected. *)
         method more: Iom_stream.more
         
         (** The socket descriptor. *)
-        method socket: ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t
+        method socket: ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t
     end
 
 (** A listener has started at the specified address. *)
 
 (** The jack for controlling a listener. *)
 type 'af listener_jack =
-    ('af connection, Iom_stream.flowcontrol, 'af Cf_socket.sockaddr
+    ('af connection, Iom_stream.flowcontrol, 'af Nx_socket.sockaddr
     listener_notify) Iom_stream.ijack
 
 (** Evaluate [initiator ~s ~src ~dst k] to obtain the initiator jack for
     kernel context [k] to the destination address [dst].
 *)
 val initiator:
-    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    src:'af Cf_socket.sockaddr -> dst:'af Cf_socket.sockaddr ->
+    s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
+    src:'af Nx_socket.sockaddr -> dst:'af Nx_socket.sockaddr ->
     Iom_gadget.kernel -> 'af initiator_jack Iom_gadget.t
 
 (** Evaluate [listener ~s ~src ~backlog k] to obtain the listener jack for
     depth of [backlog] and accepting connections in the kernel context [k].
 *)
 val listener:
-    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 
-    src:'af Cf_socket.sockaddr -> backlog:int -> Iom_gadget.kernel ->
+    s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t -> 
+    src:'af Nx_socket.sockaddr -> backlog:int -> Iom_gadget.kernel ->
     'af listener_jack Iom_gadget.t
 
 (** Evaluate [endpoint ~s ?addrs ~limits k] to obtain the endpoint jack for
     send.
 *)
 val endpoint:
-    s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    ?addrs:('af Cf_socket.sockaddr * 'af Cf_socket.sockaddr) ->
+    s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
+    ?addrs:('af Nx_socket.sockaddr * 'af Nx_socket.sockaddr) ->
     limits:Iom_octet_stream.limits -> Iom_gadget.kernel ->
     endpoint_jack Iom_gadget.t
 
 
 (** Use [inherit connector ~k ~s ~rwx plug] to derive a subclass gadget that
     responds to connection events on the socket [s] in the kernel context [k].
-    The [rwx] parameter identifies the sort of {!Cf_poll.file} required.  The
-    base class inherits from {!Iom_reactor.source} and {!Cf_poll.file}.
+    The [rwx] parameter identifies the sort of {!Nx_poll.file} required.  The
+    base class inherits from {!Iom_reactor.source} and {!Nx_poll.file}.
 *)
 class virtual ['control, 'notify, 'connect] connector:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    rwx:[< Cf_poll.rwx ] -> ('connect, 'control, 'notify) Iom_stream.iplug ->
+    k:Iom_gadget.kernel -> s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
+    rwx:[< Nx_poll.rwx ] -> ('connect, 'control, 'notify) Iom_stream.iplug ->
     object
         inherit ['control, 'connect] Iom_reactor.source
-        inherit [unit Iom_gadget.t] Cf_poll.file
+        inherit [unit Iom_gadget.t] Nx_poll.file
 
         constraint 'control = [> Iom_stream.flowcontrol ]
         constraint 'notify = [> Iom_stream.failed ]
     address in the kernel context [k] with the socket [s].
 *)
 class virtual ['control, 'notify, 'connect] initiator:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    k:Iom_gadget.kernel -> s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
     ('connect, 'control, 'notify) Iom_stream.iplug ->
     object
         inherit ['control, 'notify, 'connect] connector
     address in the kernel context [k] with the socket [s].
 *)
 class virtual ['control, 'notify, 'connect] listener:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    k:Iom_gadget.kernel -> s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
     backlog:int -> ('connect, 'control, 'notify) Iom_stream.iplug ->
     object
         inherit ['control, 'notify, 'connect] connector
 
 (** A receiver gadget for stream socket input. *)
 class ['control, 'notify] receiver:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    k:Iom_gadget.kernel -> s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
     limits:Iom_octet_stream.limits ->
     (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.iplug ->
     ['control, [> Iom_reactor.complete ] as 'notify] Iom_reactor.reader
 
 (** A sender gadget for stream socket output. *)
 class ['control, 'notify] sender:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
+    k:Iom_gadget.kernel -> s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
     (Iom_octet_stream.fragment, 'control, 'notify) Iom_stream.oplug ->
     [Iom_octet_stream.fragment, 'control, 'notify] Iom_reactor.writer
 
     the socket address family.
 *)
 class virtual ['i, 'o, 'control, 'notify, 'af] endpoint:
-    k:Iom_gadget.kernel -> s:('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t ->
-    ?addrs:('af Cf_socket.sockaddr * 'af Cf_socket.sockaddr) ->
+    k:Iom_gadget.kernel -> s:('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t ->
+    ?addrs:('af Nx_socket.sockaddr * 'af Nx_socket.sockaddr) ->
     ('i, 'o, 'control, 'notify) Iom_stream.ioplug ->
     object
         inherit ['i, 'o, 'control, 'notify, 'state, 'af, 'st]
         constraint 'i = #Iom_octet_stream.fragment
         constraint 'o = #Iom_octet_stream.fragment
         constraint 'state = ('ci, 'co, 'ni, 'no, 'x, 'cx, 'nx) state
-        constraint 'st = Cf_socket.SOCK_STREAM.tag
+        constraint 'st = Nx_socket.SOCK_STREAM.tag
 
         (** Called in [self#state0] (unless [?addrs] is [None]) to construct
             an initiator gadget for the source and destination address with
             to [`S_initiating].
         *)
         method virtual private initiator:
-            src:'af Cf_socket.sockaddr -> dst:'af Cf_socket.sockaddr ->
+            src:'af Nx_socket.sockaddr -> dst:'af Nx_socket.sockaddr ->
             ('x, 'cx, 'nx) Iom_stream.iplug -> #Iom_gadget.start
     end
 
 
         (** Called in [self#connect] to produce a [connection] object. *)
         method virtual private aux:
-            ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 'connect
+            ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t -> 'connect
     end
 
     (** A subclass type of the core [listener] class type that defines a
         constraint 'connect = 'af #connection_aux
 
         method virtual private aux:
-            ('af, Cf_socket.SOCK_STREAM.tag) Cf_socket.t -> 'connect
+            ('af, Nx_socket.SOCK_STREAM.tag) Nx_socket.t -> 'connect
     end
 
     (** A synonym for the core [endpoint] class. *)
 
 (** The signature of modules produced by the [Create] functor. *)
 module type T = sig
-    include Iom_socket.T with module P.ST = Cf_socket.SOCK_STREAM
+    include Iom_socket.T with module P.ST = Nx_socket.SOCK_STREAM
     
     (** The class of connection objects. *)
     class connection: Iom_stream.more -> t -> object
 (** Use [Create(P)] to compose a module of stream socket gadgets specialized
     for the address family of the socket module [P].
 *)
-module Create(P: Cf_socket.P with module ST = Cf_socket.SOCK_STREAM):
+module Create(P: Nx_socket.P with module ST = Nx_socket.SOCK_STREAM):
     T with module P = P
 
 (*--- End of File [ iom_sock_stream.mli ] ---*)

iom/iom_socket.ml

         method private state0 = self#establish
         
         method private close: 'a. 'a Iom_gadget.t =
-            Cf_socket.close s;
+            Nx_socket.close s;
             Iom_gadget.abort
         
         method private reset: 'a. 'a Iom_gadget.t =
-            let fd = Cf_socket.to_unix_file_descr s in
+            let fd = Nx_socket.to_unix_file_descr s in
             Unix.shutdown fd Unix.SHUTDOWN_ALL;
             self#close
         
     end
 
 module type T = sig
-    module P: Cf_socket.P
-    type t = (P.AF.tag, P.ST.tag) Cf_socket.t
+    module P: Nx_socket.P
+    type t = (P.AF.tag, P.ST.tag) Nx_socket.t
     type address = P.AF.address
     val unspecified_address: address
 end
 
-module Create(P: Cf_socket.P) = struct
+module Create(P: Nx_socket.P) = struct
     module P = P
-    type t = (P.AF.tag, P.ST.tag) Cf_socket.t
+    type t = (P.AF.tag, P.ST.tag) Nx_socket.t
     type address = P.AF.address
     let unspecified_address = P.AF.of_sockaddr P.AF.unspecified
 end

iom/iom_socket.mli

     endpoint control semantics required by the type of socket [s].
 *)
 class virtual ['i, 'o, 'control, 'notify, 'state, 'af, 'st] endpoint :
-    k:Iom_gadget.kernel -> s:('af, 'st) Cf_socket.t ->
+    k:Iom_gadget.kernel -> s:('af, 'st) Nx_socket.t ->
     ('i, 'o, 'control, 'notify) Iom_stream.ioplug ->
     object
         inherit Iom_gadget.start
 module type T = sig
 
     (** The socket module. *)
-    module P: Cf_socket.P
+    module P: Nx_socket.P
     
     (** The type of socket descriptors. *)
-    type t = (P.AF.tag, P.ST.tag) Cf_socket.t
+    type t = (P.AF.tag, P.ST.tag) Nx_socket.t
     
     (** The type of socket addresses. *)
     type address = P.AF.address
 (** Use [Create(P)] to compose a module for including in modules for specific
     socket types that comprises the core definitions for socket I/O gadgets.
 *)
-module Create(P: Cf_socket.P): T with module P = P
+module Create(P: Nx_socket.P): T with module P = P
 
 (*--- End of File [ iom_socket.mli ] ---*)

iom/iom_tcp4_socket.ml

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-include Iom_sock_stream.Create(Cf_ip4_proto.TCP)
+include Iom_sock_stream.Create(Nx_ip4_proto.TCP)
 
 (*--- End of File [ iom_tcp4_socket.ml ] ---*)

iom/iom_tcp4_socket.mli

 
 (** Reactor for TCP (with IPv4). *)
 
-include Iom_sock_stream.T with module P = Cf_ip4_proto.TCP
+include Iom_sock_stream.T with module P = Nx_ip4_proto.TCP
 
 (*--- End of File [ iom_tcp4_socket.mli ] ---*)

iom/iom_tcp6_socket.ml

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-include Iom_sock_stream.Create(Cf_ip6_proto.TCP)
+include Iom_sock_stream.Create(Nx_ip6_proto.TCP)
 
 (*--- End of File [ iom_tcp6_socket.ml ] ---*)

iom/iom_tcp6_socket.mli

 
 (** Reactor for TCP (with IPv6). *)
 
-include Iom_sock_stream.T with module P = Cf_ip6_proto.TCP
+include Iom_sock_stream.T with module P = Nx_ip6_proto.TCP
 
 (*--- End of File [ iom_tcp6_socket.mli ] ---*)
     end
     
     class parse flowTx readRx okayTx = object
-        inherit [Iom_octet_stream.fragment, unit] Iom_octet_stream.parse
+        inherit [Iom_octet_stream.fragment, unit] Iom_octet_stream.scanner
             flowTx readRx okayTx as super
         
         val lex_ = F.lex ()
     let limits0 = { Iom_octet_stream.low = 1; high = 5 }
     
     class emit flowTx dataRx writeTx = object(self)
-        inherit [string, Iom_octet_stream.fragment] Iom_octet_stream.emit
+        inherit [string, Iom_octet_stream.fragment] Iom_octet_stream.emitter
             ~limits:limits0 flowTx dataRx writeTx as super
         
         method private fragment x = Iom_stream.Last, (Cf_message.create x)