Commits

jhwoodyatt  committed 466d314

Checkpoint. Initiating a major refactoring.

  • Participants
  • Parent commits 023a71c

Comments (0)

Files changed (35)

 
 ...Redesigning from scratch.
 
+Central concepts of the BEEP core are the session, channel and exchange
+abstractions.  These are implemented as follows:
+
+SESSION: A driver for controlling one or more channels.  The first channel is
+the management channel, and its profile is defined by the BEEP specification
+and implemented internally.  A session agent is the engine that drives the
+session.  It sends the profile list to advertise.  It receives the profile list
+advertised by the peer.  It sends requests to open and close channels and
+receives the responses/errors.  It receives requests to open and close channels
+and sends the responses/errors.  It directs the session driver to bind and
+unbind channel drivers with channel agents.  The session driver is a virtual
+base class with derived classes to implement transport mappings.
+
+CHANNEL: A driver for sending and receiving messages and responses through a
+session on a particular channel.  The internal management channel is coupled
+tightly to the session driver.  A channel agent is the engine that drives the
+channel.  It may send exchanges and receive their responses.  It may receive
+exchanges and send their responses.  It may do both at the same time.  It
+receives notification of session layer errors.  The channel driver is a virtual
+base class with derived classes to implement transport mappings.
+
+EXCHANGE: A driver for sending and receiving MSG entities coupled with their
+RPY entity, ERR entity, ANS entity stream or NUL responses.  Each of the core
+exchanges defined for the management channel are implemented internally.  An
+exchange agent is the engine that drives the exchange, possibly through one or
+more layers of message processors.  An exchange driver operates at the layer of
+MIME entity stream content of the five BEEP core types, i.e. MSG, RPY, ERR, ANS
+and NUL, coupled tightly with the channel driver.
+
+
++ (Beep_core)
+    Every BEEP message exchange, channel agent and  session agent is a stream
+    engine gadget.
+    
+    Session agents send and receive channel management message exchanges and
+    start/stop channel agents accordingly.  Channel agents send and receive
+    profile-specific message exchanges (the session management channel has a
+    profile defined in the BEEP core).
+    
+    The client message exchange renders a single Mime_stream emitter event on
+    its output jack and ingests a single Beep_core.response_scanner on its
+    input jack.  Channel agents send client message exchanges on their output
+    jacks.
+    
+    The service message exchange ingests a single Mime_stream.entity_scanner
+    on its input jack and renders a single Beep_core.response_emitter on its
+    output jack.  Channel agents receive service message exchanges on their
+    input jacks.
+
 # End of open issues

File beep/Makefile

 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 # OF THE POSSIBILITY OF SUCH DAMAGE. 
 
-REQUIRE = unix cf xml iom mime sasl
+REQUIRE = unix cf xml nx iom mime sasl
 PREDICATES =
 
 ###############################################################################
 
 opt:: $(TEST_OPT_PROGRAMS)
 
-t.% : t/t_%.ml $(TEST_LIBS:%=%.cma)
-	$(OCAMLC) -o $@ $(CMO_OPT) $(DEBUG_OPT) $(TEST_LINKOPT) \
-        $(TEST_LIBS:%=%.cma) $<
+t.% : t/t_%.ml $(TEST_CMA)
+	$(OCAMLC) -o $@ $(CMO_OPT) $(DEBUG_OPT) $(TEST_LINKOPT) $(TEST_CMA) $<
 
-t-opt.% : t/t_%.ml $(TEST_LIBS:%=%.cmxa)
-	$(OCAMLOPT) -o $@ $(CMX_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cmxa) $<
+t-opt.% : t/t_%.ml $(TEST_CMXA)
+	$(OCAMLOPT) -o $@ $(CMX_OPT) $(TEST_LINKOPT) $(TEST_CMXA) $<
 
 test:: $(TEST_PROGRAMS)
 	@for i in $(TEST_PROGRAMS); do echo; echo $$i; ./$$i; done
 
 default:: ocamltop
 
-ocamltop: $(TEST_LIBS:%=%.cma)
-	$(OCAMLMKTOP) -o $@ $(CMO_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cma)
+ocamltop: $(TEST_CMA)
+	$(OCAMLMKTOP) -o $@ $(CMO_OPT) $(TEST_LINKOPT) $(TEST_CMA)
 
 clean::
 	rm -f ocamltop

File beep/beep_channel.ml

-(*---------------------------------------------------------------------------*
-  IMPLEMENTATION  beep_channel.ml
-
-  Copyright (c) 2003-2005, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-open Iom_gadget
-open Cf_cmonad.Op
-
-module M_profile = struct
-    type encoding_t = E_none | E_base64
-
-    type t = {
-        uri: string;
-        encoding: encoding_t;
-        content: string;
-    }
-    
-    let cons ~uri ?(enc = E_none) content = {
-        uri = uri;
-        encoding = enc;
-        content = content;
-    }
-end
-
-module M_start = struct
-    type t = {
-        serverName: string option;
-        profiles: M_profile.t * M_profile.t list;
-    }
-    
-    let cons ?serverName = function
-        | [] -> invalid_arg "Beep_channel.M_start.cons: profile list empty."
-        | hd :: tl -> {
-            serverName = serverName;
-            profiles = hd, tl;
-        }
-end
-
-module type M_signal = sig
-    type t = {
-        code: int;
-        lang: string option;
-        content: string;
-    }
-    
-    val cons: ?loc:#Beep_locale.t -> int -> t
-end
-
-module M_signal = struct
-    type t = {
-        code: int;
-        lang: string option;
-        content: string;
-    }
-    
-    let default_locale_ = new Beep_locale.default []
-    
-    let cons ?loc =
-        let loc =
-            match loc with
-            | None -> default_locale_
-            | Some loc -> (loc :> Beep_locale.t)
-        in
-        fun code -> {
-            code = code;
-            lang = loc#lang;
-            content = loc#content code;
-        }
-end
-
-module M_close = M_signal
-module M_error = M_signal
-
-type control_rx_t =
-    | C_rx_exchange of Beep_exchange.t
-    | C_rx_close of M_close.t
-    | C_rx_error of M_error.t
-    | C_rx_release
-    | C_rx_abort
-
-type control_tx_t =
-    | C_tx_exchange of Beep_exchange.t
-    | C_tx_close of M_close.t
-    | C_tx_error of M_error.t
-    | C_tx_release
-    | C_tx_reset
-
-type control_duplex_t = control_tx_t rx * control_rx_t tx
-
-module Err = Beep_error
-module F = Beep_frame
-module E = Beep_exchange
-
-class profile =
-    object(self)
-        method private control_:
-            's. control_rx_t rx -> control_tx_t tx -> ('s, unit) t =
-            fun ctrlRx ctrlTx ->
-                let rec loop () =
-                    let ctrlRx = (ctrlRx :> control_rx_t rx) in
-                    let ctrlTx = (ctrlTx :> control_tx_t tx) in
-                    guard begin
-                        ctrlRx#get begin function
-                            | C_rx_abort
-                            | C_rx_release ->
-                                Cf_cmonad.return ()
-                            | C_rx_close _ ->
-                                ctrlTx#put C_tx_release
-                            | _ ->
-                                loop ()
-                        end
-                    end
-                in
-                start (loop ()) ()
-        
-        method profile: 's. ('s, control_duplex_t) t =
-            duplex >>= fun ((chRx, chTx), chIO) ->
-            self#control_ chRx chTx >>= fun () ->
-            Cf_cmonad.return chIO
-        
-        method tuning = false
-    end
-
-type client_start_t =
-    | CS_profile of M_profile.t * profile tx
-    | CS_error of M_error.t
-
-class type client =
-    object
-        method connect: 's. M_start.t tx -> client_start_t rx -> ('s, unit) t
-    end
-
-type service_start_t =
-    | SS_profile of M_profile.t * (unit, unit) t option * profile rx
-    | SS_error of M_error.t
-
-class type service =
-    object
-        method accept: 's. M_start.t -> ('s, service_start_t rx option) t
-    end
-
-(*--- End of File [ beep_channel.ml ] ---*)

File beep/beep_channel.mli

-(*---------------------------------------------------------------------------*
-  INTERFACE  beep_channel.mli
-
-  Copyright (c) 2003-2005, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-module M_profile: sig
-    type encoding_t = E_none | E_base64
-
-    type t = {
-        uri: string;
-        encoding: encoding_t;
-        content: string;
-    }
-    
-    val cons: uri:string -> ?enc:encoding_t -> string -> t
-end
-
-module M_start: sig
-    type t = {
-        serverName: string option;
-        profiles: M_profile.t * M_profile.t list;
-    }
-    
-    val cons: ?serverName:string -> M_profile.t list -> t
-end
-
-module type M_signal = sig
-    type t = {
-        code: int;
-        lang: string option;
-        content: string;
-    }
-    
-    val cons: ?loc:#Beep_locale.t -> int -> t
-end
-
-module M_close: M_signal
-module M_error: M_signal
-
-open Iom_gadget
-
-type control_rx_t =
-    | C_rx_exchange of Beep_exchange.t
-    | C_rx_close of M_close.t
-    | C_rx_error of M_error.t
-    | C_rx_release
-    | C_rx_abort
-
-type control_tx_t =
-    | C_tx_exchange of Beep_exchange.t
-    | C_tx_close of M_close.t
-    | C_tx_error of M_error.t
-    | C_tx_release
-    | C_tx_reset
-
-type control_duplex_t = control_tx_t rx * control_rx_t tx
-
-class profile:
-    object
-        method private control_:
-            's. control_rx_t rx -> control_tx_t tx -> ('s, unit) t
-        
-        method profile: 's. ('s, control_duplex_t) t
-        method tuning: bool
-    end
-
-type client_start_t =
-    | CS_profile of M_profile.t * profile tx
-    | CS_error of M_error.t
-
-class type client =
-    object
-        method connect: 's. M_start.t tx -> client_start_t rx -> ('s, unit) t
-    end
-
-type service_start_t =
-    | SS_profile of M_profile.t * (unit, unit) t option * profile rx
-    | SS_error of M_error.t
-
-class type service =
-    object
-        method accept: 's. M_start.t -> ('s, service_start_t rx option) t
-    end
-
-(*--- End of File [ beep_channel.mli ] ---*)

File beep/beep_core.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  beep_core.ml
+
+  Copyright (c) 2003-2007, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+open Cf_cmonad.Op
+
+(**)
+let errj = Cf_journal.stderr
+(**)
+
+module N32_map = Cf_rbtree.Map(Int32)
+module N32_set = Cf_rbtree.Set(Int32)
+
+(*--- standard reply codes ---*)
+let success = 200
+let service_unavailable = 421
+let session_request_declined = 450
+let session_request_failed = 451
+let authenticator_unavailable = 454
+let general_syntax_error = 500
+let parameter_syntax_error = 501
+let not_implemented = 504
+let authentication_required = 530
+let authentication_insufficient = 534
+let authentication_failed = 535
+let user_not_authorized = 537
+let encryption_required = 538
+let channel_request_declined = 550
+let parameter_invalid = 553
+let transaction_error = 554
+
+class localizer langs =
+    object
+        method lang =
+            match langs with
+            | [] -> None
+            | hd :: _ when hd = "en" -> None
+            | _ -> Some "en"
+                
+        method reply_code = function
+            | 421 -> "service not available"
+            | 450 -> "session request declined"
+            | 451 -> "session request failed"
+            | 454 -> "authenticator unavailable"
+            | 500 -> "general syntax error"
+            | 501 -> "general semantic error"
+            | 504 -> "not implemented"
+            | 530 -> "authentication required"
+            | 534 -> "authentication mechanism insufficient"
+            | 535 -> "authentication failed"
+            | 537 -> "user not authorized"
+            | 538 -> "authentication mechanism requires encryption"
+            | 550 -> "channel request declined"
+            | 553 -> "parameter invalid"
+            | 554 -> "transaction integrity error"
+            | x when x < 100 || x > 999 ->
+                 invalid_arg
+    			   "Beep_std_code.localizer#content: code not 3-digit integer"
+            | _ -> ""
+    end
+
+(*--- errors (session termination) ---*)
+type error =
+    | X_frame_bad_syntax
+    | X_frame_too_large
+    | X_frame_out_of_sequence
+    | X_frame_no_channel
+    | X_message_in_progress
+    | X_response_unexpected
+
+let error_to_string = function
+    | X_frame_bad_syntax -> "bad frame syntax"
+    | X_frame_too_large -> "frame too large"
+    | X_frame_out_of_sequence -> "frame out of sequence"
+    | X_frame_no_channel -> "no channel for frame"
+    | X_message_in_progress -> "message in progress"
+    | X_response_unexpected -> "response unexpected"
+
+exception Error of error
+
+(*--- frame format and parsing ---*)
+type header = { channo: int32; msgno: int32; seqno: int32 }
+    
+type msg_frame = [ `MSG of header * Iom_stream.more * Cf_message.t ]
+type rpy_frame = [ `RPY of header * Iom_stream.more * Cf_message.t ]
+type err_frame = [ `ERR of header * Iom_stream.more * Cf_message.t ]
+type ans_frame = [ `ANS of int32 * header * Iom_stream.more * Cf_message.t ]
+type nul_frame = [ `NUL of header ]
+
+type response_frame = [ rpy_frame | err_frame | ans_frame | nul_frame ]
+
+type frame = [ msg_frame | response_frame ]
+
+let parse_frame =
+    let module K = struct type t = MSG | RPY | ERR | ANS | NUL end in
+    let ef_ _ x =
+    	raise begin
+            match x with
+            | End_of_file -> x
+            | _ -> Error X_frame_bad_syntax
+        end
+    in
+    let keyword_ s =
+    	Scanf.kscanf s ef_ "%3[A-Z]" begin function
+    		| "MSG" -> K.MSG
+    		| "RPY" -> K.RPY
+    		| "ERR" -> K.ERR
+    		| "ANS" -> K.ANS
+    		| "NUL" -> K.NUL
+    		| _ -> raise (Error X_frame_bad_syntax)
+    	end
+    in
+    let uint31_ n =
+    	if Int32.compare n 0l < 0 then
+    		raise (Error X_frame_bad_syntax)
+    in
+    let more_ v =
+    	match String.unsafe_get v 0 with
+    	| '.' -> Iom_stream.Last
+    	| _ -> Iom_stream.More
+    in
+    let int32_of_maxint_ = Int32.of_int max_int in
+    let size_ ?limit v =
+    	uint31_ v;
+    	match limit with
+    	| Some limit when Int32.compare v limit > 0 ->
+    		raise (Error X_frame_too_large)
+    	| _ when Int32.compare v int32_of_maxint_ > 0 ->
+    		raise (Error X_frame_too_large)
+    	| _ ->
+    		Int32.to_int v
+    in
+    let common_ ?limit s =
+    	Scanf.kscanf s ef_ " %lu %lu %[.*] %lu %lu"
+    	begin fun a b c d e ->
+    		uint31_ a;
+    		uint31_ b;
+    		let c = more_ c in
+    		let e = size_ ?limit e in
+    		{ channo = a; msgno = b; seqno = d }, c, e
+    	end
+    in
+    let crlf_ s = Scanf.kscanf s ef_ "\r\n%n" (fun x -> x) in
+    let payload_ more m hlen size =
+    	let mlen = Cf_message.length m in
+    	if mlen < hlen + size + 5 then raise End_of_file;
+    	let m = Cf_message.shift ~pos:hlen m in
+    	let data, m = Cf_message.split ~pos:size m in
+    	let trailer, m = Cf_message.split ~pos:5 m in
+    	if Cf_message.contents trailer <> "END\r\n" then
+    		raise (Error X_frame_bad_syntax);
+    	more, data, m
+    in
+    let single_ ?limit s m =
+    	let header, more, size = common_ ?limit s in
+    	let hlen = crlf_ s in
+    	let more, data, m = payload_ more m hlen size in
+    	(header, more, data), m
+    in
+    fun ?limit m ->
+    	let s = Scanf.Scanning.from_function (Cf_message.to_function m) in
+    	match keyword_ s with
+    	| K.MSG ->
+    		let hd, tl = single_ ?limit s m in (`MSG hd), tl
+    	| K.RPY ->
+    		let hd, tl = single_ ?limit s m in (`RPY hd), tl
+    	| K.ERR ->
+    		let hd, tl = single_ ?limit s m in (`ERR hd), tl
+    	| K.ANS ->
+    		let header, more, size = common_ ?limit s in
+    		let ansno = Scanf.kscanf s ef_ " %lu" (fun x -> uint31_ x; x) in
+    		let hlen = crlf_ s in
+    		let more, data, m = payload_ more m hlen size in
+    		`ANS (ansno, header, more, data), m
+    	| K.NUL ->
+    		Scanf.kscanf s ef_ " %lu %lu . %lu 0\r\nEND\r\n%n"
+    		begin fun a b c n ->
+    			uint31_ a;
+    			uint31_ b;
+    			let header = { channo = a; msgno = b; seqno = c } in
+    			let m = Cf_message.shift ~pos:n m in
+    			(`NUL header), m
+    		end
+
+let emit_frame =
+    let trailer_ = [ "END\r\n", 0, 5 ] in
+    let common_ b header more size =
+    	let more = match more with Iom_stream.More -> '*' | _ -> '.' in
+    	Printf.bprintf b " %lu %lu %c %lu %u" header.channo header.msgno more
+    		header.seqno size
+    in
+    let payload_ b m =
+    	Buffer.add_string b "\r\n";
+    	let hlen = Buffer.length b in
+    	(Buffer.contents b, 0, hlen) :: (m @ trailer_)
+    in
+    let single_ keyword header more m =
+    	let b = Buffer.create 40 in
+    	Buffer.add_string b keyword;
+    	let size = Cf_message.length m in
+    	common_ b header more size;
+    	payload_ b m
+    in
+    function
+    | `MSG (header, more, data) ->
+    	single_ "MSG" header more data
+    | `RPY (header, more, data) ->
+    	single_ "RPY" header more data
+    | `ERR (header, more, data) ->
+    	single_ "ERR" header more data
+    | `ANS (ansno, header, more, data) ->
+    	let b = Buffer.create 40 in
+    	Buffer.add_string b "ANS";
+    	let size = Cf_message.length data in
+    	common_ b header more size;
+    	Printf.bprintf b " %lu" ansno;
+    	payload_ b data
+    | `NUL header ->
+    	let b = Buffer.create 40 in
+    	Buffer.add_string b "NUL";
+    	Printf.bprintf b " %lu %lu . %lu 0\r\nEND\r\n" header.channo
+    		header.msgno header.seqno;
+    	let length = Buffer.length b in
+    	[ Buffer.contents b, 0, length ]
+
+(*--- entity I/O ---*)
+let default_mime_ct = {
+    Mime_entity.ct_type = Mime_entity.CT_application;
+    ct_subtype = "beep+xml";
+    ct_parameters = Mime_atom.Map.of_list [ "charset", "utf8" ];
+}
+
+let default_mime_cte = Mime_entity.CTE_binary
+
+let default_mime_emit_adapter = lazy begin
+    new Mime_entity.basic_emit_adapter
+        ~ct:default_mime_ct ~cte:default_mime_cte []
+end
+
+let default_mime_headers = lazy begin
+    let a = Lazy.force default_mime_emit_adapter in
+    a#emit
+end
+
+class eprofile limits = object
+    val limits_: Iom_octet_stream.limits = limits
+    method limits = limits_
+end
+
+type eijack = {
+    eij_flowTx: Iom_stream.readywait Iom_gadget.tx;
+    eij_eventRx: Mime_stream.entity_scan_event Iom_gadget.rx;
+}
+
+type eiplug = {
+    eip_flowRx: Iom_stream.readywait Iom_gadget.rx;
+    eip_eventTx: Mime_stream.entity_scan_event Iom_gadget.tx;
+    eip_limits: Iom_octet_stream.limits;
+}
+
+type eis_variant =
+    | S_ei_body of Iom_octet_stream.fragment Iom_gadget.tx
+    | S_ei_header
+    | S_ei_failed
+
+type eistate = {
+    eis_ready: bool;
+    eis_more: Iom_stream.more;
+    eis_buffer: Cf_message.substring Cf_deque.t;
+    eis_length: int;
+    eis_variant: eis_variant;
+}
+
+let ei_state0 = {
+    eis_ready = false;
+    eis_more = Iom_stream.More;
+    eis_buffer = Cf_deque.nil;
+    eis_length = 0;
+    eis_variant = S_ei_header;
+}
+
+let ei_harness limits =
+    Iom_gadget.duplex >>= fun ((flowRx, eventTx), (eventRx, flowTx)) ->
+    let eip = {
+        eip_flowRx = flowRx;
+        eip_eventTx = eventTx;
+        eip_limits = limits;
+    } in
+    let eij = {
+        eij_flowTx = flowTx;
+        eij_eventRx = eventRx;
+    } in
+    Cf_cmonad.return (eip, eij)
+
+let ei_failed eis failedTx x =
+    let failedTx = (failedTx :> Iom_stream.failed Iom_gadget.tx) in
+    failedTx#put (`Failed x) >>= fun () ->
+    Cf_cmonad.return (0, Some { eis with eis_variant = S_ei_failed })
+
+let ei_push eis more buffer length =
+    let eis = {
+        eis with
+        eis_more = more;
+        eis_buffer = buffer;
+        eis_length = length;
+    } in
+    Cf_cmonad.return (0, Some eis)
+
+let ei_drain eis fragmentTx more buffer =
+    let data = Cf_message.drain buffer in
+    let length = Cf_message.length data in
+    let fragment = new Iom_octet_stream.fragment more data in
+    fragmentTx#put fragment >>= fun () ->
+    let eisopt =
+        match more with
+        | Iom_stream.Last ->
+            None
+        | Iom_stream.More ->
+            Some {
+                eis with
+                eis_ready = true;
+                eis_buffer = Cf_deque.nil;
+                eis_length = 0;
+            }
+    in
+    Cf_cmonad.return (length, eisopt)
+
+let ei_body eis eip fragmentTx more buffer length =
+    if not eis.eis_ready || length < eip.eip_limits.Iom_octet_stream.low then
+        ei_push eis more buffer length
+    else
+        ei_drain eis fragmentTx more buffer
+
+let ei_headers eis eip failedTx more buffer length =
+    let data = Cf_message.drain buffer in
+    match
+        try
+            let headers, body = Mime_lex.split_into_fields data in
+            let scanner = new Mime_entity.basic_scan_adapter headers in
+            scanner#force;
+            Cf_exnopt.U (scanner, body)
+        with
+        | x ->
+            Cf_exnopt.X x
+    with
+    | Cf_exnopt.U (scanner, body) ->
+        Iom_gadget.simplex >>= fun (fragmentRx, fragmentTx) ->
+        eip.eip_eventTx#put (scanner, fragmentRx) >>= fun () ->
+        let length = Cf_message.length body in
+        let buffer = Cf_message.push body Cf_deque.nil in
+        ei_body eis eip fragmentTx more buffer length
+    | Cf_exnopt.X End_of_file ->
+        if more = Iom_stream.Last then begin
+            let x = Mime_stream.Scan_error Mime_stream.X_header_incomplete in
+            ei_failed eis failedTx x
+        end
+        else if length < eip.eip_limits.Iom_octet_stream.high then begin
+            let x = Mime_stream.Scan_error Mime_stream.X_header_too_long in
+            ei_failed eis failedTx x
+        end
+        else
+            ei_push eis more buffer length
+    | Cf_exnopt.X x ->
+        ei_failed eis failedTx x
+
+let ei_frame eis eip failedTx more data =
+    let length = eis.eis_length + (Cf_message.length data) in
+    match eis.eis_variant with
+    | S_ei_failed ->
+        let eisopt = match more with Iom_stream.More -> Some eis | _ -> None in
+        Cf_cmonad.return (length, eisopt)
+    | _ ->
+        let buffer = Cf_message.push data eis.eis_buffer in
+        match eis.eis_variant with
+        | S_ei_body fragmentTx when eis.eis_ready ->
+            ei_body eis eip fragmentTx more buffer length
+        | S_ei_header when eis.eis_ready ->
+            ei_headers eis eip failedTx more buffer length
+        | _ ->
+            ei_push eis more buffer length
+
+let ei_flow_guard eis eip cont =
+    eip.eip_flowRx#get begin function
+        | `Wait when eis.eis_ready ->
+            cont (0, Some { eis with eis_ready = false })
+        | `Ready when not eis.eis_ready ->
+            begin match eis.eis_variant with
+            | S_ei_body fragmentTx when eis.eis_length > 0 ->
+                ei_drain eis fragmentTx eis.eis_more eis.eis_buffer >>= cont
+            | _ ->
+                cont (0, Some { eis with eis_ready = true })
+            end
+        | `Ready
+        | `Wait ->
+            cont (0, Some eis)
+    end
+
+type eojack = {
+    eoj_flowRx: Iom_stream.readywait Iom_gadget.rx;
+    eoj_eventTx: Mime_stream.entity_emit_event Iom_gadget.tx;
+}
+
+type eoplug = {
+    eop_flowTx: Iom_stream.readywait Iom_gadget.tx;
+    eop_eventRx: Mime_stream.entity_emit_event Iom_gadget.rx;
+    eop_limits: Iom_octet_stream.limits;
+}
+
+type eos_variant =
+    | S_eo_body of Iom_octet_stream.fragment Iom_gadget.rx
+    | S_eo_header
+
+type eostate = {
+    eos_ready: bool;
+    eos_more: Iom_stream.more;
+    eos_buffer: Cf_message.substring Cf_deque.t;
+    eos_length: int;
+    eos_variant: eos_variant;
+}
+
+let eo_state0 = {
+    eos_ready = false;
+    eos_more = Iom_stream.More;
+    eos_buffer = Cf_deque.nil;
+    eos_length = 0;
+    eos_variant = S_eo_header;
+}
+
+let eo_harness limits =
+    Iom_gadget.duplex >>= fun ((eventRx, flowTx), (flowRx, eventTx)) ->
+    let eop = {
+        eop_flowTx = flowTx;
+        eop_eventRx = eventRx;
+        eop_limits = limits;
+    } in
+    let eoj = {
+        eoj_flowRx = flowRx;
+        eoj_eventTx = eventTx;
+    } in
+    Cf_cmonad.return (eop, eoj)
+
+let eo_work eos eop window more buffer length =
+    match more with
+    | _ when window < eop.eop_limits.Iom_octet_stream.low && eos.eos_ready ->
+        let eos = {
+            eos with
+            eos_ready = false;
+            eos_more = more;
+            eos_buffer = buffer;
+            eos_length = length;
+        } in
+        eop.eop_flowTx#put `Wait >>= fun () ->
+        Cf_cmonad.return (Some eos, None)
+    | _ when window < eop.eop_limits.Iom_octet_stream.low ->
+        let eos = {
+            eos with
+            eos_buffer = buffer;
+            eos_length = length;
+        } in
+        Cf_cmonad.return (Some eos, None)
+    | Iom_stream.Last when length > window ->
+        let payload, buffer = Cf_message.pop ~len:window buffer in
+        let length = length - window in
+        let eos = {
+            eos with
+            eos_ready = false;
+            eos_more = more;
+            eos_buffer = buffer;
+            eos_length = length;
+        } in
+        Cf_cmonad.return (Some eos, Some (window, Iom_stream.More, payload))
+    | Iom_stream.Last ->
+        Cf_cmonad.return (None, Some (length, more, Cf_message.drain buffer))
+    | Iom_stream.More ->
+        let limit =
+            if window > eop.eop_limits.Iom_octet_stream.high then
+                eop.eop_limits.Iom_octet_stream.high
+            else
+                window
+        in
+        if length > limit then begin
+            let payload, buffer = Cf_message.pop ~len:window buffer in
+            let length = length - limit in
+            let eos = {
+                eos with
+                eos_ready = false;
+                eos_more = more;
+                eos_buffer = buffer;
+                eos_length = length;
+            } in
+            let rv = (Some eos, Some (limit, Iom_stream.More, payload)) in
+            eop.eop_flowTx#put `Wait >>= fun () ->
+            Cf_cmonad.return rv
+        end
+        else begin
+            let payload = Cf_message.drain buffer in
+            let length = Cf_message.length payload in
+            let waiting = not eos.eos_ready in
+            let eos = {
+                eos with
+                eos_ready = true;
+                eos_buffer = Cf_deque.nil;
+                eos_length = 0;
+            } in
+            let result = Some eos, Some (length, more, payload) in
+            if waiting then begin
+                eop.eop_flowTx#put `Ready >>= fun () ->
+                Cf_cmonad.return result
+            end
+            else
+                Cf_cmonad.return result
+        end
+
+let eo_ready eos eop window =
+    eo_work eos eop window eos.eos_more eos.eos_buffer eos.eos_length
+
+let eo_event_guard eos eop window cont =
+    match eos.eos_variant with
+    | S_eo_header ->
+        eop.eop_eventRx#get begin fun (headers, fragmentRx) ->
+            let data = headers#emit in
+            let buffer = Cf_message.push data eos.eos_buffer in
+            let length = eos.eos_length + Cf_message.length data in
+            assert (eos.eos_more == Iom_stream.More);
+            let eos = {
+                eos with
+                eos_ready = true;
+                eos_buffer = buffer;
+                eos_length = length;
+                eos_variant = S_eo_body fragmentRx;
+            } in
+            let v = Some eos, None in
+            if window < length then
+                cont v
+            else begin
+                eop.eop_flowTx#put `Ready >>= fun () ->
+                cont v
+            end
+        end
+    | S_eo_body fragmentRx ->
+        fragmentRx#get begin fun fragment ->
+            let data = fragment#data in
+            let buffer = Cf_message.push data eos.eos_buffer in
+            let length = eos.eos_length + Cf_message.length data in
+            eo_work eos eop window fragment#more buffer length >>= cont
+        end
+
+(*--- exchange I/O ---*)
+type sxrsp =
+    | M_srv_rpy of eoplug
+    | M_srv_err of eoplug
+    | M_srv_ans of int32 * eoplug
+    | M_srv_nul
+
+type sxplug = {
+    sxp_msgno: int32;
+    sxp_req: eiplug;
+    sxp_rspRx: sxrsp Iom_gadget.rx;
+    sxp_failedTx: Iom_stream.failed Iom_gadget.tx;
+}
+
+type sxjack = {
+    sxj_msgno: int32;
+    sxj_req: eijack;
+    sxj_rspTx: sxrsp Iom_gadget.tx;
+    sxj_failedRx: Iom_stream.failed Iom_gadget.rx;
+}
+
+let sx_harness msgno limits =
+    Iom_gadget.duplex >>= fun ((rspRx, failedTx), (failedRx, rspTx)) ->
+    ei_harness limits >>= fun (eip, eij) ->
+    let sxp = {
+        sxp_msgno = msgno;
+        sxp_req = eip;
+        sxp_rspRx = rspRx;
+        sxp_failedTx = failedTx;
+    } in
+    let sxj = {
+        sxj_msgno = msgno;
+        sxj_req = eij;
+        sxj_rspTx = rspTx;
+        sxj_failedRx = failedRx;
+    } in
+    Cf_cmonad.return (sxp, sxj)
+
+let sx_msgno_check sxp header = (Int32.compare header.msgno sxp.sxp_msgno = 0)
+
+type cxrsp =
+    | M_clt_rpy of eijack
+    | M_clt_err of eijack
+    | M_clt_ans of int32 * eijack
+    | M_clt_nul
+
+type cxplug = {
+    cxp_msgno: int32;
+    cxp_req: eoplug;
+    cxp_rspTx: cxrsp Iom_gadget.tx;
+    cxp_failedTx: Iom_stream.failed Iom_gadget.tx;
+}
+    
+type cxjack = {
+    cxj_msgno: int32;
+    cxj_req: eojack;
+    cxj_rspRx: cxrsp Iom_gadget.rx;
+    cxj_failedRx: Iom_stream.failed Iom_gadget.rx;
+}
+
+let cx_harness msgno limits =
+    Iom_gadget.duplex >>= fun ((failedRx, rspTx), (rspRx, failedTx)) ->
+    eo_harness limits >>= fun (eop, eoj) ->
+    let cxp = {
+        cxp_msgno = msgno;
+        cxp_req = eop;
+        cxp_rspTx = rspTx;
+        cxp_failedTx = failedTx;
+    } in
+    let cxj = {
+        cxj_msgno = msgno;
+        cxj_req = eoj;
+        cxj_rspRx = rspRx;
+        cxj_failedRx = failedRx;
+    } in
+    Cf_cmonad.return (cxp, cxj)
+
+let cx_msgno_check cxp header = (Int32.compare header.msgno cxp.cxp_msgno = 0)
+
+type mrxvariant =
+    | S_mrx_clt of cxplug
+    | S_mrx_msg of sxplug * eistate
+    | S_mrx_rpy of cxplug * eiplug * eistate
+    | S_mrx_err of cxplug * eiplug * eistate
+    | S_mrx_ans of cxplug * (eiplug * eistate) N32_map.t
+
+let mrx_failed x = function
+    | S_mrx_clt cxp
+    | S_mrx_rpy (cxp, _, _)
+    | S_mrx_err (cxp, _, _)
+    | S_mrx_ans (cxp, _) ->
+        cxp.cxp_failedTx#put x
+    | S_mrx_msg (sxp, _) ->
+        sxp.sxp_failedTx#put x
+
+type mtxvariant =
+    | S_mtx_srv of sxplug
+    | S_mtx_msg of cxplug * eostate
+    | S_mtx_rpy of sxplug * eoplug * eostate
+    | S_mtx_err of sxplug * eoplug * eostate
+    | S_mtx_ans of
+        sxplug * (eoplug * eostate) N32_map.t * (int32 * eoplug) Cf_deque.t
+
+let mtx_failed x = function
+    | S_mtx_srv sxp
+    | S_mtx_rpy (sxp, _, _)
+    | S_mtx_err (sxp, _, _)
+    | S_mtx_ans (sxp, _, _) ->
+        sxp.sxp_failedTx#put x
+    | S_mtx_msg (cxp, _) ->
+        cxp.cxp_failedTx#put x
+
+(*--- channel I/O ---*)
+class ['msg, 'rpy, 'err, 'ans] cprofile msg rpy err ans = object
+    constraint 'msg = #eprofile
+    constraint 'rpy = #eprofile
+    constraint 'err = #eprofile
+    constraint 'ans = #eprofile
+    
+    val msg_: 'msg = msg
+    val rpy_: 'rpy = rpy
+    val err_: 'err = err
+    val ans_: 'ans = ans
+    
+    method msg = msg_
+    method rpy = rpy_
+    method err = err_
+    method ans = ans_
+end
+
+class type cprofile_basic = [eprofile, eprofile, eprofile, eprofile] cprofile
+
+type cnotify = Iom_stream.failed
+
+type cplug = {
+    cp_channo: int32;
+    cp_profile: cprofile_basic;
+    cp_notifyTx: cnotify Iom_gadget.tx;
+    cp_exchangeRx: cxplug Iom_gadget.rx;
+    cp_exchangeTx: sxjack Iom_gadget.tx;
+}
+
+type cjack = {
+    cj_notifyRx: cnotify Iom_gadget.rx;
+    cj_exchangeRx: sxjack Iom_gadget.rx;
+    cj_exchangeTx: cxplug Iom_gadget.tx;
+}
+
+let c_harness channo profile =
+    let profile = (profile :> cprofile_basic) in
+    Iom_gadget.duplex >>= fun ((upRx, dnTx), (dnRx, upTx)) ->
+    Iom_gadget.simplex >>= fun (notifyRx, notifyTx) ->
+    let cp = {
+        cp_channo = channo;
+        cp_profile = profile;
+        cp_notifyTx = notifyTx;
+        cp_exchangeRx = dnRx;
+        cp_exchangeTx = upTx;
+    } in
+    let cj = {
+        cj_notifyRx = notifyRx;
+        cj_exchangeRx = upRx;
+        cj_exchangeTx = dnTx;
+    } in
+    Cf_cmonad.return (cp, cj)
+
+type cstate = {
+    cs_rxseqno: int32;
+    cs_rxvq: mrxvariant Cf_deque.t;
+    cs_rxdq: (eiplug * eistate) Cf_deque.t;
+    cs_txseqno: int32;
+    cs_txvq: mtxvariant Cf_deque.t;
+    cs_closing: bool;
+}
+
+let c_failed cs cp error =
+    let x = Error error in
+    let fx = `Failed x in
+    cp.cp_notifyTx#put fx >>= fun () ->
+    let z = Cf_deque.B.to_seq cs.cs_rxvq in
+    let z = Cf_seq.map (mrx_failed fx) z in
+    Cf_seq.C.sequence z >>= fun () ->
+    let z = Cf_deque.B.to_seq cs.cs_txvq in
+    let z = Cf_seq.map (mtx_failed fx) z in
+    Cf_seq.C.sequence z >>= fun () ->
+    Cf_cmonad.return (Cf_exnopt.X x)
+
+let c_rxframe_header cs cp header =
+    if Int32.compare cp.cp_channo header.channo <> 0 then
+        Some X_frame_no_channel
+    else if Int32.compare cs.cs_rxseqno header.seqno <> 0 then
+        Some X_frame_out_of_sequence
+    else
+        None
+
+let c_rxframe_aux ?ansv vcons cs eis eip failedTx rxvq txvq seqno more data =
+    ei_frame eis eip failedTx more data >>= fun (ack, eisopt) ->
+    let rxvq, rxdq =
+        match eisopt, ansv with
+        | None, None ->
+            rxvq, cs.cs_rxdq
+        | None, Some (ansno, cxp, am) ->
+            Cf_deque.B.push (S_mrx_ans (cxp, N32_map.delete ansno am)) rxvq,
+            cs.cs_rxdq
+        | Some { eis_more = Iom_stream.Last }, None ->
+            rxvq, Cf_deque.A.push (eip, eis) cs.cs_rxdq
+        | Some { eis_more = Iom_stream.Last }, Some (ansno, cxp, am) ->
+            Cf_deque.B.push (S_mrx_ans (cxp, N32_map.delete ansno am)) rxvq,
+            Cf_deque.A.push (eip, eis) cs.cs_rxdq
+        | Some eis, _ ->
+            Cf_deque.B.push (vcons eis) rxvq, cs.cs_rxdq
+    in
+    let cs = {
+        cs with
+        cs_rxseqno = seqno;
+        cs_rxvq = rxvq;
+        cs_rxdq = rxdq;
+        cs_txvq = txvq;
+    } in
+    Cf_cmonad.return (Cf_exnopt.U (ack, cs))
+
+let c_seqno_adjust cs data =
+    Int32.add cs.cs_rxseqno (Int32.of_int (Cf_message.length data))
+
+let c_mrx_variant_cons_msg sxp eis = S_mrx_msg (sxp, eis)
+
+let c_rxframe_msg cs cp header more data =
+    match c_rxframe_header cs cp header with
+    | Some error ->
+        c_failed cs cp error
+    | None ->
+        match Cf_deque.B.pop cs.cs_rxvq with
+        | None
+        | Some (S_mrx_clt _, _) ->
+            if Cf_deque.predicate begin function
+                | S_mtx_msg _ ->
+                    true
+                | S_mtx_srv sxp
+                | S_mtx_rpy (sxp, _, _)
+                | S_mtx_err (sxp, _, _)
+                | S_mtx_ans (sxp, _, _) ->
+                    sx_msgno_check sxp header
+            end cs.cs_txvq
+            then begin
+                let limits = cp.cp_profile#msg#limits in
+                sx_harness header.msgno limits >>= fun (sxp, sxj) ->
+                cp.cp_exchangeTx#put sxj >>= fun () ->
+                let seqno = c_seqno_adjust cs data in
+                let txvq = Cf_deque.A.push (S_mtx_srv sxp) cs.cs_txvq in
+                let vcons = c_mrx_variant_cons_msg sxp in
+                c_rxframe_aux vcons cs ei_state0 sxp.sxp_req sxp.sxp_failedTx
+                    cs.cs_rxvq txvq seqno more data
+            end
+            else
+                c_failed cs cp X_message_in_progress
+        | Some (S_mrx_msg (sxp, eis), rxvq)
+          when sx_msgno_check sxp header -> begin
+            let seqno = c_seqno_adjust cs data in
+            let vcons = c_mrx_variant_cons_msg sxp in
+            c_rxframe_aux vcons cs eis sxp.sxp_req sxp.sxp_failedTx rxvq
+                cs.cs_txvq seqno more data
+          end
+        | _ ->
+            c_failed cs cp X_message_in_progress
+
+type crxrsp1 =
+    | F_rsp1_rpy of
+        (cxplug -> eiplug -> eistate -> mrxvariant) *
+        (cprofile_basic -> eprofile) *
+        (eijack -> cxrsp)
+    | F_rsp1_err of
+        (cxplug -> eiplug -> eistate -> mrxvariant) *
+        (cprofile_basic -> eprofile) *
+        (eijack -> cxrsp)
+
+let crxrsp1_rpy =
+    let variant cxp eip eis = S_mrx_rpy (cxp, eip, eis) in
+    let profile p = p#rpy in
+    let message eij = M_clt_err eij in
+    F_rsp1_rpy (variant, profile, message)
+
+let crxrsp1_err =
+    let variant cxp eip eis = S_mrx_err (cxp, eip, eis) in
+    let profile p = p#err in
+    let message eij = M_clt_err eij in
+    F_rsp1_err (variant, profile, message)
+
+let c_rxframe_rsp1 rspv cs cp header more data =
+    match c_rxframe_header cs cp header with
+    | Some error ->
+        c_failed cs cp error
+    | None ->
+        match rspv, Cf_deque.B.pop cs.cs_rxvq with
+        | F_rsp1_rpy (vcons, eprf, mcons), Some (S_mrx_clt cxp, rxvq)
+        | F_rsp1_err (vcons, eprf, mcons), Some (S_mrx_clt cxp, rxvq)
+          when cx_msgno_check cxp header -> begin
+            let eprf = eprf cp.cp_profile in
+            ei_harness eprf#limits >>= fun (eip, eij) ->
+            cxp.cxp_rspTx#put (mcons eij) >>= fun () ->
+            let seqno = c_seqno_adjust cs data in
+            let vcons = vcons cxp eip in
+            c_rxframe_aux vcons cs ei_state0 eip cxp.cxp_failedTx rxvq
+                cs.cs_txvq seqno more data
+          end
+        | F_rsp1_rpy (vcons, _, _), Some (S_mrx_rpy (cxp, eip, eis), rxvq)
+        | F_rsp1_err (vcons, _, _), Some (S_mrx_err (cxp, eip, eis), rxvq)
+          when cx_msgno_check cxp header -> begin
+            let seqno = c_seqno_adjust cs data in
+            let vcons = vcons cxp eip in
+            c_rxframe_aux vcons cs eis eip cxp.cxp_failedTx rxvq cs.cs_txvq
+                seqno more data
+          end
+        | _ ->
+            c_failed cs cp X_response_unexpected
+
+let c_mrx_variant_cons_ans cxp ansno am eip eis =
+    S_mrx_ans (cxp, N32_map.replace (ansno, (eip, eis)) am)
+
+let c_rxframe_ans cs cp ansno header more data =
+    match c_rxframe_header cs cp header with
+    | Some error ->
+        c_failed cs cp error
+    | None ->
+        match Cf_deque.B.pop cs.cs_rxvq with
+        | Some (S_mrx_clt cxp, rxvq) when cx_msgno_check cxp header -> begin
+            ei_harness cp.cp_profile#ans#limits >>= fun (eip, eij) ->
+            cxp.cxp_rspTx#put (M_clt_ans (ansno, eij)) >>= fun () ->
+            let seqno = c_seqno_adjust cs data in
+            let ansv = ansno, cxp, N32_map.nil in
+            let vcons = c_mrx_variant_cons_ans cxp ansno N32_map.nil eip in
+            c_rxframe_aux ~ansv vcons cs ei_state0 eip cxp.cxp_failedTx rxvq
+                cs.cs_txvq seqno more data
+          end
+        | Some (S_mrx_ans (cxp, am), rxvq)
+          when cx_msgno_check cxp header -> begin
+            let seqno = c_seqno_adjust cs data in
+            match
+                try Cf_exnopt.U (N32_map.search ansno am)
+                with x -> Cf_exnopt.X x
+            with
+            | Cf_exnopt.U (eip, eis) ->
+                let ansv = ansno, cxp, am in
+                let vcons = c_mrx_variant_cons_ans cxp ansno am eip in
+                c_rxframe_aux ~ansv vcons cs eis eip cxp.cxp_failedTx rxvq
+                    cs.cs_txvq seqno more data
+            | Cf_exnopt.X Not_found ->
+                ei_harness cp.cp_profile#ans#limits >>= fun (eip, eij) ->
+                cxp.cxp_rspTx#put (M_clt_ans (ansno, eij)) >>= fun () ->
+                let ansv = ansno, cxp, am in
+                let vcons = c_mrx_variant_cons_ans cxp ansno am eip in
+                c_rxframe_aux ~ansv vcons cs ei_state0 eip cxp.cxp_failedTx
+                    rxvq cs.cs_txvq seqno more data
+            | Cf_exnopt.X _ as v ->
+                assert (not true);
+                Cf_cmonad.return v
+          end
+        | _ ->
+            c_failed cs cp X_response_unexpected
+
+let c_rxframe_nul_aux cs cp cxp rxvq am =
+    if N32_map.empty am then begin
+        cxp.cxp_rspTx#put M_clt_nul >>= fun () ->
+        Cf_cmonad.return (Cf_exnopt.U (0, { cs with cs_rxvq = rxvq }))
+    end
+    else
+        c_failed cs cp X_message_in_progress
+
+let c_rxframe_nul cs cp header =
+    match c_rxframe_header cs cp header with
+    | Some error ->
+        c_failed cs cp error
+    | None ->
+        match Cf_deque.B.pop cs.cs_rxvq with
+        | Some (S_mrx_clt cxp, rxvq) when cx_msgno_check cxp header ->
+            c_rxframe_nul_aux cs cp cxp rxvq N32_map.nil
+        | Some (S_mrx_ans (cxp, am), rxvq) when cx_msgno_check cxp header ->
+            c_rxframe_nul_aux cs cp cxp rxvq am
+        | _ ->
+            c_failed cs cp X_response_unexpected
+
+let c_rxframe cs cp = function
+    | `MSG (header, more, data) ->
+        c_rxframe_msg cs cp header more data
+    | `RPY (header, more, data) ->
+        c_rxframe_rsp1 crxrsp1_rpy cs cp header more data
+    | `ERR (header, more, data) ->
+        c_rxframe_rsp1 crxrsp1_err cs cp header more data
+    | `ANS (ansno, header, more, data) ->
+        c_rxframe_ans cs cp ansno header more data
+    | (`NUL header : [< frame ]) ->
+        c_rxframe_nul cs cp header
+
+let c_rxflow_drain_guard cs eis eip rxdq cont =
+    ei_flow_guard eis eip begin fun (ack, eisopt) ->
+        let rxdq =
+            match eisopt with
+            | None ->
+                rxdq
+            | Some eis ->
+                assert (eis.eis_more == Iom_stream.Last);
+                Cf_deque.B.push (eip, eis) rxdq
+        in
+        cont (ack, { cs with cs_rxdq = rxdq })
+    end
+
+let c_rxflow_msg_guard cs eis sxp rxvq cont =
+    ei_flow_guard eis sxp.sxp_req begin fun (ack, eisopt) ->
+        let rxvq =
+            match eisopt with
+            | None ->
+                rxvq
+            | Some eis ->
+                assert (eis.eis_more == Iom_stream.More);
+                Cf_deque.B.push (S_mrx_msg (sxp, eis)) rxvq
+        in
+        cont (ack, { cs with cs_rxvq = rxvq })
+    end
+
+let c_rxflow_rsp1_guard frsp cs eis cxp eip rxvq cont =
+    ei_flow_guard eis eip begin fun (ack, eisopt) ->
+        let rxvq =
+            match eisopt with
+            | None ->
+                rxvq
+            | Some eis ->
+                assert (eis.eis_more == Iom_stream.More);
+                let vcons =
+                    match frsp with
+                    | F_rsp1_rpy (vcons, _, _)
+                    | F_rsp1_err (vcons, _, _) ->
+                        vcons
+                in
+                Cf_deque.B.push (vcons cxp eip eis) rxvq
+        in
+        cont (ack, { cs with cs_rxvq = rxvq })
+    end
+
+let c_rxflow_ans1_guard cs cxp am rxvq cont (ansno, (eip, eis)) =
+    ei_flow_guard eis eip begin fun (ack, eisopt) ->
+        let am =
+            match eisopt with
+            | None ->
+                N32_map.delete ansno am
+            | Some eis ->
+                assert (eis.eis_more == Iom_stream.More);
+                N32_map.replace (ansno, (eip, eis)) am
+         in
+        let mrx = S_mrx_ans (cxp, am) in
+        cont (ack, { cs with cs_rxvq = Cf_deque.B.push mrx rxvq })
+    end
+
+let c_rxflow_ans_guard cs cxp am rxvq cont =
+    let z = N32_map.to_seq_incr am in
+    let z = Cf_seq.map (c_rxflow_ans1_guard cs cxp am rxvq cont) z in
+    Cf_seq.C.sequence z
+
+let c_rxflow_guard cs cont =
+    match Cf_deque.B.pop cs.cs_rxdq with
+    | Some ((eip, eis), rxdq) ->
+        c_rxflow_drain_guard cs eis eip rxdq cont
+    | None ->
+        match Cf_deque.B.pop cs.cs_rxvq with
+        | None ->
+            Cf_cmonad.nil
+        | Some (rxv, rxvq) ->
+            match rxv with
+            | S_mrx_clt _ ->
+                Cf_cmonad.nil
+            | S_mrx_msg (sxp, eis) ->
+                c_rxflow_msg_guard cs eis sxp rxvq cont
+            | S_mrx_rpy (cxp, eip, eis) ->
+                c_rxflow_rsp1_guard crxrsp1_rpy cs eis cxp eip rxvq cont
+            | S_mrx_err (cxp, eip, eis) ->
+                c_rxflow_rsp1_guard crxrsp1_err cs eis cxp eip rxvq cont
+            | S_mrx_ans (cxp, am) ->
+                c_rxflow_ans_guard cs cxp am rxvq cont
+
+let _ =
+    errj#fail "Unimplemented: Beep_core.c_txevent_guard"
+    (* guard on the head of the transmit queue.  if the head of the queue is
+    S_mtx_srv, then guard on the rspRx event, else guard on the entity.  if the
+    head of the queue is S_mtx_ans, then guard on every entity in the answer
+    queue, and make sure to rotate the queue. *)
+
+let _ =
+    errj#fail "Unimplemented: Beep_core.c_txready"
+    (* loop through messages in the transmit queue until the queue is drained,
+    the window is consumed, or a response is still incomplete.  call eo_ready
+    on each entity.  when the head of the queue is S_mtx_ans, then loop through
+    the answer queue.  the answer remains pending if any of its messages are
+    still pending. *)
+
+(*--- End of File [ beep_core.ml ] ---*)

File beep/beep_core.mli

+(*---------------------------------------------------------------------------*
+  INTERFACE  beep_core.mli
+
+  Copyright (c) 2003-2007, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(** BEEP Core -- Blocks Extensible Exchange Protocol (RFC 3080) core. *)
+
+(** {6 Overview}
+    
+    This module implements the Blocks Extensible Exchange Protocol (BEEP)
+    core, defined in RFC 3080.  It uses [Iom] for single-threaded concurrency.
+*)
+
+(** {5 Sets and Maps} *)
+
+module N32_map: Cf_map.T with type Key.t = int32
+module N32_set: Cf_set.T with type Element.t = int32
+
+
+(** {5 Standard Reply Codes} *)
+
+val success: int
+val service_unavailable: int
+val session_request_declined: int
+val session_request_failed: int
+val authenticator_unavailable: int
+val general_syntax_error: int
+val parameter_syntax_error: int
+val not_implemented: int
+val authentication_required: int
+val authentication_insufficient: int
+val authentication_failed: int
+val user_not_authorized: int
+val encryption_required: int
+val channel_request_declined: int
+val parameter_invalid: int
+val transaction_error: int
+
+class localizer:
+    string list ->
+    object
+    	method lang: string option
+    	method reply_code: int -> string
+    end
+
+(** {5 Errors} *)
+type error =
+    | X_frame_bad_syntax
+    | X_frame_too_large
+    | X_frame_out_of_sequence
+    | X_frame_no_channel
+    | X_message_in_progress
+    | X_response_unexpected
+
+val error_to_string: error -> string
+
+exception Error of error
+
+(** {5 Frames} *)
+type header = { channo: int32; msgno: int32; seqno: int32 }
+
+type msg_frame = [ `MSG of header * Iom_stream.more * Cf_message.t ]
+type rpy_frame = [ `RPY of header * Iom_stream.more * Cf_message.t ]
+type err_frame = [ `ERR of header * Iom_stream.more * Cf_message.t ]
+type ans_frame = [ `ANS of int32 * header * Iom_stream.more * Cf_message.t ]
+type nul_frame = [ `NUL of header ]
+
+type response_frame = [ rpy_frame | err_frame | ans_frame | nul_frame ]
+
+type frame = [ msg_frame | response_frame ]
+
+val parse_frame: ?limit:Int32.t -> Cf_message.t -> [> frame ] * Cf_message.t
+val emit_frame: [< frame ] -> Cf_message.t
+
+(** {5 Message Exchange} *)
+
+val default_mime_ct: Mime_entity.ct
+val default_mime_cte: Mime_entity.cte
+val default_mime_emit_adapter: Mime_entity.basic_emit_adapter Lazy.t
+val default_mime_headers: Cf_message.t Lazy.t
+
+(*--- End of File [ beep_core.mli ] ---*)

File beep/beep_entity_exchange.ml

-(*---------------------------------------------------------------------------*
-  IMPLEMENTATION  beep_entity_exchange.ml
-
-  Copyright (c) 2003-2005, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-(**)
-module type X_tag = sig
-    val tag: string
-end
-
-module X_create(T: X_tag) = struct
-    let tag = Printf.sprintf "[%s] " T.tag
-
-    let printf fmt =
-        print_string tag;
-        Printf.printf fmt
-    
-    let sprintf (fmt : ('a, unit, string) format) =
-        Printf.sprintf (Obj.magic (tag ^ (Obj.magic fmt)))
-end
-
-module X = X_create(struct let tag = "Beep_entity_exchange" end)
-(**)
-
-open Cf_cmonad.Op
-open Iom_gadget
-
-module Err = Beep_error
-module F = Beep_frame
-module E = Beep_exchange
-module FE = Beep_frame_exchange
-
-module N_map = Cf_rbtree.Map(Cf_ordered.Int_order)
-
-type cancel_t = Cancel
-type rx_event_t = [ `Entity of Mime_entity.parsing | `Error of exn ]
-type 'a response_t = [ `RPY of 'a | `ERR of 'a | `ANS of 'a | `NUL ]
-
-type ('up, 'dn) c_state_t =
-    | C_initial of 'dn E.response_t rx * 'up response_t tx
-    | C_single of 'dn rx * ('up -> 'up response_t) * Mime_entity.parsing
-    | C_multi of
-        ('dn rx * Mime_entity.parsing) N_map.t *
-        (int * 'dn E.answer_t rx) option
-    constraint 'up = [> rx_event_t ]
-    constraint 'dn = [< FE.entity_rx_t ]
-
-let default_parsing = Mime_entity.parse [ "\r\n", 0 ,2 ]
-
-let client:
-    ?initial:unit -> ?lim:int -> ?header:Mime_entity.emitting ->
-    Cf_message.t -> [< Beep_frame_exchange.header_processing_t ] ->
-    (cancel_t #rx * [> [> rx_event_t ] response_t ] #tx) ->
-    ([< [< Beep_frame_exchange.entity_rx_t ] Beep_exchange.response_t ] #rx *
-        [> Beep_frame_exchange.client_tx_t ] #tx) ->
-    ('s, unit) t
-= fun ?initial ?(lim = 4096) ?header msg hp (cancelRx, rpyTx) (dnRx, dnTx) ->
-    let cancelRx = (cancelRx :> cancel_t rx) in
-    let rpyTx = (rpyTx :> 'up response_t tx) in
-    let dnTx = (dnTx :> FE.client_tx_t tx) in
-    
-    (*--- Put the MSG (unless this is the initial exchange of a session) ---*)
-    begin
-        match initial with
-        | Some () ->
-            if msg != [] then invalid_arg (X.sprintf "client: initial MSG");
-            Cf_cmonad.return ()
-        | None ->
-            begin
-                match header with
-                | Some h -> dnTx#put (`Header h)
-                | None -> Cf_cmonad.return ()
-            end >>= fun () ->
-            dnTx#put (`Payload (F.Last, msg))
-    end >>= fun () ->
-    
-    (*--- Main loop ---*)
-    let rec loop () =
-        load >>= fun s ->
-        guard begin
-            cancelRx#get (fun Cancel -> dnTx#put `Abort) >>= fun () ->
-            match s with
-            | C_initial (dnRx, rpyTx) ->
-                get_dnRx_ dnRx rpyTx
-            | C_single (msgRx, consF, p) ->
-                get_msgRx_ msgRx consF p
-            | C_multi (ansMap, ansOpt) ->
-                begin
-                    match ansOpt with
-                    | None ->
-                        Cf_cmonad.return ()
-                    | Some (ansN, ansRx) ->
-                        get_ansRx0_ ansMap ansN ansRx
-                end >>= fun () ->
-                let mapF (n, (dnRx, p)) = get_ansRxN_ n dnRx p ansMap ansOpt in
-                let z = N_map.to_seq_incr ansMap in
-                let z = Cf_seq.map mapF z in
-                Cf_seq.C.sequence z
-        end
-    
-    (*--- Get an incoming response event ---*)
-    and get_dnRx_ dnRx rpyTx =
-        let dnRx = (dnRx :> 'dn E.response_t rx) in
-        let rpyTx = (rpyTx :> 'up response_t tx) in
-        dnRx#get begin function
-            | `RPY msgRx ->
-                do_dnRx_single_ (fun v -> `RPY v) msgRx
-            | `ERR msgRx ->
-                do_dnRx_single_ (fun v -> `ERR v) msgRx
-            | `ANS ansRx ->
-                store (C_multi (N_map.nil, Some (0, ansRx))) >>= loop
-            | `NUL ->
-                rpyTx#put `NUL
-        end
-    
-    (*--- Do the RPY/ERR response ---*)
-    and do_dnRx_single_ consF msgRx =
-        store (C_single (msgRx, consF, default_parsing)) >>= loop
-        
-    (*--- Get an incoming RPY/ERR frame event ---*)
-    and get_msgRx_ msgRx consF p =
-        let msgRx = (msgRx :> 'dn rx) in
-        msgRx#get begin function
-            | `Header p ->
-                store (C_single (msgRx, consF, p)) >>= loop
-            | `Error x ->
-                rpyTx#put (consF (`Error x))
-            | `Payload (more, msg) ->
-                let body = p#body @ msg in
-                let p = p#put_body body in
-                match more with
-                | F.More when Cf_message.length body > lim ->
-                    rpyTx#put (consF (`Error (Err.X Err.X_entity_too_large)))
-                | F.More ->
-                    store (C_single (msgRx, consF, p)) >>= loop
-                | F.Last ->
-                    rpyTx#put (consF (`Entity p))
-        end
-    
-    (*--- Get a new incoming ANS entity or NUL frame ---*)
-    and get_ansRx0_ ansMap ansN ansRx =
-        let ansRx = (ansRx :> 'dn E.answer_t rx) in
-        ansRx#get begin function
-            | E.Ans_final sync ->
-                assert (N_map.empty ansMap);
-                start sync () >>= fun () ->
-                rpyTx#put `NUL
-            | E.Ans_entity dnRx ->
-                let ansTup = dnRx, default_parsing in
-                let ansMap = N_map.replace (ansN, ansTup) ansMap in
-                let ansN = succ ansN in
-                store (C_multi (ansMap, Some (ansN, ansRx))) >>= loop
-        end
-    
-    (*--- Get an ANS frame ---*)
-    and get_ansRxN_ ansN dnRx p ansMap ansOpt =
-        let dnRx = (dnRx :> 'dn rx) in
-        dnRx#get begin function
-            | `Header p ->
-                do_ansRxN_push_ ansN dnRx p ansMap ansOpt
-            | `Error x ->
-                rpyTx#put (`ANS (`Error x)) >>= fun () ->
-                let ansMap = N_map.delete ansN ansMap in
-                store (C_multi (ansMap, ansOpt)) >>= loop
-            | `Payload (more, msg) ->
-                let p = p#put_body (p#body @ msg) in
-                match more with
-                | F.More ->
-                    do_ansRxN_push_ ansN dnRx p ansMap ansOpt
-                | F.Last ->
-                    rpyTx#put (`ANS (`Entity p)) >>= fun () ->
-                    let ansMap = N_map.delete ansN ansMap in
-                    store (C_multi (ansMap, ansOpt)) >>= loop                
-        end
-    
-    (*--- Store an ANS frame with the '*' continuation indicator ---*)
-    and do_ansRxN_push_ ansN dnRx p ansMap ansOpt =
-        let ansTup = dnRx, p in
-        let ansMap = N_map.replace (ansN, ansTup) ansMap in
-        store (C_multi (ansMap, ansOpt)) >>= loop
-    in
-    
-    (*--- Start the main loop ---*)
-    let dnRx = (dnRx :> 'dn E.response_t rx) in
-    start (loop ()) (C_initial (dnRx, rpyTx))
-
-type 'dn s_state_t =
-    | S_receiving of
-        Mime_entity.emitting response_t F.handle_t rx * FE.entity_rx_t rx *
-        Mime_entity.parsing
-    | S_ready of Mime_entity.emitting response_t F.handle_t rx
-    | S_answering of
-        Mime_entity.emitting response_t F.handle_t rx * 'dn E.answer_t tx
-    constraint 'dn = [> FE.service_tx_t ]
-
-let service:
-    ?initial:unit -> ?lim:int ->
-    [< Beep_frame_exchange.header_processing_t ] ->
-    ([< Mime_entity.emitting response_t ] Beep_frame.handle_t #rx *
-        [> rx_event_t ] #tx) ->
-    ([< Beep_frame_exchange.entity_rx_t ] #rx *
-        [> [> Beep_frame_exchange.service_tx_t ]
-        Beep_exchange.response_t ] #tx) ->
-    ('s, unit) t
-= fun ?initial ?(lim = 4096) hp (ctrlRx, ctrlTx) (dnRx, dnTx) ->
-    let hp = (hp :> FE.header_processing_t) in
-    let ctrlTx = (ctrlTx :> rx_event_t tx) in
-    let dnTx = (dnTx :> 'dn E.response_t tx) in
-    
-    (*--- The main loop ---*)
-    let rec loop () =
-        load >>= fun s ->
-        guard begin
-            match s with
-            | S_receiving (ctrlRx, dnRx, p) ->
-                get_dnRx_ ctrlRx dnRx p
-            | S_ready ctrlRx ->
-                get_ctrlRx_ ctrlRx
-            | S_answering (ctrlRx, ansTx) ->
-                get_ctrlRx_ans_ ctrlRx ansTx
-        end
-        
-    (*--- Get a MSG entity event ---*)
-    and get_dnRx_ ctrlRx dnRx p =
-        let dnRx = (dnRx :> FE.entity_rx_t rx) in
-        dnRx#get begin function
-            | `Header p ->
-                store (S_receiving (ctrlRx, dnRx, p)) >>= loop
-            | `Error x ->
-                ctrlTx#put (`Error x)
-            | `Payload (more, msg) ->
-                match more with
-                | F.Last ->
-                    store (S_ready ctrlRx) >>= fun () ->
-                    let p = p#put_body (p#body @ msg) in
-                    ctrlTx#put (`Entity p) >>= loop
-                | F.More ->
-                    let body = p#body @ msg in
-                    if Cf_message.length body > lim then
-                        ctrlTx#put (`Error (Err.X Err.X_entity_too_large))
-                    else begin
-                        let p = p#put_body body in
-                        store (S_receiving (ctrlRx, dnRx, p)) >>= loop
-                    end
-        end
-    
-    (*--- Get a control event ---*)
-    and get_ctrlRx_ ctrlRx =
-        let ctrlRx =
-            (ctrlRx :> Mime_entity.emitting response_t F.handle_t rx)
-        in
-        ctrlRx#get begin fun rh ->
-            match rh.F.h_data with
-            | `RPY p ->
-                let consF msgRx = `RPY msgRx in
-                do_ctrlRx_single_ consF rh.F.h_ack p
-            | `ERR p ->
-                let consF msgRx = `ERR msgRx in
-                do_ctrlRx_single_ consF rh.F.h_ack p
-            | `ANS p ->
-                simplex >>= fun (ansRx, ansTx) ->
-                dnTx#put (`ANS ansRx) >>= fun () ->
-                store (S_answering (ctrlRx, ansTx)) >>= fun () ->
-                do_ctrlRx_ans_ ansTx rh.F.h_ack p >>= loop
-            | `NUL ->
-                dnTx#put `NUL
-        end
-    
-    (*--- Get a RPY/ERR entity ---*)
-    and do_ctrlRx_single_ consF sync e =
-        simplex >>= fun (msgRx, msgTx) ->
-        let msgTx = (msgTx :> 'dn tx) in
-        begin
-            match hp with
-            | `H_leave -> Cf_cmonad.return ()
-            | _ -> msgTx#put (`Header e)
-        end >>= fun () ->
-        let ph = { F.h_data = F.Last, e#emit; F.h_ack = sync } in
-        msgTx#put (`Payload ph) >>= fun () ->
-        dnTx#put (consF msgRx)
-    
-    (*--- Get a control event (answering) ---*)
-    and get_ctrlRx_ans_ ctrlRx ansTx =
-        let ctrlRx =
-            (ctrlRx :> Mime_entity.emitting response_t F.handle_t rx)
-        in
-        let ansTx = (ansTx :> 'dn E.answer_t tx) in
-        ctrlRx#get begin fun rh ->
-            match rh.F.h_data with
-            | `RPY _
-            | `ERR _ ->
-                loop ()
-            | `NUL ->
-                let sync =
-                    match rh.F.h_ack with
-                    | None -> Cf_cmonad.return ()
-                    | Some r -> r
-                in
-                ansTx#put (E.Ans_final sync)
-            | `ANS e ->
-                do_ctrlRx_ans_ ansTx rh.F.h_ack e >>= loop
-        end
-    
-    (*--- Do ANS entity ---*)
-    and do_ctrlRx_ans_ ansTx sync e =
-        let ansTx = (ansTx :> 'dn E.answer_t tx) in
-        simplex >>= fun (msgRx, msgTx) ->
-        let msgTx = (msgTx :> 'dn tx) in
-        begin
-            match hp with
-            | `H_leave -> Cf_cmonad.return ()
-            | _ -> msgTx#put (`Header e)
-        end >>= fun () ->
-        let ph = { F.h_data = F.Last, e#emit; F.h_ack = sync } in
-        msgTx#put (`Payload ph) >>= fun () ->
-        ansTx#put (E.Ans_entity msgRx)
-    in
-    
-    (*--- Start the main loop ---*)
-    let ctrlRx = (ctrlRx :> Mime_entity.emitting response_t F.handle_t rx) in
-    let state0 =
-        match initial with
-        | Some () ->
-            S_ready ctrlRx
-        | None ->
-            let dnRx = (dnRx :> FE.entity_rx_t rx) in
-            S_receiving (ctrlRx, dnRx, default_parsing)
-    in
-    start (loop ()) state0
-
-class virtual client ?initial ?lim ?header m hp =
-    object(self)
-        inherit FE.client ?initial hp
-        
-        method private virtual entity:
-            's. cancel_t tx -> rx_event_t response_t rx -> ('s, unit) t
-
-        method private frame dnTx dnRx =
-            duplex >>= fun ((ctrlRx, ctrlTx), ctrlIO) ->
-            let ctrlRx = (ctrlRx :> rx_event_t response_t rx) in
-            self#entity ctrlTx ctrlRx >>= fun () ->
-            client ?initial ?lim ?header m hp ctrlIO (dnRx, dnTx)
-    end
-
-class virtual service ?initial ?lim (state0 : 's) hp =
-    object(self)
-        inherit FE.service ?initial hp
-
-        method private virtual entity:
-            rx_event_t -> Mime_entity.emitting response_t F.handle_t tx ->
-            ('s, unit) t
-        
-        method private frame dnRx dnTx =
-            duplex >>= fun ((ctrlRx, ctrlTx), ctrlIO) ->
-            let ctrlRx = (ctrlRx :> rx_event_t rx) in
-            service ?initial ?lim hp ctrlIO (dnRx, dnTx) >>= fun () ->
-            let r =
-                match initial with
-                | Some () ->
-                    self#entity (`Entity default_parsing) ctrlTx
-                | None ->
-                    guard (ctrlRx#get (fun v -> self#entity v ctrlTx))
-            in
-            start r state0
-    end
-
-let crlf_string_ = "\r\n"
-
-(* OBSOLETE
-let emit_aux ~f msg =
-    let buffer = Buffer.create 80 in
-    let pp = Format.formatter_of_buffer buffer in
-    let out, flush, _, spaces =
-        Format.pp_get_all_formatter_output_functions pp ()
-    in
-    let newline () = out crlf_string_ 0 2 in
-    let pp = Format.make_formatter out flush in
-    Format.pp_set_all_formatter_output_functions
-        pp ~out ~flush ~newline ~spaces;
-    let header = f pp msg in
-    Format.pp_print_newline pp ();
-    let body = Cf_message.create (Buffer.contents buffer) in
-    header, body
-*)
-
-(*--- End of File [ beep_entity_exchange.ml ] ---*)

File beep/beep_entity_exchange.mli

-(*---------------------------------------------------------------------------*
-  INTERFACE  beep_entity_exchange.mli
-
-  Copyright (c) 2003-2005, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-open Iom_gadget
-
-type cancel_t = Cancel
-type rx_event_t = [ `Entity of Mime_entity.parsing | `Error of exn ]
-type 'a response_t = [ `RPY of 'a | `ERR of 'a | `ANS of 'a | `NUL ]
-
-val client:
-    ?initial:unit -> ?lim:int -> ?header:Mime_entity.emitting ->
-    Cf_message.t -> [< Beep_frame_exchange.header_processing_t ] ->
-    (cancel_t #rx * [> [> rx_event_t ] response_t ] #tx) ->
-    ([< [< Beep_frame_exchange.entity_rx_t ] Beep_exchange.response_t ] #rx *
-        [> Beep_frame_exchange.client_tx_t ] #tx) ->
-    ('s, unit) t
-
-val service:
-    ?initial:unit -> ?lim:int ->
-    [< Beep_frame_exchange.header_processing_t ] ->
-    ([< Mime_entity.emitting response_t ] Beep_frame.handle_t #rx *
-        [> rx_event_t ] #tx) ->
-    ([< Beep_frame_exchange.entity_rx_t ] #rx *
-        [> [> Beep_frame_exchange.service_tx_t ]
-        Beep_exchange.response_t ] #tx) ->
-    ('s, unit) t
-
-class virtual client:
-    ?initial:unit -> ?lim:int -> ?header:Mime_entity.emitting ->
-    Cf_message.t -> [< Beep_frame_exchange.header_processing_t ] ->
-    object            
-        inherit Beep_exchange.client
-        
-        method private virtual entity:
-            's. cancel_t tx -> rx_event_t response_t rx -> ('s, unit) t
-    end
-
-class virtual service:
-    ?initial:unit -> ?lim:int -> 's ->
-    [< Beep_frame_exchange.header_processing_t ] ->
-    object
-        inherit Beep_exchange.service
-        
-        method private virtual entity:
-            rx_event_t ->
-            Mime_entity.emitting response_t Beep_frame.handle_t tx ->
-            ('s, unit) t
-    end
-
-(* OBSOLETE
-val emit_aux:
-    f:(Format.formatter -> 'a -> Mime_entity.emitting) -> 'a ->
-    Mime_entity.emitting
-*)
-
-(*--- End of File [ beep_entity_exchange.mli ] ---*)

File beep/beep_error.ml

-(*---------------------------------------------------------------------------*
-  IMPLEMENTATION  beep_error.ml
-
-  Copyright (c) 2003-2005, James H. Woodyatt
-  All rights reserved.
-
-  Redistribution and use in source and binary forms, with or without
-  modification, are permitted provided that the following conditions
-  are met:
-
-    Redistributions of source code must retain the above copyright
-    notice, this list of conditions and the following disclaimer.
-
-    Redistributions in binary form must reproduce the above copyright
-    notice, this list of conditions and the following disclaimer in
-    the documentation and/or other materials provided with the
-    distribution
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-  OF THE POSSIBILITY OF SUCH DAMAGE. 
- *---------------------------------------------------------------------------*)
-
-type t =
-    | X_control_error
-    | X_initial_timeout
-    | X_release_timeout
-    | X_bad_frame_syntax
-    | X_channel_closed
-    | X_out_of_sequence
-    | X_incomplete_entity
-    | X_bad_frame_type
-    | X_more_answers_expected
-    | X_not_requested
-    | X_window_exceeded
-    | X_transport_error
-    | X_bad_entity_header of Cf_lex.cursor
-    | X_entity_too_large
-    | X_incomplete_entity_header of Cf_message.t
-    | X_invalid_entity_header