1. james woodyatt
  2. oni

Commits

jhwoodyatt  committed 9464ebb

Checkpoint. Add comments for ocamldoc generator, and a new README file.

  • Participants
  • Parent commits cbcb91e
  • Branches default

Comments (0)

Files changed (11)

File iom/CHANGES

View file
  • Ignore whitespace
 
 
 --j h woodyatt <jhw@wetware.com>
+  san francisco, ca

File iom/ISSUES

View file
  • Ignore whitespace
 Open issues in development:
 
++ (unspecified): Support needed for more types of sockets.  Right now, we only
+    have support for [AF_INET] and [AF_INET6] address families, and the
+    [SOCK_STREAM] type, i.e. TCP for IPv4 and IPv6.  Need support for UDP and
+    for [AF_LOCAL] address families.
+
 + (Iom_reactor_fork): A [fork/wait] reactor would be cool.  While we're
     dreaming, it would also be nice to have pipes and socketpairs, complete
     with file descriptor passing.  Oh, and [popen] is quite desirable.

File iom/Makefile

View file
  • Ignore whitespace
 iom.cmxa iom.a : $(IOM_CMI_FILES) $(IOM_CMX_FILES)
 	$(OCAMLOPT) -a -o $@ $(CMXA_OPT) $(IOM_CMX_FILES)
 
-install:: $(IOM_P_LIB_FILES) iom.cma iom.cmxa iom.a
+install:: $(IOM_P_LIB_FILES) iom.cma
 	{ test ! -f iom.cmxa || extra="iom.cmxa iom.a"; }; \
 	ocamlfind install iom \
 	  $(IOM_MLI_FILES) $(IOM_CMI_FILES) $(IOM_P_H_FILES) \

File iom/README

View file
  • Ignore whitespace
+===== Pagoda Core Foundation (cf) library =====
+
+This distribution is the Objective Caml Network Application Environment (NAE)
+Core Foundation library, which implements functional composition of reactive
+network I/O.
+
+Note: see the ISSUES file for a list of open problems in this release.
+
+===== Required Components =====
+
+This library requires the following external components:
+
+- Objective Caml (v3.07+2 or newer)
+- Findlib (tested with v0.8.1 and v1.0.4)
+- OCaml NAE Core Foundation (cf-0.4)
+
+Principle development was on Mac OS X 10.3.  The final version of this
+library also compiled successfully and passed all self-tests without warnings
+on Suse Linux 9.0 for x86-32.  Other platforms with POSIX-like environments
+should require only a minimal porting effort.
+
+===== Building =====
+
+No 'configure' script is provided.  Compile the library with:
+
+    $ make default
+
+Compile both bytecode and native versions with:
+
+    $ make default opt
+
+Execute tests for byte and native versions with:
+
+    $ make test test.opt
+
+Install the library with ocamlfind using:
+
+    $ make install
+
+Uninstall the library with ocamlfind using:
+
+    $ make uninstall
+
+Make the reference documentation with ocamldoc using:
+
+    $ make doc
+
+
+--j h woodyatt <jhw@wetware.com>
+  San Francisco, CA

File iom/iom_gadget.ml

View file
  • Ignore whitespace
 let wire = Cf_gadget.wire
 let null = Cf_gadget.null
 
-type 'a simplex_t = 'a rx * 'a tx
 let fsimplex = Cf_gadget.fsimplex
 let simplex = Cf_gadget.simplex
-
-type ('a, 'b) duplex_t = ('a rx * 'b tx) * ('b rx * 'a tx)
 let duplex = Cf_gadget.duplex
 
 let load = Cf_gadget.load

File iom/iom_gadget.mli

View file
  • Ignore whitespace
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** I/O gadget monad.  *)
+
+(** {6 Overview}
+
+    A specialization of [Cf_gadget] in the {b Cf} library, specifically
+    targeted to processing a stream of I/O events in concurrent,
+    single-threaded network services.  Evaluating an I/O gadget monad results
+    in the execution of an event loop (using the [Cf_poll] module).  The
+    underlying [Cf_flow] object conceptually reads I/O events from the system
+    and writes commands to wait for more.
+    
+    An I/O gadget monad is a continuation monad initialized with an abstract
+    unit of work for the scheduler.  Monadic functions are defined for the
+    following basic operations:
+    - starting a new I/O gadget process;
+    - creating a new wire (with or without endpoints);
+    - loading, storing and modifying a process state;
+    - sending an event on a wire;
+    - guarding a process on the reception of events from a sequence of wires;
+    - scheduling work to be performed when an I/O event occurs.
+
+*)
+
+(** {6 Types} *)
+
+(** The abstract type of a unit of work in the scheduler. *)
 type 's work_t
+
+(** The abstract type of a process block. *)
 type 's gate_t
+
+(** The abstract type of an event wire. *)
 type 's wire_t
 
+(** A continuation monad for composing logical sequences of scheduling work
+    within a specific process.  The scheduler may interleave units of work from
+    different processes in any order.
+*)
 type ('s, 'a) t = ('s work_t, 'a) Cf_cmonad.t
+
+(** A continuation monad for composing sequences of wires to block a process
+    when waiting for an event.
+*)
 type ('s, 'a) guard_t = ('s gate_t, 'a) Cf_cmonad.t
 
+(** {6 Functions and Classes} *)
+
+(** Use [start g s] to start a new process on the gadget [g] with the initial
+    state [s].
+*)
 val start: ('s0, unit) t -> 's0 -> ('s1, unit) t
+
+(** Use [guard z] to evaluate the guard continuation monad [z] to block the
+    current process until an event is received on a wires in the sequence used
+    in the composition of the continuation.  If none of the wires in the guard
+    are still connected to processes, then the process terminates.
+*)
 val guard: ('s, unit) guard_t -> ('s, unit) t
 
+(** Use [wire >>= f] to get a new wire from the scheduler and apply it to the
+    bound function [f].
+*)
 val wire: ('s, 'a wire_t) t
+
+(** The null wire.  Events sent on the null wire are discarded immediately, and
+    guards on the null wire never receive any events.
+*)
 val null: 'a wire_t
 
+(** The class type of wire connectors.  Both receivers and transmitters are
+    subtypes of this class type.
+*)
 class type connector =
     object
+        (** For a connector [c], [c#check] returns [false] if the other end of
+            the wire has been collected by the garbage collector.  If [c#check]
+            returns [true], then the other end of the wire may still be
+            connected to a process.
+        *)
         method check: bool
+        
+        (** This methods returns a string that may be used to distinguish the
+            underlying wire from other wires in the same scheduler.
+            
+            {b Warning}: {i in long running processes, the identifier number
+            used to construct this string may overflow its representation.  In
+            this event, no attempt is made to keep from assigning the same
+            identifier to more than one wire simultaneously.}
+        *)
         method id: string
     end
 
+(** The class of connectors used for receiving events.  Use [new rx w] to
+    construct a receive connector for the wire [w].
+*)
 class ['a] rx:
     'a wire_t ->
     object
         inherit connector
+        
+        (** Produce a guard to receive an event on the underlying wire and
+            dispatch it to the given function.  Use the continuation monad
+            binding operator [( >>= )] to produce a guard for a sequence of
+            wires.
+        *)
         method get: 's. ('a -> ('s, unit) t) -> ('s, unit) guard_t
     end
 
+(** The class of connectors used for sending events.  Use [new tx w] to
+    construct a transmit connector for the wire [w].
+*)
 class ['a] tx:
     'a wire_t ->
     object
         inherit connector
+        
+        (** Send the given event on the underlying wire. *)
         method put: 's. 'a -> ('s, unit) t
     end
 
-type 'a simplex_t = 'a rx * 'a tx
+(** Use [fsimplex ~f >>= g] to get a new wire from the scheduler and apply it
+    to the function [f] to get a receive and transmit connector pair to apply
+    to the bound function [g].
+*)
+val fsimplex: f:('a wire_t -> 'a #rx * 'a #tx) -> ('s, 'a rx * 'a tx) t
 
-val fsimplex: f:('a wire_t -> 'a #rx * 'a #tx) -> ('s, 'a rx * 'a tx) t
+(** Use [simplex >>= f] to get a new wire from the scheduler, construct a
+    receive and transmit connector pair, and apply it to the bound function
+    [f].
+*)
 val simplex: ('s, 'a rx * 'a tx) t
 
-type ('a, 'b) duplex_t = ('a rx * 'b tx) * ('b rx * 'a tx)
+(** Use [duplex >>= f] to get a pair of new wires from the scheduler, construct
+    a {i pair} of receive and transmit connector pairs, one for each wire, and
+    apply it to the bound function [f].
+*)
+val duplex: ('s, ('a rx * 'b tx) * ('b rx * 'a tx)) t
 
-val duplex: ('s, ('a, 'b) duplex_t) t
+(** Use [load >>= f] to load the internal state of the current process and
+    apply it to the bound function [f].
+*)
+val load: ('s, 's) t
 
-val load: ('s, 's) t
+(** Use [store s] to store the value [s] as the new internal state of the
+    current process.
+*)
 val store: 's -> ('s, unit) t
+
+(** Use [modify f] to load the internal state of the current process, apply it
+    to the function f to obtain a new state, and store the new state in the
+    current process.
+*)
 val modify: ('s -> 's) -> ('s, unit) t
 
+(** Use [wrap a b w] to start a new process that receives events from the
+    receive connector [a], sends event to the transmit connector [b], and
+    processes the stream of input and output events with the flow [w].
+*)
 val wrap: 'i #rx -> 'o #tx -> ('i, 'o) Cf_flow.t -> ('s, unit) t
 
+(** The abstract type of the system I/O event multiplexer. *)
 type kernel_t
 
+(** Use [run f s] to construct a new I/O event kernel, apply it to the function
+    [f] to obtain a top-level process for the event loop, and start the process
+    running with the initial state [s].
+*)
 val run: (kernel_t -> ('s, unit) t) -> 's -> unit
 
-type control_t = C_final | C_load | C_unload
+(** The type of events used for controlling I/O polling event reactors. *)
+type control_t =
+    | C_final   (** Terminate the reactor. *)
+    | C_load    (** Begin waiting for the event. *)
+    | C_unload  (** Stop waiting for the event. *)
 
+(** A reactor that waits for I/O events.  Use [poll ~e k] to construct a
+    reactor process that waits for the event [e] to occur and starts the
+    associated process (which usually sends an event on a transmit connector).
+    Use [poll ~c ~e k] tp construct a reactor process that waits for
+    {!control_t} events on the receiver [c], and loads/unloads the event [e]
+    in the kernel [k] in a loop until it receives the [C_final] event.
+*)
 val poll:
     ?c:control_t #rx -> e:(unit, unit) t #Cf_poll.event ->
     kernel_t -> ('s, unit) t

File iom/iom_reactor.mli

View file
  • Ignore whitespace
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Miscellaneous I/O reactors (signals, timeouts and idle). *)
+
+(** {6 Overview}
+
+    This module contains low-level I/O reactor functions useful for reacting
+    to timer events, system process signals, and event loop idle conditions.
+    
+    These functions are all specializations of {!Iom_gadget.poll} for each
+    subclass of {!Cf_poll.event}.
+*)
+
+(** {6 Types} *)
+
+(** The type of system signal events.  The integer is the signal number. *)
 type e_signal_t = [ `Signal of int ]
+
+(** The type of timer events.  The timestamp is the time of the event. *)
 type e_time_t = [ `Time of Cf_tai64n.t ]
+
+(** The type of event loop idle events.  The timestamp is the time when the
+    idle condition occured.
+*)
 type e_idle_t = [ `Idle of Cf_tai64n.t ]
 
+(** {6 Functions} *)
+
+(** Use [signal ?c ~r ~n k] to compose an I/O reactor that waits with the
+    kernel [k] for the system signal number [n] and sends [`Signal n] on the
+    transmitter [r].  The optional [?c] argument is passed unchanged to the
+    underlying call to {!Iom_gadget.poll}.
+*)
 val signal:
     ?c:Iom_gadget.control_t #Iom_gadget.rx ->
     r:[> e_signal_t ] #Iom_gadget.tx -> n:int -> Iom_gadget.kernel_t ->
     ('s, unit) Iom_gadget.t
 
+(** Use [time ?c ~r ~t k] to compose an I/O reactor that waits with the kernel
+    [k] for the system time [t] to arrive, and sends [`Time t] on the
+    transmitter [r].  The optional [?c] argument is passed unchanged to the
+    underlying call to {!Iom_gadget.poll}.
+*)
 val time:
     ?c:Iom_gadget.control_t #Iom_gadget.rx -> r:[> e_time_t ] #Iom_gadget.tx ->
     t:Cf_tai64n.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
 
+
+(** Use [idle ?c ~r k] to compose an I/O reactor that waits for the event loop
+    in the kernel [k] to become idle (causing the program to block for some
+    non-zero length of time for system events to occur), and sends [`Idle t] on
+    the transmitter [r], where [t] is the time at which the idle condition
+    occurs.  The optional [?c] argument is passed unchanged to the underlying
+    call to {!Iom_gadget.poll}.
+*)
 val idle:
     ?c:Iom_gadget.control_t #Iom_gadget.rx -> r:[> e_idle_t ] #Iom_gadget.tx ->
     Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t

File iom/iom_sock_stream.ml

View file
  • Ignore whitespace
     | IO_tx_release                 (* initiate orderly release *)
     | IO_tx_extend of int           (* extend application receive window *)
     | IO_tx_window of int           (* set application receive window *)
- (* | IO_tx_peek of (Cf_message.t -> int option) peek until extend permitted *)
     | IO_tx_reset                   (* abortive close *)
     | IO_tx_unlink                  (* flush output and stop input *)
 
         ('s, unit) Iom_gadget.t
     
     type listener_tx_t =
+        | L_limit of int
         | L_extend of int
-        | L_limit of int
         | L_close
     
     type listener_rx_t =
         | I_error of exn
 
     type listener_tx_t =
+        | L_limit of int
         | L_extend of int
-        | L_limit of int
         | L_close
     
     type listener_rx_t =

File iom/iom_sock_stream.mli

View file
  • Ignore whitespace
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Reactor for SOCK_STREAM sockets. *)
+
+(** {6 Overview}
+
+    This module implements a functorial interface for constructing reactors
+    for I/O on [SOCK_STREAM] sockets.  The {!Iom_tcp4_socket} and
+    {!Iom_tcp6_socket} modules are specializations for TCP sockets (IPv4 and
+    IPv6, respectively).
+    
+    A {!Iom_sock_stream.T} module defines three reactor functions:
+    - [initiator]: create an endpoint by connecting to a remote listener;
+    - [listener]: create endpoints by listening for remote connectors;
+    - [endpoint]: input, output and shutdown on established endpoints.
+    
+    The types of events used with the [initiator] and [listener] reactors are
+    specialized for each type of socket.  The types of events used with the
+    [endpoint] reactor are common to all [SOCK_STREAM] reactors.
+*)
+
+(** {6 Common Types}
+
+    The [endpoint_tx_t] and [endpoint_rx_t] types are defined in common for
+    use with all [endpoint] reactors.
+*)
+
+(** The type of events for sending to [endpoint] reactors. *)
 type endpoint_tx_t =
-    | IO_tx_data of Cf_message.t	(* send data to peer *)
-    | IO_tx_release                 (* initiate orderly release *)
-    | IO_tx_extend of int           (* extend application receive window *)
-    | IO_tx_window of int           (* set application receive window *)
- (* | IO_tx_peek of (Cf_message.t -> int option) peek until extend permitted *)
-    | IO_tx_reset                   (* abortive close *)
-    | IO_tx_unlink                  (* flush output and stop input *)
+    | IO_tx_data of Cf_message.t	(** send data to peer *)
+    | IO_tx_release                 (** initiate orderly release *)
+    | IO_tx_extend of int           (** extend application receive window *)
+    | IO_tx_window of int           (** set application receive window *)
+    | IO_tx_reset                   (** abortive close *)
+    | IO_tx_unlink                  (** flush output and stop input *)
 
+(** The type of events for receiving from [endpoint] reactors. *)
 type endpoint_rx_t =
-    | IO_rx_error of exn            (* socket closed on error condition *)
-    | IO_rx_data of Cf_message.t	(* data received from peer *)
-    | IO_rx_release                 (* peer initiated orderly release *)
-    | IO_rx_blocked                 (* transmit flow control *)
-    | IO_rx_ready                   (* ready for transmit *)
-    | IO_rx_closed                  (* orderly close complete *)
-    | IO_rx_unlinked                (* output flushed and input stopped *)
+    | IO_rx_error of exn            (** socket closed on error condition *)
+    | IO_rx_data of Cf_message.t	(** data received from peer *)
+    | IO_rx_release                 (** peer initiated orderly release *)
+    | IO_rx_blocked                 (** transmit flow control *)
+    | IO_rx_ready                   (** ready for transmit *)
+    | IO_rx_closed                  (** orderly close complete *)
+    | IO_rx_unlinked                (** output flushed and input stopped *)
 
+(** {6 Functorial Interface} *)
+
+(** The signature of modules produced by the [Create] functor below.  Modules
+    of this type define the I/O reactors for a specialization of [SOCK_STREAM]
+    sockets.
+*)
 module type T = sig
+
+    (** The specific socket module used as input to the functor. *)
     module S: Cf_sock_stream.T
     
+    (** The type of connection establishment events is a triple composed of the
+        following three values:
+        - A socket object
+        - The local address of the endpoint
+        - The remote address of the endpoint
+    *)
     type connection_t = S.t * S.address_t * S.address_t
 
+    (** The type of events to receive from the [initiator] reactor. *)
     type initiator_rx_t =
-        | I_connect of connection_t
-        | I_error of exn
+        | I_connect of connection_t     (** Connection established. *)
+        | I_error of exn                (** Error trying to connect. *)
     
+    (** Use [initiator ~c ?cancel ?sock ?src addr k] to start a reactor that
+        will attempt to connect to the address [addr] with the kernel [k] and
+        send its results on the control transmitter [c].  If the [?cancel]
+        receiver is specified, then the connection can be aborted by sending a
+        unit value on the corresponding transmitter.  The reactor uses the
+        socket [?sock] if provided, otherwise it constructs a new socket
+        implicitly.  The socket will be bound to the address [?src] if
+        provided, otherwise the socket is bound to the unspecified address.
+        
+        The reactor terminates after sending an event or after receiving a
+        cancel event.  The socket is closed on termination of the reactor if
+        the socket was implicitly created when the reactors was started and a
+        connection was not successfully established.
+    *)
     val initiator:
         c:initiator_rx_t #Iom_gadget.tx -> ?cancel:unit #Iom_gadget.rx ->
         ?sock:S.t -> ?src:S.address_t -> S.address_t -> Iom_gadget.kernel_t ->
         ('s, unit) Iom_gadget.t
     
+    (** The type of events to send to the [listener] reactor. *)
     type listener_tx_t =
-        | L_extend of int
-        | L_limit of int
-        | L_close
+        | L_limit of int    (** Set the maximum of accepted connections. *)
+        | L_extend of int   (** Extend the maximum of accepted connections. *)
+        | L_close           (** Close the listener socket. *)
     
+    (** The type of events to receive from the [listener] reactor. *)
     type listener_rx_t =
-        | L_bind of S.address_t
-        | L_connect of connection_t
-        | L_error of exn
+        | L_bind of S.address_t         (** Notification of local address. *)
+        | L_connect of connection_t     (** Connection established. *)
+        | L_error of exn                (** Error listening for connections. *)
+    
+    (** Use [listener ~c ?sock ?qlen addr k] to start a reactor that binds to
+        the local address [addr] and uses the kernel [k] to listen for and
+        accept new connections from remote initiators.  The duplex [c] is a
+        coupling of a control receiver and a control transmitter.  The reactor
+        uses the socket [?sock] if provided, otherwise it constructs a new
+        socket implicitly.  The backlog is optionally specified with the
+        [?qlen] parameter.
         
+        The reactor closes the socket and terminates after receiving the
+        [L_close] event or after transmitting the [L_error] event.  An [L_bind]
+        event is always (and only) the first event transmitted by the reactor.
+    *)
     val listener:
         c:(listener_tx_t #Iom_gadget.rx * listener_rx_t #Iom_gadget.tx) ->
         ?sock:S.t -> ?qlen:int -> S.address_t -> Iom_gadget.kernel_t ->
         ('s, unit) Iom_gadget.t
     
+    (** Use [endpoint ~c sock k] to start a reactor that performs raw I/O on
+        the socket [sock] with the kernel [k], communicating on the control
+        duplex [c] with its controller.
+        
+        The reactor terminates after sending an [IO_rx_error], [IO_rx_closed]
+        or [IO_rx_unlinked] event, or after receiving an [IO_tx_reset] event.
+        The socket is closed with the reactors terminates, except when the
+        closing event is [IO_rx_unlinked].
+    *)
     val endpoint:
         c:(endpoint_tx_t #Iom_gadget.rx * endpoint_rx_t #Iom_gadget.tx) -> 
         S.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
 end
 
+(** The functor that produces a module of type [T]. *)
 module Create(S: Cf_sock_stream.T): T with module S = S
 
 (*--- End of File [ iom_sock_stream.mli ] ---*)

File iom/iom_tcp4_socket.mli

View file
  • Ignore whitespace
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Reactor for TCP (with IPv4). *)
+
 include Iom_sock_stream.T with module S = Cf_tcp4_socket
 
 (*--- End of File [ iom_tcp4_socket.mli ] ---*)

File iom/iom_tcp6_socket.mli

View file
  • Ignore whitespace
   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(** Reactor for TCP (with IPv6). *)
+
 include Iom_sock_stream.T with module S = Cf_tcp6_socket
 
 (*--- End of File [ iom_tcp6_socket.mli ] ---*)