Commits

Anonymous committed 9e0aa79

Checkpoint. Beep_core and Beep_simple are feature-complete and ready for
testing. Beep_session is still under construction, with work remaining to
be done to manage channel and session start/close events. Greeting events
are unfinished, but the feature is basically there. Beep_serial is almost
feature-complete, and testing is already underway.

  • Participants
  • Parent commits 75c61bf

Comments (0)

Files changed (12)

File beep/.depend

+beep_simple.cmi: beep_core.cmi 
+beep_session.cmi: beep_core.cmi 
+beep_serial.cmi: beep_session.cmi beep_core.cmi 
 beep_core.cmo: beep_core.cmi 
 beep_core.cmx: beep_core.cmi 
-beep_serial.cmo: beep_core.cmi beep_serial.cmi 
-beep_serial.cmx: beep_core.cmx beep_serial.cmi 
+beep_simple.cmo: beep_core.cmi beep_simple.cmi 
+beep_simple.cmx: beep_core.cmx beep_simple.cmi 
+beep_session.cmo: beep_simple.cmi beep_core.cmi beep_session.cmi 
+beep_session.cmx: beep_simple.cmx beep_core.cmx beep_session.cmi 
+beep_serial.cmo: beep_session.cmi beep_core.cmi beep_serial.cmi 
+beep_serial.cmx: beep_session.cmx beep_core.cmx beep_serial.cmi 
 beep_core.cmo: beep_core.cmi 
 beep_core.cmx: beep_core.cmi 
-beep_serial.cmo: beep_core.cmi beep_serial.cmi 
-beep_serial.cmx: beep_core.cmx beep_serial.cmi 
+beep_simple.cmo: beep_core.cmi beep_simple.cmi 
+beep_simple.cmx: beep_core.cmx beep_simple.cmi 
+beep_session.cmo: beep_simple.cmi beep_core.cmi beep_session.cmi 
+beep_session.cmx: beep_simple.cmx beep_core.cmx beep_session.cmi 
+beep_serial.cmo: beep_session.cmi beep_core.cmi beep_serial.cmi 
+beep_serial.cmx: beep_session.cmx beep_core.cmx beep_serial.cmi 
 Open issues in development:
 
++ (T_beep.T2) Sessions preliminarily sending <greeting/> elements (immediately
+    on creation, rather than at programmatic opportunity), but receiving them
+    is still broken.  Incomplete support for starting and closing channels and
+    sessions, and also tests still need to be written.  Distinction between
+    tuning reset and transport close needs to be thought all the way out.
+
 # End of open issues

File beep/Makefile

 BEEP_YACC_ML_FILES = $(BEEP_YACC_MODULES:%=%.ml)
 BEEP_YACC_MLI_FILES = $(BEEP_YACC_MODULES:%=%.mli)
 
-BEEP_MODULES = core serial
+BEEP_MODULES = core simple session serial
 
 BEEP_PRIMITIVES =  # no primitives yet
 

File beep/beep_core.ml

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-open Cf_cmonad.Op
-
 (**)
 let errj = Cf_journal.stderr
+let _ = errj#setlimit `None
 (**)
 
+open Cf_cmonad.Op
+
 module N32_map = Cf_rbtree.Map(Int32)
 module N32_set = Cf_rbtree.Set(Int32)
 
 
 class localizer langs =
     object
-        method lang =
+        method greeting_lang = langs
+        
+        method reply_lang =
             match langs with
             | [] -> None
             | hd :: _ when hd = "en" -> None
             | _ -> Some "en"
                 
-        method reply_code = function
+        method reply_content = function
             | 421 -> "service not available"
             | 450 -> "session request declined"
             | 451 -> "session request failed"
             | x when x < 100 || x > 999 ->
                  invalid_arg
     			   "Beep_std_code.localizer#content: code not 3-digit integer"
-            | _ -> ""
+            | _ -> "unspecified error"
     end
 
+let default_localizer = new localizer []
+
 (*--- frame type ---*)
 type header = { channo: int32; msgno: int32; seqno: int32 }
 
 type msg_frame = [ `MSG of header * Iom_stream.more * Cf_message.t ]
 type rpy_frame = [ `RPY of header * Iom_stream.more * Cf_message.t ]
 type err_frame = [ `ERR of header * Iom_stream.more * Cf_message.t ]
-type ans_frame = [ `ANS of int32 * header * Iom_stream.more * Cf_message.t ]
+type ans_frame = [ `ANS of header * Iom_stream.more * Cf_message.t * int32 ]
 type nul_frame = [ `NUL of header ]
 
 type response_frame = [ rpy_frame | err_frame | ans_frame | nul_frame ]
 
 exception Error of error
 
+(*--- default MIME parameters ---*)
+let default_xml_mime_ct = {
+    Mime_entity.ct_type = Mime_entity.CT_application;
+    ct_subtype = "beep+xml";
+    ct_parameters = Mime_atom.Map.nil;
+}
+
+let default_xml_mime_emit_adapter =
+    new Mime_entity.basic_emit_adapter ~ct:default_xml_mime_ct []
+
+let default_limits = { Iom_octet_stream.low = 4096; high = 4096 }
+
 (*--- frame format and parsing ---*)
 let parse_frame =
     let bad_syntax = Error X_frame_bad_syntax in
     		let ansno = Scanf.kscanf s ef " %lu" (fun x -> uint31 x; x) in
     		let hlen = crlf s in
     		let more, data, m = payload more m hlen size in
-    		`ANS (ansno, header, more, data), m
+    		`ANS (header, more, data, ansno), m
     	| K.NUL ->
     		Scanf.kscanf s ef " %lu %lu . %lu 0\r\nEND\r\n%n"
     		begin fun a b c n ->
     	single "RPY" header more data
     | `ERR (header, more, data) ->
     	single "ERR" header more data
-    | `ANS (ansno, header, more, data) ->
+    | `ANS (header, more, data, ansno) ->
     	let b = Buffer.create 40 in
     	Buffer.add_string b "ANS";
     	let size = Cf_message.length data in
     	let length = Buffer.length b in
     	[ Buffer.contents b, 0, length ]
 
-(*--- entity I/O ---*)
-let default_mime_ct = {
-    Mime_entity.ct_type = Mime_entity.CT_application;
-    ct_subtype = "beep+xml";
-    ct_parameters = Mime_atom.Map.of_list [ "charset", "utf8" ];
-}
-
-let default_mime_cte = Mime_entity.CTE_binary
-
-let default_mime_emit_adapter = lazy begin
-    new Mime_entity.basic_emit_adapter
-        ~ct:default_mime_ct ~cte:default_mime_cte []
-end
-
-let default_mime_headers = lazy begin
-    let a = Lazy.force default_mime_emit_adapter in
-    a#emit
-end
-
-class eprofile limits = object
-    val limits_: Iom_octet_stream.limits = limits
-    method limits = limits_
-end
-
+(*--- channel and exchange driver types ---*)
 type iefix = {
     ief_flowTx: Iom_stream.readywait Iom_gadget.tx;
     ief_eventRx: Mime_stream.entity_scan_event Iom_gadget.rx;
     iep_limits: Iom_octet_stream.limits;
 }
 
+type oe_event =
+    | OE_headers of
+        Mime_entity.basic_emit_adapter *
+        Iom_octet_stream.fragment Iom_gadget.rx
+    | OE_octets of
+        Iom_octet_stream.fragment Iom_gadget.rx
+
+type oefix = {
+    oef_flowRx: Iom_stream.readywait Iom_gadget.rx;
+    oef_eventTx: oe_event Iom_gadget.tx;
+}
+
+type oepad = {
+    oep_flowTx: Iom_stream.readywait Iom_gadget.tx;
+    oep_eventRx: oe_event Iom_gadget.rx;
+    oep_limits: Iom_octet_stream.limits;
+}
+
+type cxrsp =
+    | M_clt_rpy of iefix
+    | M_clt_err of iefix
+    | M_clt_ans of int32 * iefix
+    | M_clt_nul
+
+type sxrsp =
+    | M_srv_rpy of oepad
+    | M_srv_err of oepad
+    | M_srv_ans of int32 * oepad
+    | M_srv_nul
+
+type chnotify =
+    | M_ch_failed of int * exn
+    | M_ch_close of int
+    | M_ch_ok
+    | M_ch_error of int
+
+type snotify =
+    | M_ch0_stop of int32
+    | M_ch0_close of int32 * int
+    | M_ch0_ok of int32
+    | M_ch0_error of int32 * int
+
+type tag (* shadow type for management channel *)
+
+type role = R_initiator | R_listener
+
+type pad = {
+    p_role: role;
+    p_ch0pad: chpad Lazy.t;
+    p_ch0fix: tag chfix Lazy.t;
+    p_notifyRx: snotify Iom_gadget.rx;
+    p_notifyTx: snotify Iom_gadget.tx;
+}
+and chpad = {
+    chp_channo: int32;
+    chp_notifyTx: chnotify Iom_gadget.tx;
+    chp_exchangeRx: cxpad Iom_gadget.rx;
+    chp_sxMsg: int32 -> bool -> sxpad Iom_gadget.t;
+    chp_cxRpy: (iepad * cxrsp) Iom_gadget.t;
+    chp_cxErr: (iepad * cxrsp) Iom_gadget.t;
+    chp_cxAns: int32 -> (iepad * cxrsp) Iom_gadget.t;
+}
+and 'p chfix = {
+    chf_channo: int32;
+    chf_notifyTx: snotify Iom_gadget.tx;
+    chf_notifyRx: chnotify Iom_gadget.rx;
+    chf_exchangeRx: 'p sxfix Iom_gadget.rx;
+    chf_cxMsg: int32 -> bool -> 'p cxfix Iom_gadget.t;
+    chf_sxRpy: (sxrsp * oefix) Iom_gadget.t;
+    chf_sxErr: (sxrsp * oefix) Iom_gadget.t;
+    chf_sxAns: int32 -> (sxrsp * oefix) Iom_gadget.t;
+}
+and cxpad = {
+    cxp_msgno: int32;
+    cxp_greeting: bool;
+    cxp_req: oepad;
+    cxp_abortTx: unit Iom_gadget.tx;
+    cxp_rspTx: cxrsp Iom_gadget.tx;
+}
+and 'p sxfix = {
+    sxf_msgno: int32;
+    sxf_req: iefix;
+    sxf_abortRx: unit Iom_gadget.rx;
+    sxf_rspTx: sxrsp Iom_gadget.tx;
+    sxf_chfix: 'p chfix;
+}
+and sxpad = {
+    sxp_msgno: int32;
+    sxp_greeting: bool;
+    sxp_req: iepad;
+    sxp_abortTx: unit Iom_gadget.tx;
+    sxp_rspRx: sxrsp Iom_gadget.rx;
+}
+and 'p cxfix = {
+    cxf_msgno: int32;
+    cxf_req: oefix;
+    cxf_abortRx: unit Iom_gadget.rx;
+    cxf_rspRx: cxrsp Iom_gadget.rx;
+    cxf_notifyTx: chnotify Iom_gadget.tx;
+}
+
+(*--- entity I/O ---*)
 type ies_variant =
     | S_iev_body of Iom_octet_stream.fragment Iom_gadget.tx
     | S_iev_header
     ies_variant = S_iev_header;
 }
 
-let ie_harness limits =
-    Iom_gadget.duplex >>= fun ((flowRx, eventTx), (eventRx, flowTx)) ->
-    let iep = {
-        iep_flowRx = flowRx;
-        iep_eventTx = eventTx;
-        iep_limits = limits;
-    } in
-    let ief = {
-        ief_flowTx = flowTx;
-        ief_eventRx = eventRx;
-    } in
-    Cf_cmonad.return (iep, ief)
-
-let ie_failed ies failedTx x =
-    let failedTx = (failedTx :> Iom_stream.failed Iom_gadget.tx) in
-    failedTx#put (`Failed x) >>= fun () ->
+let ie_failed ies notifyTx x =
+    notifyTx#put (M_ch_failed (general_syntax_error, x)) >>= fun () ->
     Cf_cmonad.return (0, Some { ies with ies_variant = S_iev_failed })
 
 let ie_push ies more buffer length =
             Cf_exnopt.X x
     with
     | Cf_exnopt.U (scanner, body) ->
-        Iom_gadget.simplex >>= fun (fragmentRx, fragmentTx) ->
-        iep.iep_eventTx#put (scanner, fragmentRx) >>= fun () ->
+        Iom_gadget.simplex >>= fun (octetsRx, fragmentTx) ->
+        iep.iep_eventTx#put (scanner, octetsRx) >>= fun () ->
         let length = Cf_message.length body in
         let buffer = Cf_message.push body Cf_deque.nil in
         ie_body ies iep fragmentTx more buffer length
             cont (0, Some ies)
     end
 
-type oefix = {
-    oef_flowRx: Iom_stream.readywait Iom_gadget.rx;
-    oef_eventTx: Mime_stream.entity_emit_event Iom_gadget.tx;
-}
-
-type oepad = {
-    oep_flowTx: Iom_stream.readywait Iom_gadget.tx;
-    oep_eventRx: Mime_stream.entity_emit_event Iom_gadget.rx;
-    oep_limits: Iom_octet_stream.limits;
-}
-
 type oes_variant =
     | S_oev_body of Iom_octet_stream.fragment Iom_gadget.rx
     | S_oev_header
     oes_variant = S_oev_header;
 }
 
-let oe_harness limits =
-    Iom_gadget.duplex >>= fun ((eventRx, flowTx), (flowRx, eventTx)) ->
-    let oep = {
-        oep_flowTx = flowTx;
-        oep_eventRx = eventRx;
-        oep_limits = limits;
-    } in
-    let oef = {
-        oef_flowRx = flowRx;
-        oef_eventTx = eventTx;
-    } in
-    Cf_cmonad.return (oep, oef)
-
 let oe_work window oes oep more buffer length =
     match more with
     | Iom_stream.More
 let oe_event_guard window oes oep cont =
     match oes.oes_variant with
     | S_oev_header ->
-        oep.oep_eventRx#get begin fun (headers, fragmentRx) ->
-            let data = headers#emit in
-            let buffer = Cf_message.push data oes.oes_buffer in
-            let length = oes.oes_length + Cf_message.length data in
+        oep.oep_eventRx#get begin fun event ->
+            let buffer, length, octetsRx =
+                match event with
+                | OE_headers (headers, octetsRx) ->
+                    let data = headers#to_message in
+                    let buffer = Cf_message.push data oes.oes_buffer in
+                    let length = oes.oes_length + Cf_message.length data in
+                    buffer, length, octetsRx
+                | OE_octets octetsRx ->
+                    oes.oes_buffer, oes.oes_length, octetsRx
+            in
             assert (oes.oes_more == Iom_stream.More);
             let oes = {
                 oes with
                 oes_ready = true;
                 oes_buffer = buffer;
                 oes_length = length;
-                oes_variant = S_oev_body fragmentRx;
+                oes_variant = S_oev_body octetsRx;
             } in
             let v = Some oes, None in
             if window < length then
                 cont v
             end
         end
-    | S_oev_body fragmentRx ->
-        fragmentRx#get begin fun fragment ->
+    | S_oev_body octetsRx ->
+        octetsRx#get begin fun fragment ->
             let data = fragment#data in
             let buffer = Cf_message.push data oes.oes_buffer in
             let length = oes.oes_length + Cf_message.length data in
             oe_work window oes oep fragment#more buffer length >>= cont
         end
 
-(*--- exchange I/O ---*)
-class ['msg, 'rpy, 'err, 'ans] chprofile ~msg ~rpy ~err ~ans = object
-    constraint 'msg = #eprofile
-    constraint 'rpy = #eprofile
-    constraint 'err = #eprofile
-    constraint 'ans = #eprofile
-    
-    val msg_: 'msg = msg
-    val rpy_: 'rpy = rpy
-    val err_: 'err = err
-    val ans_: 'ans = ans
-    
-    method msg = msg_
-    method rpy = rpy_
-    method err = err_
-    method ans = ans_
+class eform ~limits = object
+    method icore =
+        Iom_gadget.duplex >>= fun ((flowRx, eventTx), (eventRx, flowTx)) ->
+        let iep = {
+            iep_flowRx = flowRx;
+            iep_eventTx = eventTx;
+            iep_limits = limits;
+        } in
+        let ief = {
+            ief_flowTx = flowTx;
+            ief_eventRx = eventRx;
+        } in
+        Cf_cmonad.return (iep, ief)
+
+    method ocore =
+        Iom_gadget.duplex >>= fun ((eventRx, flowTx), (flowRx, eventTx)) ->
+        let oep = {
+            oep_flowTx = flowTx;
+            oep_eventRx = eventRx;
+            oep_limits = limits;
+        } in
+        let oef = {
+            oef_flowRx = flowRx;
+            oef_eventTx = eventTx;
+        } in
+        Cf_cmonad.return (oep, oef)
 end
 
-type sxrsp =
-    | M_srv_rpy of oepad
-    | M_srv_err of oepad
-    | M_srv_ans of int32 * oepad
-    | M_srv_nul
-
-type sxpad = {
-    sxp_msgno: int32;
-    sxp_req: iepad;
-    sxp_rspRx: sxrsp Iom_gadget.rx;
-    sxp_failedTx: Iom_stream.failed Iom_gadget.tx;
-}
-
-type sxfix = {
-    sxf_msgno: int32;
-    sxf_req: iefix;
-    sxf_rspTx: sxrsp Iom_gadget.tx;
-    sxf_failedRx: Iom_stream.failed Iom_gadget.rx;
-}
-
-let sx_harness ep msgno =
-    Iom_gadget.duplex >>= fun ((rspRx, failedTx), (failedRx, rspTx)) ->
-    ie_harness ep#limits >>= fun (iep, ief) ->
-    let sxp = {
-        sxp_msgno = msgno;
-        sxp_req = iep;
-        sxp_rspRx = rspRx;
-        sxp_failedTx = failedTx;
-    } in
-    let sxf = {
-        sxf_msgno = msgno;
-        sxf_req = ief;
-        sxf_rspTx = rspTx;
-        sxf_failedRx = failedRx;
-    } in
-    Cf_cmonad.return (sxp, sxf)
+(*--- channel I/O ---*)
+let cx_msgno_check header cxp = (Int32.compare header.msgno cxp.cxp_msgno = 0)
+let cx_msgno_check_queue header = Cf_deque.predicate (cx_msgno_check header)
 
 let sx_msgno_check header sxp = (Int32.compare header.msgno sxp.sxp_msgno = 0)
 let sx_msgno_check_queue header = Cf_deque.predicate (sx_msgno_check header)
 
-type cxrsp =
-    | M_clt_rpy of iefix
-    | M_clt_err of iefix
-    | M_clt_ans of int32 * iefix
-    | M_clt_nul
+type mxrsp1variant = Mx_rpy | Mx_err
 
-type cxpad = {
-    cxp_msgno: int32;
-    cxp_req: oepad;
-    cxp_rspTx: cxrsp Iom_gadget.tx;
-    cxp_failedTx: Iom_stream.failed Iom_gadget.tx;
-}
+type 'p cxmsgvariant =
+    | Cx_msg0 of 'p cxfix
+    | Cx_msg1 of 'p cxfix * Iom_octet_stream.fragment Iom_gadget.tx
+    | Cx_msgx
+
+type cxansvariant = 
+    | Cx_ans_0 of iefix
+    | Cx_ans_1 of Mime_entity.basic_scan_adapter *
+        Iom_octet_stream.fragment Iom_gadget.rx *
+        Iom_stream.readywait Iom_gadget.tx
+
+type 'p cxrspvariant =
+    | Cx_rsp0 of 'p cxfix
+    | Cx_rsp1e of mxrsp1variant * iefix
+    | Cx_rsp1o of mxrsp1variant * Mime_entity.basic_scan_adapter *
+        Iom_octet_stream.fragment Iom_gadget.rx *
+        Iom_stream.readywait Iom_gadget.tx
+    | Cx_rspn of 'p cxfix * cxansvariant N32_map.t
+    | Cx_rspx
+
+type 'p cxstate = Cx of 'p cxmsgvariant * 'p cxrspvariant
+
+let cx_state0 cxf = Cx (Cx_msg0 cxf, Cx_rsp0 cxf)
+
+let cx_null_msg =
+    new Iom_octet_stream.fragment Iom_stream.Last (Cf_message.create "\r\n")
+
+let cx_msg_octets ?h:headers octets = function
+    | Cx (Cx_msg0 cxf, cxrv) ->
+        Iom_gadget.simplex >>= fun (octetsRx, octetsTx) ->
+        let event =
+            match headers with
+            | Some h -> OE_headers (h, octetsRx)
+            | None -> OE_octets octetsRx
+        in
+        cxf.cxf_req.oef_eventTx#put event >>= fun () ->
+        octetsTx#put octets >>= fun () ->
+        Cf_cmonad.return begin
+            let cxmv =
+                match octets#more with
+                | Iom_stream.More -> Cx_msg1 (cxf, octetsTx)
+                | Iom_stream.Last -> Cx_msgx
+            in
+            Cx (cxmv, cxrv)
+        end
+    | Cx (Cx_msg1 (_, octetsTx), cxrv) as cx ->
+        if headers <> None then
+            failwith "Beep_core: message already started.";
+        octetsTx#put octets >>= fun () ->
+        Cf_cmonad.return begin
+            match octets#more with
+            | Iom_stream.More -> cx
+            | Iom_stream.Last -> Cx (Cx_msgx, cxrv)
+        end
+    | Cx (Cx_msgx, _) ->
+        failwith "Beep_core: message already complete."
+
+let cx_msg_guard cont (Cx (cxmv, _) as cx) =
+    match cxmv with
+    | Cx_msg0 cxf
+    | Cx_msg1 (cxf, _) ->
+        cxf.cxf_req.oef_flowRx#get (cont cx)
+    | Cx_msgx ->
+        Cf_cmonad.nil
+
+let cx_rsp0_guard cont nul cxmv cxf =
+    cxf.cxf_rspRx#get begin fun rsp ->
+        match rsp with
+        | M_clt_rpy ief ->
+            cont (Cx (Cx_msgx, Cx_rsp1e (Mx_rpy, ief)))
+        | M_clt_err ief ->
+            cont (Cx (cxmv, Cx_rsp1e (Mx_err, ief)))
+        | M_clt_ans (ansno, ief) ->
+            let ansm = N32_map.replace (ansno, Cx_ans_0 ief) N32_map.nil in
+            cont (Cx (Cx_msgx, Cx_rspn (cxf, ansm)))
+        | M_clt_nul ->
+            nul
+    end
+
+let cx_rsp1e_guard cont cxmv rsp1v ief =
+    match cxmv, rsp1v with
+    | Cx_msgx, _
+    | Cx_msg1 (_, _), Mx_err ->
+        ief.ief_eventRx#get begin fun (headers, octetsRx) ->
+            let cxrv = Cx_rsp1o (rsp1v, headers, octetsRx, ief.ief_flowTx) in
+            cont (Cx (cxmv, cxrv))
+        end
+    | _, _ ->
+        assert (not true);
+        Cf_cmonad.nil
+
+let cx_rsp1o_guard cont cx =
+    match cx with
+    | Cx (Cx_msgx as cxmv, Cx_rsp1o (_, headers, octetsRx, flowTx))
+    | Cx (Cx_msg1 _ as cxmv, Cx_rsp1o (Mx_err, headers, octetsRx, flowTx)) ->
+        octetsRx#get begin fun octets ->
+            cont headers octets flowTx begin
+                match octets#more with
+                | Iom_stream.More -> cx
+                | Iom_stream.Last -> Cx (cxmv, Cx_rspx)
+            end
+        end
+    | Cx _ ->
+        assert (not true);
+        Cf_cmonad.nil
+
+let cx_ansr_guard cont cxf ansm =
+    cxf.cxf_rspRx#get begin fun rsp ->
+        cont (Cx (Cx_msgx, begin
+            match rsp with
+            | M_clt_ans (ansno, ief) ->
+                let ansm = N32_map.replace (ansno, Cx_ans_0 ief) ansm in
+                Cx_rspn (cxf, ansm)
+            | rsp ->
+                assert (rsp = M_clt_nul);
+                Cx_rspx
+        end))
+    end
+
+let cx_ans0_guard cont cxf ansm ansno ief =
+    ief.ief_eventRx#get begin fun (headers, octetsRx) ->
+        let ans1 = Cx_ans_1 (headers, octetsRx, ief.ief_flowTx) in
+        let m = N32_map.replace (ansno, ans1) ansm in
+        cont (Cx (Cx_msgx, Cx_rspn (cxf, m)))
+    end
+
+let cx_ans1_guard cont cxf ansm ansno headers octetsRx flowTx =
+    octetsRx#get begin fun octets ->
+        let ansm =
+            match octets#more with
+            | Iom_stream.More -> ansm
+            | Iom_stream.Last -> N32_map.delete ansno ansm
+        in
+        let cx = Cx (Cx_msgx, Cx_rspn (cxf, ansm)) in
+        cont ansno headers octets flowTx cx
+    end
+
+let cx_ansm_guard c0 c1 cxf ansm (ansno, cxansv) =
+    match cxansv with
+    | Cx_ans_0 ief ->
+        cx_ans0_guard c0 cxf ansm ansno ief
+    | Cx_ans_1 (headers, octetsRx, flowTx) ->
+        cx_ans1_guard c1 cxf ansm ansno headers octetsRx flowTx
+
+let cx_ans_guard c0 c1 cxf ansm =
+    cx_ansr_guard c0 cxf ansm >>= fun () ->
+    Cf_seq.C.sequence begin
+        let z = N32_map.to_seq_incr ansm in
+        Cf_seq.map (cx_ansm_guard c0 c1 cxf ansm) z
+    end
+
+type 'p sxmsgstate =
+    | Sx_msg0 of 'p sxfix
+    | Sx_msg1 of 'p sxfix * Mime_entity.basic_scan_adapter *
+        Iom_octet_stream.fragment Iom_gadget.rx
+    | Sx_msgx
+
+type sxans = Sx_ans of Iom_stream.readywait Iom_gadget.rx *
+    Iom_octet_stream.fragment Iom_gadget.tx
+
+type 'p sxrspstate =
+    | Sx_rsp0 of 'p sxfix
+    | Sx_rsp1 of mxrsp1variant * Iom_stream.readywait Iom_gadget.rx *
+        Iom_octet_stream.fragment Iom_gadget.tx
+    | Sx_rspn of 'p sxfix * int32 * sxans N32_map.t
+    | Sx_rspx
+
+type 'p sxstate = Sx of 'p sxmsgstate * 'p sxrspstate
+
+let sx_state0 sxf = Sx (Sx_msg0 sxf, Sx_rsp0 sxf)
+
+let sx_rsp1_octets ?h:headers rsp1v oeharness octets sx =
+    let Sx (sxmv, sxrv) = sx in
+    match sxrv with
+    | Sx_rsp0 sxf -> begin
+        match sxmv, rsp1v with
+        | Sx_msgx, _
+        | Sx_msg1 (_, _, _), Mx_err ->
+            oeharness >>= fun (rsp, oef) ->
+            sxf.sxf_rspTx#put rsp >>= fun () ->
+            Iom_gadget.simplex >>= fun (octetsRx, octetsTx) ->
+            let event =
+                match headers with
+                | Some h -> OE_headers (h, octetsRx)
+                | None -> OE_octets octetsRx
+            in
+            oef.oef_eventTx#put event >>= fun () ->
+            octetsTx#put octets >>= fun () ->
+            let sxrv =
+                match octets#more with
+                | Iom_stream.More -> Sx_rsp1 (rsp1v, oef.oef_flowRx, octetsTx)
+                | Iom_stream.Last -> Sx_rspx
+            in
+            Cf_cmonad.return (Sx (sxmv, sxrv))
+        | Sx_msg0 _, _ ->
+            failwith "Beep_core: message not started."
+        | _, _ ->
+            failwith "Beep_core: message incomplete."
+    end
+    | Sx_rspx ->
+        failwith "Beep_core: response already completed."
+    | Sx_rspn (_, _, _) ->
+        failwith "Beep_core: multiple response type already selected."
+    | Sx_rsp1 (rsp1v', _, octetsTx) ->
+        if rsp1v' <> rsp1v then
+            failwith "Beep_core: other response type already selected.";
+        if headers <> None then
+            failwith "Beep_core: response already started.";
+        match sxmv, rsp1v with
+        | Sx_msgx, _
+        | Sx_msg1 (_, _, _), Mx_err ->
+            octetsTx#put octets >>= fun () ->
+            let sxrv =
+                match octets#more with
+                | Iom_stream.More -> sxrv
+                | Iom_stream.Last -> Sx_rspx
+            in
+            Cf_cmonad.return (Sx (sxmv, sxrv))
+        | Sx_msg0 _, _ ->
+            failwith "Beep_core: message not started."
+        | _, _ ->
+            failwith "Beep_core: message incomplete."
+
+let rec sx_choose_ansno ansno ansm =
+    if Int32.compare ansno 0l < 0 then
+        sx_choose_ansno 0l ansm
+    else if N32_map.member ansno ansm then begin
+        let n, _ = N32_map.max ansm in
+        if Int32.compare n Int32.max_int = 0 then
+            sx_choose_ansno (Int32.succ ansno) ansm
+        else
+            n
+    end
+    else
+        ansno
+
+let sx_ans_start ?n ?h oeharness octets sx =
+    let Sx (sxmv, sxrv) = sx in
+    let sxf, ansm, ansno =
+        match sxrv, n with
+        | Sx_rsp0 sxf, None -> sxf, N32_map.nil, 0l
+        | Sx_rsp0 sxf, Some ansno -> sxf, N32_map.nil, ansno
+        | Sx_rsp1 _, _ -> failwith "Beep_core: single response started."
+        | Sx_rspn (sxf, ansno, ansm), None
+        | Sx_rspn (sxf, _, ansm), Some ansno -> sxf, ansm, ansno
+        | Sx_rspx, _ -> failwith "Beep_core: answers completed already."
+    in
+    let ansno = sx_choose_ansno ansno ansm in
+    let nextansno = sx_choose_ansno (Int32.succ ansno) ansm in
+    oeharness ansno >>= fun (rsp, oef) ->
+    sxf.sxf_rspTx#put rsp >>= fun () ->
+    Iom_gadget.simplex >>= fun (octetsRx, octetsTx) ->
+    let event =
+        match h with
+        | Some h -> OE_headers (h, octetsRx)
+        | None -> OE_octets octetsRx
+    in
+    oef.oef_eventTx#put event >>= fun () ->
+    octetsTx#put octets >>= fun () ->
+    let ansm =
+        match octets#more with
+        | Iom_stream.More ->
+            N32_map.replace (ansno, Sx_ans (oef.oef_flowRx, octetsTx)) ansm
+        | Iom_stream.Last ->
+            ansm
+    in
+    Cf_cmonad.return (Sx (sxmv, Sx_rspn (sxf, nextansno, ansm)), ansno)
+
+let sx_ans_append ~n:ansno octets sx =
+    let Sx (sxmv, sxrv) = sx in
+    match sxrv with
+    | Sx_rsp0 _ -> failwith "Beep_core: no answers started."
+    | Sx_rsp1 _ -> failwith "Beep_core: single response started."
+    | Sx_rspx -> failwith "Beep_core: answers completed already."
+    | Sx_rspn (sxf, nextansno, ansm) ->
+        try
+            let Sx_ans (_, octetsTx), ansmtl = N32_map.extract ansno ansm in
+            let ansm =
+                match octets#more with
+                | Iom_stream.More -> ansm
+                | Iom_stream.Last -> ansmtl
+            in
+            let sx = Sx (sxmv, Sx_rspn (sxf, nextansno, ansm)) in
+            octetsTx#put octets >>= fun () ->
+            Cf_cmonad.return sx
+        with
+        | Not_found ->
+            failwith "Beep_core: answer not started."
+
+let sx_msg0_guard cont sxrv sxf =
+    sxf.sxf_req.ief_eventRx#get begin fun (headers, octetsRx) ->
+        cont (Sx (Sx_msg1 (sxf, headers, octetsRx), sxrv))
+    end
+
+let sx_msg1_guard cont sxrv sxf headers octetsRx =
+    octetsRx#get begin fun octets ->
+        let sxmv =
+            match octets#more with
+            | Iom_stream.More -> Sx_msg1 (sxf, headers, octetsRx)
+            | Iom_stream.Last -> Sx_msgx
+        in
+        cont headers octets sxf.sxf_req.ief_flowTx (Sx (sxmv, sxrv))
+    end
+
+let sx_rsp1_guard cont flowRx sx = flowRx#get (cont sx)
+
+let sx_rspn_guard cont ansm sx =
+    Cf_seq.C.sequence begin
+        let z = N32_map.to_seq_incr ansm in
+        Cf_seq.map begin fun (ansno, Sx_ans (flowRx, _)) ->
+            flowRx#get (cont sx ansno)
+        end z
+    end
+
+class ['p] chform ~msg ~rpy ~err ~ans =
+    let msg = (msg :> eform) in
+    let rpy = (rpy :> eform) in
+    let err = (err :> eform) in
+    let ans = (ans :> eform) in
+    let ieMsg = msg#icore in
+    let oeMsg = msg#ocore in
+    let ieRpy = rpy#icore in
+    let oeRpy = rpy#ocore in
+    let ieErr = err#icore in
+    let oeErr = err#ocore in
+    let ieAns = ans#icore in
+    let oeAns = ans#ocore in
+    let cxMsg notifyTx dnTx msgno greeting =
+        Iom_gadget.simplex >>= fun (rspRx, rspTx) ->
+        Iom_gadget.simplex >>= fun (abortRx, abortTx) ->
+        oeMsg >>= fun (oep, oef) ->
+        let cxp = {
+            cxp_msgno = msgno;
+            cxp_req = oep;
+            cxp_rspTx = rspTx;
+            cxp_abortTx = abortTx;
+            cxp_greeting = greeting;
+        } in
+        let cxf = {
+            cxf_msgno = msgno;
+            cxf_req = oef;
+            cxf_rspRx = rspRx;
+            cxf_abortRx = abortRx;
+            cxf_notifyTx = notifyTx;
+        } in
+        dnTx#put cxp >>= fun () ->
+        Cf_cmonad.return cxf
+    in
+    let cxRpy =
+        ieRpy >>= fun (iep, ief) ->
+        Cf_cmonad.return (iep, M_clt_rpy ief)
+    in
+    let cxErr =
+        ieErr >>= fun (iep, ief) ->
+        Cf_cmonad.return (iep, M_clt_err ief)
+    in
+    let cxAns ansno =
+        ieAns >>= fun (iep, ief) ->
+        Cf_cmonad.return (iep, M_clt_ans (ansno, ief))
+    in
+    let sxMsg chf upTx msgno greeting =
+        Iom_gadget.simplex >>= fun (rspRx, rspTx) ->
+        Iom_gadget.simplex >>= fun (abortRx, abortTx) ->
+        ieMsg >>= fun (iep, ief) ->
+        let sxp = {
+            sxp_msgno = msgno;
+            sxp_req = iep;
+            sxp_rspRx = rspRx;
+            sxp_abortTx = abortTx;
+            sxp_greeting = greeting;
+        } in
+        let sxf = {
+            sxf_chfix = chf;
+            sxf_msgno = msgno;
+            sxf_req = ief;
+            sxf_rspTx = rspTx;
+            sxf_abortRx = abortRx;
+        } in
+        upTx#put sxf >>= fun () ->
+        Cf_cmonad.return sxp
+    in
+    let sxRpy =
+        oeRpy >>= fun (oep, oef) ->
+        Cf_cmonad.return (M_srv_rpy oep, oef)
+    in
+    let sxErr =
+        oeErr >>= fun (oep, oef) ->
+        Cf_cmonad.return (M_srv_err oep, oef)
+    in
+    let sxAns ansno =
+        oeAns >>= fun (oep, oef) ->
+        Cf_cmonad.return (M_srv_ans (ansno, oep), oef)
+    in
+    object
+        method core:
+            pad -> int32 -> (chpad * 'p chfix) Iom_gadget.t =
+            fun p channo ->
+                Iom_gadget.duplex >>= fun ((upRx, dnTx), (dnRx, upTx)) ->
+                Iom_gadget.simplex >>= fun (notifyRx, notifyTx) ->
+                let rec chf = {
+                    chf_channo = channo;
+                    chf_notifyTx = p.p_notifyTx;
+                    chf_notifyRx = notifyRx;
+                    chf_exchangeRx = upRx;
+                    chf_cxMsg = cxMsg notifyTx dnTx;
+                    chf_sxRpy = sxRpy;
+                    chf_sxErr = sxErr;
+                    chf_sxAns = sxAns;
+                } in
+                let chp = {
+                    chp_channo = channo;
+                    chp_notifyTx = notifyTx;
+                    chp_exchangeRx = dnRx;
+                    chp_sxMsg = sxMsg chf upTx;
+                    chp_cxRpy = cxRpy;
+                    chp_cxErr = cxErr;
+                    chp_cxAns = cxAns;
+                } in
+                Cf_cmonad.return (chp, chf)
+    end
+
+class ['state] mxdrv abortRx state = object(self)
+    inherit Iom_gadget.next
+    inherit Iom_gadget.start
     
-type cxfix = {
-    cxf_msgno: int32;
-    cxf_req: oefix;
-    cxf_rspRx: cxrsp Iom_gadget.rx;
-    cxf_failedRx: Iom_stream.failed Iom_gadget.rx;
+    val core_: 'state = state
+    method core = core_
+    
+    method private abort: 'a. 'a Iom_gadget.t = Iom_gadget.abort
+    method private complete: 'a.'a Iom_gadget.t = Iom_gadget.abort
+    
+    method private continue: 'a. 'state -> 'a Iom_gadget.t = fun state ->
+        {< core_ = state >}#next
+    
+    method private guard = abortRx#get (fun () -> self#abort)
+end
+
+class ['p] cxdrv ?cx cxf =
+    let cx0 = match cx with None -> cx_state0 cxf | Some cx -> cx in
+    object(self)
+        inherit ['p cxstate] mxdrv cxf.cxf_abortRx cx0 as super
+
+        method private continue: 'a. 'p cxstate -> 'a Iom_gadget.t = function
+            | Cx (Cx_msgx, Cx_rspx) -> self#complete
+            | cx -> super#continue cx
+        
+        method private failed:
+            'a. int -> exn -> 'a Iom_gadget.t = fun code x ->
+            cxf.cxf_notifyTx#put (M_ch_failed (code, x)) >>= fun () ->
+            self#abort
+        
+        method private msg_flow cx ready =
+            match ready, cx with
+            | `Ready, Cx (Cx_msg0 cxf, cxrv) ->
+                assert (match cxrv with Cx_rsp0 _ -> true | _ -> false);
+                Iom_gadget.simplex >>= fun (octetsRx, octetsTx) ->
+                cxf.cxf_req.oef_eventTx#put (OE_octets octetsRx) >>= fun () ->
+                octetsTx#put cx_null_msg >>= fun () ->
+                self#continue (Cx (Cx_msgx, cxrv))
+            | _, _ ->
+                self#continue cx
+
+        method private msg_octets = cx_msg_octets    
+        method private rpy_octets _ _ _ = self#continue
+        method private err_octets _ _ _ = self#continue
+        method private ans_octets _ _ _ _ = self#continue
+        method private nul = self#complete
+                
+        method private guard =
+            super#guard >>= fun () ->
+            cx_msg_guard self#msg_flow core_ >>= fun () ->
+            let Cx (cxmv, cxrv) = core_ in
+            match cxmv, cxrv with
+            | Cx_msg1 _, Cx_rsp0 cxf
+            | Cx_msgx, Cx_rsp0 cxf ->
+                cx_rsp0_guard self#continue self#nul cxmv cxf
+            | Cx_msg1 _, Cx_rsp1e (Mx_err as rsp1v, ief)
+            | Cx_msgx, Cx_rsp1e (rsp1v, ief) ->
+                cx_rsp1e_guard self#continue cxmv rsp1v ief
+            | Cx_msg1 _, Cx_rsp1o (Mx_err as rsp1v, _, _, _)
+            | Cx_msgx, Cx_rsp1o (rsp1v, _, _, _) ->
+                let cont =
+                    match rsp1v with
+                    | Mx_rpy -> self#rpy_octets
+                    | Mx_err -> self#err_octets
+                in
+                cx_rsp1o_guard cont core_
+            | Cx_msgx, Cx_rspn (cxf, ansm) ->
+                cx_ans_guard self#continue self#ans_octets cxf ansm
+            | _, _ ->
+                Cf_cmonad.nil
+    end
+
+class ['p] sxdrv ?sx sxf =
+    let sx0 = match sx with None -> sx_state0 sxf | Some sx -> sx in
+    object(self)
+        inherit ['p sxstate] mxdrv sxf.sxf_abortRx sx0 as super
+
+        method private continue: 'a. 'p sxstate -> 'a Iom_gadget.t = function
+            | Sx (Sx_msgx, Sx_rspx) -> self#complete
+            | sx -> super#continue sx
+        
+        method private msg_octets _ _ _ sx = self#continue sx
+
+        method private rsp1_flow sx _ = self#continue sx
+        
+        method private rpy_octets ?h =
+            sx_rsp1_octets ?h Mx_rpy sxf.sxf_chfix.chf_sxRpy
+
+        method private err_octets ?h =
+            sx_rsp1_octets ?h Mx_err sxf.sxf_chfix.chf_sxErr
+        
+        method private ans_flow sx _ _ = self#continue sx
+
+        method private ans_octets_start ?n ?h =
+            sx_ans_start ?n ?h sxf.sxf_chfix.chf_sxAns
+
+        method private ans_octets_append = sx_ans_append
+        
+        method private msg_guard (Sx (sxmv, sxrv)) =
+            match sxmv with
+            | Sx_msg0 sxf ->
+                sx_msg0_guard self#continue sxrv sxf
+            | Sx_msg1 (sxf, headers, octetsRx) ->
+                sx_msg1_guard self#msg_octets sxrv sxf headers octetsRx
+            | Sx_msgx ->
+                Cf_cmonad.nil
+
+        method private rsp_guard sx =
+            let Sx (sxmv, sxrv) = sx in
+            match sxmv, sxrv with
+            | Sx_msgx, Sx_rsp1 (Mx_rpy, flowRx, _)
+            | Sx_msg1 _, Sx_rsp1 (Mx_err, flowRx, _)
+            | Sx_msgx, Sx_rsp1 (Mx_err, flowRx, _) ->
+                sx_rsp1_guard self#rsp1_flow flowRx sx
+            | Sx_msgx, Sx_rspn (_, _, ansm) ->
+                sx_rspn_guard self#ans_flow ansm sx
+            | _, _ ->
+                Cf_cmonad.nil
+
+        method private guard =
+            super#guard >>= fun () ->
+            self#msg_guard core_ >>= fun () ->
+            self#rsp_guard core_
+    end
+
+type 'p chdrvstate = {
+    cds_msgno: int32;
+    cds_closing: exn option;
 }
 
-let cx_harness profile msgno =
-    Iom_gadget.duplex >>= fun ((failedRx, rspTx), (rspRx, failedTx)) ->
-    oe_harness profile#msg#limits >>= fun (oep, oef) ->
-    let cxp = {
-        cxp_msgno = msgno;
-        cxp_req = oep;
-        cxp_rspTx = rspTx;
-        cxp_failedTx = failedTx;
-    } in
-    let cxf = {
-        cxf_msgno = msgno;
-        cxf_req = oef;
-        cxf_rspRx = rspRx;
-        cxf_failedRx = failedRx;
-    } in
-    Cf_cmonad.return (cxp, cxf)
+let chdrvstate0 = {
+    cds_msgno = 0l;
+    cds_closing = None;
+}
 
-let cx_msgno_check header cxp = (Int32.compare header.msgno cxp.cxp_msgno = 0)
-let cx_msgno_check_queue header = Cf_deque.predicate (cx_msgno_check header)
+class ['p] chdrv chf = object(self:'self)
+    inherit Iom_gadget.start
+    inherit Iom_gadget.next
+    
+    val core_: 'p chdrvstate = chdrvstate0
+    method core = core_
+
+    method private abort: 'a. 'a Iom_gadget.t = Iom_gadget.abort
+    
+    method private close_ok = self#abort
+    method private close_error _ = self#next
+
+    method private close_msg _ =
+        chf.chf_notifyTx#put (M_ch0_ok chf.chf_channo) >>= fun () ->
+        Iom_gadget.abort
+
+    method private failed code x =
+        match x with
+        | Error _ ->
+            Iom_gadget.abort
+        | _ ->
+            let m = M_ch0_close (chf.chf_channo, code) in
+            chf.chf_notifyTx#put m >>= fun () ->
+            let s = { core_ with cds_closing = Some x } in
+            {< core_ = s >}#next
+    
+    method private closing: 'p chdrvstate Iom_gadget.t =
+        let m = M_ch0_close (chf.chf_channo, success) in
+        chf.chf_notifyTx#put m >>= fun () ->
+        Cf_cmonad.return { core_ with cds_closing = Some End_of_file }
+    
+    method private notify = function
+        | M_ch_failed (code, x) -> self#failed code x
+        | M_ch_close n -> self#close_msg n
+        | M_ch_ok -> self#close_ok
+        | M_ch_error n -> self#close_error n
+    
+    method private sx_start sxf =
+        Cf_cmonad.return (self, new sxdrv sxf)
+    
+    method private cx_start:
+        ?n:int32 -> ('p chdrvstate -> 'p cxfix ->
+        ('self * 'p cxdrv) Iom_gadget.t) ->'self Iom_gadget.t = fun ?n f ->
+        if core_.cds_closing <> None then
+            failwith "Beep_core.chdrv#cx_start: channel is closing.";
+        let n = match n with Some n -> n | None -> core_.cds_msgno in
+        chf.chf_cxMsg n false >>= fun cxf ->
+        let n =
+            if Int32.compare n Int32.max_int < 0 then Int32.succ n else 0l
+        in
+        f { core_ with cds_msgno = n } cxf >>= fun (obj, cx) ->
+        cx#start >>= fun () ->
+        Cf_cmonad.return obj
+    
+    method private guard =
+        chf.chf_notifyRx#get self#notify >>= fun () ->
+        chf.chf_exchangeRx#get begin fun sxf ->
+            self#sx_start sxf >>= fun (obj, sx) ->
+            sx#start >>= fun () ->
+            obj#next
+        end
+end
 
 type chsrxvariant =
     | S_mrx_new
     | S_mrx_err of cxpad * iepad * iestate
     | S_mrx_ans of cxpad * (iepad * iestate) N32_map.t
 
-let mrx_failed mrx x =
-    match mrx with
-    | S_mrx_new ->
-        Cf_cmonad.nil
-    | S_mrx_rpy (cxp, _, _)
-    | S_mrx_err (cxp, _, _)
-    | S_mrx_ans (cxp, _) ->
-        cxp.cxp_failedTx#put x
-    | S_mrx_msg (sxp, _) ->
-        sxp.sxp_failedTx#put x
-
 type chstxvariant =
     | S_mtx_new
     | S_mtx_msg of cxpad * oestate
         sxpad * (oepad * oestate) N32_map.t * (int32 * oepad) Cf_deque.t *
         Iom_stream.more
 
-let mtx_failed mtx x =
-    match mtx with
-    | S_mtx_new ->
-        Cf_cmonad.nil
-    | S_mtx_rpy (sxp, _, _)
-    | S_mtx_err (sxp, _, _)
-    | S_mtx_ans (sxp, _, _, _) ->
-        sxp.sxp_failedTx#put x
-    | S_mtx_msg (cxp, _) ->
-        cxp.cxp_failedTx#put x
-
-(*--- channel I/O ---*)
-class type chprofile_basic = [eprofile, eprofile, eprofile, eprofile] chprofile
-
-type chnotify = Iom_stream.failed
-
-type chpad = {
-    chp_channo: int32;
-    chp_profile: chprofile_basic;
-    chp_notifyTx: chnotify Iom_gadget.tx;
-    chp_exchangeRx: cxpad Iom_gadget.rx;
-    chp_exchangeTx: sxfix Iom_gadget.tx;
-}
-
-type chfix = {
-    chf_notifyRx: chnotify Iom_gadget.rx;
-    chf_exchangeRx: sxfix Iom_gadget.rx;
-    chf_exchangeTx: cxpad Iom_gadget.tx;
-}
-
-let ch_harness channo profile =
-    let profile = (profile :> chprofile_basic) in
-    Iom_gadget.duplex >>= fun ((upRx, dnTx), (dnRx, upTx)) ->
-    Iom_gadget.simplex >>= fun (notifyRx, notifyTx) ->
-    let chp = {
-        chp_channo = channo;
-        chp_profile = profile;
-        chp_notifyTx = notifyTx;
-        chp_exchangeRx = dnRx;
-        chp_exchangeTx = upTx;
-    } in
-    let chf = {
-        chf_notifyRx = notifyRx;
-        chf_exchangeRx = upRx;
-        chf_exchangeTx = dnTx;
-    } in
-    Cf_cmonad.return (chp, chf)
-
 type chstate = {
     chs_rxseqno: int32;
     chs_rxv: chsrxvariant;
     chs_txq: sxpad Cf_deque.t;
 }
 
-let chs0 = {
+let ch_channo chp = chp.chp_channo
+
+let ch_state0 = {
     chs_rxseqno = 0l;
     chs_rxv = S_mrx_new;
     chs_rxq = Cf_deque.nil;
     chs_txq = Cf_deque.nil;
 }
 
-let ch_failed chs chp error =
+let ch_rxseqno chs = chs.chs_rxseqno
+
+let ch_failed chs chp code x =
+    begin
+        if code > 0 && code < 1000 then
+            chp.chp_notifyTx#put (M_ch_failed (code, x))
+        else
+            Cf_cmonad.nil
+    end >>= fun () ->
+    Cf_seq.C.sequence begin
+        let z = Cf_deque.B.to_seq chs.chs_rxq in
+        Cf_seq.map (fun cxp -> cxp.cxp_abortTx#put ()) z
+    end >>= fun () ->
+    Cf_seq.C.sequence begin
+        let z = Cf_deque.B.to_seq chs.chs_txq in
+        Cf_seq.map (fun sxp -> sxp.sxp_abortTx#put ()) z
+    end
+
+let ch_errout chs chp error =
     let x = Error error in
-    let fx = `Failed x in
-    chp.chp_notifyTx#put fx >>= fun () ->
-    mrx_failed chs.chs_rxv fx >>= fun () ->
-    let z = Cf_deque.B.to_seq chs.chs_rxq in
-    let z = Cf_seq.map (fun cxp -> cxp.cxp_failedTx#put fx) z in
-    Cf_seq.C.sequence z >>= fun () ->
-    mtx_failed chs.chs_txv fx >>= fun () ->
-    let z = Cf_deque.B.to_seq chs.chs_txq in
-    let z = Cf_seq.map (fun sxp -> sxp.sxp_failedTx#put fx) z in
-    Cf_seq.C.sequence z >>= fun () ->
+    ch_failed chs chp (-1) x >>= fun () ->
     Cf_cmonad.return (Cf_exnopt.X x)
 
 let ch_rxf_header chs chp header =
 let ch_rxf_msg chs chp header more data =
     match ch_rxf_header chs chp header with
     | Some error ->
-        ch_failed chs chp error
+        ch_errout chs chp error
     | None ->
         match chs.chs_rxv with
         | S_mrx_new when sx_msgno_check_queue header chs.chs_txq -> begin
-            sx_harness chp.chp_profile#msg header.msgno >>= fun (sxp, sxf) ->
-            chp.chp_exchangeTx#put sxf >>= fun () ->
+            chp.chp_sxMsg header.msgno false >>= fun sxp ->
             let seqno = ch_seqno_adjust chs data in
             let txq = Cf_deque.A.push sxp chs.chs_txq in
             let vcons = ch_mrx_vcons_msg sxp in
-            ch_rxf_aux vcons chs ies0 sxp.sxp_req sxp.sxp_failedTx
+            ch_rxf_aux vcons chs ies0 sxp.sxp_req chp.chp_notifyTx
                 chs.chs_rxq txq seqno more data
           end
         | S_mrx_msg (sxp, ies) when sx_msgno_check header sxp -> begin
             let seqno = ch_seqno_adjust chs data in
             let vcons = ch_mrx_vcons_msg sxp in
-            ch_rxf_aux vcons chs ies sxp.sxp_req sxp.sxp_failedTx chs.chs_rxq
+            ch_rxf_aux vcons chs ies sxp.sxp_req chp.chp_notifyTx chs.chs_rxq
                 chs.chs_txq seqno more data
           end
         | _ ->
-            ch_failed chs chp (X_message_in_progress header)
+            ch_errout chs chp (X_message_in_progress header)
 
 type crxrsp1 =
     | F_rsp1_rpy of
         (cxpad -> iepad -> iestate -> chsrxvariant) *
-        (chprofile_basic -> eprofile) *
-        (iefix -> cxrsp)
+        (chpad -> (iepad * cxrsp) Iom_gadget.t)
     | F_rsp1_err of
         (cxpad -> iepad -> iestate -> chsrxvariant) *
-        (chprofile_basic -> eprofile) *
-        (iefix -> cxrsp)
+        (chpad -> (iepad * cxrsp) Iom_gadget.t)
 
 let crxrsp1_rpy =
     let variant cxp iep ies = S_mrx_rpy (cxp, iep, ies) in
-    let profile p = p#rpy in
-    let message ief = M_clt_err ief in
-    F_rsp1_rpy (variant, profile, message)
+    let iharness chp = chp.chp_cxRpy in
+    F_rsp1_rpy (variant, iharness)
 
 let crxrsp1_err =
     let variant cxp iep ies = S_mrx_err (cxp, iep, ies) in
-    let profile p = p#err in
-    let message ief = M_clt_err ief in
-    F_rsp1_err (variant, profile, message)
+    let iharness chp = chp.chp_cxErr in
+    F_rsp1_err (variant, iharness)
 
 let ch_rxf_rsp1 rspv chs chp header more data =
     match ch_rxf_header chs chp header with
     | Some error ->
-        ch_failed chs chp error
+        ch_errout chs chp error
     | None ->
         match rspv, chs.chs_rxv with
-        | F_rsp1_rpy (vcons, _, _), S_mrx_rpy (cxp, iep, ies)
-        | F_rsp1_err (vcons, _, _), S_mrx_err (cxp, iep, ies)
+        | F_rsp1_rpy (vcons, _), S_mrx_rpy (cxp, iep, ies)
+        | F_rsp1_err (vcons, _), S_mrx_err (cxp, iep, ies)
           when cx_msgno_check header cxp -> begin
             let seqno = ch_seqno_adjust chs data in
             let vcons = vcons cxp iep in
-            ch_rxf_aux vcons chs ies iep cxp.cxp_failedTx chs.chs_rxq
+            ch_rxf_aux vcons chs ies iep chp.chp_notifyTx chs.chs_rxq
                 chs.chs_txq seqno more data
           end
-        | F_rsp1_rpy (vcons, eprf, mcons), S_mrx_new
-        | F_rsp1_err (vcons, eprf, mcons), S_mrx_new -> begin
+        | F_rsp1_rpy (vcons, iharness), S_mrx_new
+        | F_rsp1_err (vcons, iharness), S_mrx_new -> begin
             match Cf_deque.B.pop chs.chs_rxq with
             | Some (cxp, rxq) when cx_msgno_check header cxp ->
-                let eprf = eprf chp.chp_profile in
-                ie_harness eprf#limits >>= fun (iep, ief) ->
-                cxp.cxp_rspTx#put (mcons ief) >>= fun () ->
+                iharness chp >>= fun (iep, cxrsp) ->
+                cxp.cxp_rspTx#put cxrsp >>= fun () ->
                 let seqno = ch_seqno_adjust chs data in
                 let vcons = vcons cxp iep in
-                ch_rxf_aux vcons chs ies0 iep cxp.cxp_failedTx rxq chs.chs_txq
+                ch_rxf_aux vcons chs ies0 iep chp.chp_notifyTx rxq chs.chs_txq
                     seqno more data
             | _ ->
-                ch_failed chs chp (X_response_unexpected header)
+                ch_errout chs chp (X_response_unexpected header)
           end
         | _ ->
-            ch_failed chs chp (X_message_in_progress header)
+            ch_errout chs chp (X_message_in_progress header)
 
 let ch_mrx_vcons_ans cxp ansno iem iep ies =
     S_mrx_ans (cxp, N32_map.replace (ansno, (iep, ies)) iem)
 let ch_rxf_ans chs chp ansno header more data =
     match ch_rxf_header chs chp header with
     | Some error ->
-        ch_failed chs chp error
+        ch_errout chs chp error
     | None ->
         match chs.chs_rxv with
         | S_mrx_new -> begin
             match Cf_deque.B.pop chs.chs_rxq with
             | Some (cxp, rxq) when cx_msgno_check header cxp ->
-                ie_harness chp.chp_profile#ans#limits >>= fun (iep, ief) ->
-                cxp.cxp_rspTx#put (M_clt_ans (ansno, ief)) >>= fun () ->
+                chp.chp_cxAns ansno >>= fun (iep, cxrsp) ->
+                cxp.cxp_rspTx#put cxrsp >>= fun () ->
                 let seqno = ch_seqno_adjust chs data in
                 let ansv = ansno, cxp, N32_map.nil in
                 let vcons = ch_mrx_vcons_ans cxp ansno N32_map.nil iep in
-                ch_rxf_aux ~ansv vcons chs ies0 iep cxp.cxp_failedTx rxq
+                ch_rxf_aux ~ansv vcons chs ies0 iep chp.chp_notifyTx rxq
                     chs.chs_txq seqno more data
             | _ ->
-                ch_failed chs chp (X_response_unexpected header)
+                ch_errout chs chp (X_response_unexpected header)
           end
         | S_mrx_ans (cxp, iem) when cx_msgno_check header cxp -> begin
             let seqno = ch_seqno_adjust chs data in
             | Cf_exnopt.U (iep, ies) ->
                 let ansv = ansno, cxp, iem in
                 let vcons = ch_mrx_vcons_ans cxp ansno iem iep in
-                ch_rxf_aux ~ansv vcons chs ies iep cxp.cxp_failedTx chs.chs_rxq
+                ch_rxf_aux ~ansv vcons chs ies iep chp.chp_notifyTx chs.chs_rxq
                     chs.chs_txq seqno more data
             | Cf_exnopt.X Not_found ->
-                ie_harness chp.chp_profile#ans#limits >>= fun (iep, ief) ->
-                cxp.cxp_rspTx#put (M_clt_ans (ansno, ief)) >>= fun () ->
+                chp.chp_cxAns ansno >>= fun (iep, cxrsp) ->
+                cxp.cxp_rspTx#put cxrsp >>= fun () ->
                 let ansv = ansno, cxp, iem in
                 let vcons = ch_mrx_vcons_ans cxp ansno iem iep in
-                ch_rxf_aux ~ansv vcons chs ies0 iep cxp.cxp_failedTx
+                ch_rxf_aux ~ansv vcons chs ies0 iep chp.chp_notifyTx
                     chs.chs_rxq chs.chs_txq seqno more data
             | Cf_exnopt.X _ as v ->
                 assert (not true);
                 Cf_cmonad.return v
           end
         | _ ->
-            ch_failed chs chp (X_message_in_progress header)
+            ch_errout chs chp (X_message_in_progress header)
 
 let ch_rxf_nul_aux chs chp cxp rxq iem header =
     if N32_map.empty iem then begin
         Cf_cmonad.return (Cf_exnopt.U (0, { chs with chs_rxq = rxq }))
     end
     else
-        ch_failed chs chp (X_message_in_progress header)
+        ch_errout chs chp (X_message_in_progress header)
 
 let ch_rxf_nul chs chp header =
     match ch_rxf_header chs chp header with
     | Some error ->
-        ch_failed chs chp error
+        ch_errout chs chp error
     | None ->
         match chs.chs_rxv with
         | S_mrx_new -> begin
             | Some (cxp, rxq) when cx_msgno_check header cxp ->
                 ch_rxf_nul_aux chs chp cxp rxq N32_map.nil header
             | _ ->
-                ch_failed chs chp (X_response_unexpected header)
+                ch_errout chs chp (X_response_unexpected header)
           end
         | S_mrx_ans (cxp, iem) when cx_msgno_check header cxp ->
             ch_rxf_nul_aux chs chp cxp chs.chs_rxq iem header
         | _ ->
-            ch_failed chs chp (X_message_in_progress header)
+            ch_errout chs chp (X_message_in_progress header)
 
 let ch_rxf chs chp = function
     | `MSG (header, more, data) ->
         ch_rxf_rsp1 crxrsp1_rpy chs chp header more data
     | `ERR (header, more, data) ->
         ch_rxf_rsp1 crxrsp1_err chs chp header more data
-    | `ANS (ansno, header, more, data) ->
+    | `ANS (header, more, data, ansno) ->
         ch_rxf_ans chs chp ansno header more data
     | `NUL header ->
         ch_rxf_nul chs chp header
 
-let ch_rxf_msg_guard chs ies sxp cont =
+let ch_rxw_msg_guard chs ies sxp cont =
     ie_flow_guard ies sxp.sxp_req begin fun (ack, iesopt) ->
         let rxv =
             match iesopt with
         cont (ack, { chs with chs_rxv = rxv })
     end
 
-let ch_rxf_rsp1_guard frsp chs ies cxp iep cont =
+let ch_rxw_rsp1_guard frsp chs ies cxp iep cont =
     ie_flow_guard ies iep begin fun (ack, iesopt) ->
         let rxv =
             match iesopt with
                 assert (ies.ies_more == Iom_stream.More);
                 let vcons =
                     match frsp with
-                    | F_rsp1_rpy (vcons, _, _)
-                    | F_rsp1_err (vcons, _, _) ->
+                    | F_rsp1_rpy (vcons, _)
+                    | F_rsp1_err (vcons, _) ->
                         vcons
                 in
                 vcons cxp iep ies
         cont (ack, { chs with chs_rxv = rxv })
     end
 
-let ch_rxf_ans1_guard chs cxp iem cont (ansno, (iep, ies)) =
+let ch_rxw_ans1_guard chs cxp iem cont (ansno, (iep, ies)) =
     ie_flow_guard ies iep begin fun (ack, iesopt) ->
         let iem =
             match iesopt with
         cont (ack, { chs with chs_rxv = S_mrx_ans (cxp, iem) })
     end
 
-let ch_rxf_ans_guard chs cxp iem cont =
+let ch_rxw_ans_guard chs cxp iem cont =
     let z = N32_map.to_seq_incr iem in
-    let z = Cf_seq.map (ch_rxf_ans1_guard chs cxp iem cont) z in
+    let z = Cf_seq.map (ch_rxw_ans1_guard chs cxp iem cont) z in
     Cf_seq.C.sequence z
 
-let ch_rxf_guard chs cont =
+let ch_rxw_guard chs cont =
     match chs.chs_rxv with
     | S_mrx_new ->
         Cf_cmonad.nil
     | S_mrx_msg (sxp, ies) ->
-        ch_rxf_msg_guard chs ies sxp cont
+        ch_rxw_msg_guard chs ies sxp cont
     | S_mrx_rpy (cxp, iep, ies) ->
-        ch_rxf_rsp1_guard crxrsp1_rpy chs ies cxp iep cont
+        ch_rxw_rsp1_guard crxrsp1_rpy chs ies cxp iep cont
     | S_mrx_err (cxp, iep, ies) ->
-        ch_rxf_rsp1_guard crxrsp1_err chs ies cxp iep cont
+        ch_rxw_rsp1_guard crxrsp1_err chs ies cxp iep cont
     | S_mrx_ans (cxp, iem) ->
-        ch_rxf_ans_guard chs cxp iem cont
+        ch_rxw_ans_guard chs cxp iem cont
 
 let ch_txf_sxrsp_guard chs chp cont =
     match Cf_deque.B.pop chs.chs_txq with
 
 let ch_txf_cxmsg_guard chs chp cont =
     chp.chp_exchangeRx#get begin fun cxp ->
-        let txv = S_mtx_msg (cxp, oes0) in
-        if
+        assert begin
             Cf_deque.predicate begin fun p ->
                 Int32.compare p.cxp_msgno cxp.cxp_msgno <> 0
             end chs.chs_rxq
-        then begin
-            let rxq = Cf_deque.A.push cxp chs.chs_rxq in
-            cont (0, [], { chs with chs_rxq = rxq; chs_txv = txv })
-        end
-        else begin
-            errj#error "Beep_core: client exchange msgno still in progress!";
-            cont (0, [], chs)
-        end
+        end;
+        let txv =
+            match cxp.cxp_greeting with
+            | true -> S_mtx_new
+            | false -> S_mtx_msg (cxp, oes0)
+        in
+        let rxq = Cf_deque.A.push cxp chs.chs_rxq in
+        cont (0, [], { chs with chs_rxq = rxq; chs_txv = txv })
     end
 
 let ch_txf_oe_aux chs chp msgno vcons fcons cont (oesopt, lmpopt) =
                     let seqno =
                         Int32.add chs.chs_txseqno (Int32.of_int length)
                     in
-                    let ans = `ANS (ansno, header, more, payload) in
+                    let ans = `ANS (header, more, payload, ansno) in
                     let chs = {
                         chs with
                         chs_txseqno = seqno;
 let ch_txf_nul_guard chs chp sxp oem oeq cont =
     sxp.sxp_rspRx#get begin function
     | M_srv_ans (ansno, oep) ->
-        if N32_map.member ansno oem then begin
-            errj#error "Beep_core: service response ansno still in progress!";
-            cont (0, [], chs)
-        end
-        else begin
-            let oem = N32_map.replace (ansno, (oep, oes0)) N32_map.nil in
-            let oeq = Cf_deque.A.push (ansno, oep) Cf_deque.nil in
-            let txv = S_mtx_ans (sxp, oem, oeq, Iom_stream.More) in
-            cont (0, [], { chs with chs_txv = txv })
-        end
-    | M_srv_nul ->
+        assert (not (N32_map.member ansno oem));
+        let oem = N32_map.replace (ansno, (oep, oes0)) N32_map.nil in
+        let oeq = Cf_deque.A.push (ansno, oep) Cf_deque.nil in
+        let txv = S_mtx_ans (sxp, oem, oeq, Iom_stream.More) in
+        cont (0, [], { chs with chs_txv = txv })
+    | rsp ->
+        assert (rsp = M_srv_nul);
         if N32_map.empty oem then begin
             let header = {
                 channo = chp.chp_channo;
             let txv = S_mtx_ans (sxp, oem, oeq, Iom_stream.Last) in
             cont (0, [], { chs with chs_txv = txv })
         end
-    | _ ->
-        errj#error "Beep_core: service response type invalid!";
-        cont (0, [], chs)
     end
 
 let ch_txf_guard window chs chp cont =
                         msgno = sxp.sxp_msgno;
                     } in
                     let seqno = Int32.add seqno (Int32.of_int length) in
-                    let frs = `ANS (ansno, header, more, payload) :: frs in
+                    let frs = `ANS (header, more, payload, ansno) :: frs in
                     frs, seqno
             in
             ch_txr_answer_aux frs oeq' seqno window chs chp sxp oem oeq rmore
         ch_txr_answer_aux [] Cf_deque.nil chs.chs_txseqno window chs chp sxp
             oem oeq rmore
 
-(*--- session management ---*)
-type scontrol = Iom_stream.stop
-type snotify = Iom_stream.failed
+class virtual ch0drv p = object(self)
+    inherit [tag] chdrv (Lazy.force p.p_ch0fix) as super
 
-type sfix = {
-    sp_controlTx: scontrol Iom_gadget.tx;
-    sp_notifyRx: snotify Iom_gadget.rx;
-}
+    method virtual private ch_stop: int32 -> unit Iom_gadget.t
+    method virtual private up_close: int32 -> int -> unit Iom_gadget.t
+    method virtual private up_ok: int32 -> unit Iom_gadget.t
+    method virtual private up_error: int32 -> int -> unit Iom_gadget.t
+    
+    method private cxgreeting cxf : tag cxstate = Cx (Cx_msgx, Cx_rsp0 cxf)
+    method private sxgreeting sxf : tag sxstate = Sx (Sx_msgx, Sx_rsp0 sxf)
+    
+    method private cx_greeting_core:
+        ('p chdrvstate -> 'p cxfix -> ('self * 'p cxdrv) Iom_gadget.t) ->
+        'self Iom_gadget.t = fun f ->
+        assert (core_.cds_closing = None);
+        let chf = Lazy.force p.p_ch0fix in
+        chf.chf_cxMsg 0l true >>= fun cxf ->
+        f { core_ with cds_msgno = 1l } cxf >>= fun (obj, cx) ->
+        cx#start >>= fun () ->
+        Cf_cmonad.return obj
+    
+    method private snotify = function
+        | M_ch0_stop channo -> self#ch_stop channo
+        | M_ch0_close (channo, code) -> self#up_close channo code
+        | M_ch0_ok channo -> self#up_ok channo
+        | M_ch0_error (channo, code) -> self#up_error channo code
+    
+    method private guard =
+        p.p_notifyRx#get self#snotify >>= fun () ->
+        super#guard
+end
 
-type spad = {
-    sp_controlRx: scontrol Iom_gadget.rx;
-    sp_notifyTx: snotify Iom_gadget.tx;
-}
+let ch0_load role cf =
+    let cf = (cf :> tag chform) in
+    Iom_gadget.simplex >>= fun (ch0Rx, ch0Tx) ->
+    let p = {
+        p_role = role;
+        p_ch0pad = lazy (assert false);
+        p_ch0fix = lazy (assert false);
+        p_notifyRx = ch0Rx;
+        p_notifyTx = ch0Tx;
+    } in
+    cf#core p 0l >>= fun (chp, chf) ->
+    Cf_cmonad.return {
+        p with
+        p_ch0pad = Lazy.lazy_from_val chp;
+        p_ch0fix = Lazy.lazy_from_val chf;
+    }
 
-type sstate = S_ss_unimplemented
+let ch0_pad p = Lazy.force p.p_ch0pad
+let ch0_role p = p.p_role
 
-let ss0 = S_ss_unimplemented
+let ch0_initial p =
+    let chp = Lazy.force p.p_ch0pad in
+    chp.chp_sxMsg 0l true >>= fun sxp ->
+    let txq = Cf_deque.A.push sxp Cf_deque.nil in
+    Cf_cmonad.return { ch_state0 with chs_txq = txq }
 
 (*--- End of File [ beep_core.ml ] ---*)

File beep/beep_core.mli

 (*---------------------------------------------------------------------------*
   INTERFACE  beep_core.mli
 
-  Copyright (c) 2003-2007, James H. Woodyatt
+  Copyright (c) 2003-2008, James H. Woodyatt
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
 class localizer:
     string list ->
     object
-    	method lang: string option
-    	method reply_code: int -> string
+        method greeting_lang: string list
+    	method reply_lang: string option
+    	method reply_content: int -> string
     end
 
 (** {5 Frames} *)
 type msg_frame = [ `MSG of header * Iom_stream.more * Cf_message.t ]
 type rpy_frame = [ `RPY of header * Iom_stream.more * Cf_message.t ]
 type err_frame = [ `ERR of header * Iom_stream.more * Cf_message.t ]
-type ans_frame = [ `ANS of int32 * header * Iom_stream.more * Cf_message.t ]
+type ans_frame = [ `ANS of header * Iom_stream.more * Cf_message.t * int32 ]
 type nul_frame = [ `NUL of header ]
 
 type response_frame = [ rpy_frame | err_frame | ans_frame | nul_frame ]
 
 exception Error of error
 
-(** {5 Message Exchange} *)
+(** {5 Default MIME Parameters} *)
+val default_xml_mime_ct: Mime_entity.ct
+val default_xml_mime_emit_adapter: Mime_entity.basic_emit_adapter
+val default_limits: Iom_octet_stream.limits
 
-val default_mime_ct: Mime_entity.ct
-val default_mime_cte: Mime_entity.cte
-val default_mime_emit_adapter: Mime_entity.basic_emit_adapter Lazy.t
-val default_mime_headers: Cf_message.t Lazy.t
+(** {5 Channel I/O} *)
+type iepad and iefix
+type oepad and oefix
 
-(** {5 Channel Profiles} *)
-class eprofile: Iom_octet_stream.limits -> object
-    val limits_: Iom_octet_stream.limits
-    method limits: Iom_octet_stream.limits
+class eform: limits:Iom_octet_stream.limits -> object
+    method icore: (iepad * iefix) Iom_gadget.t
+    method ocore: (oepad * oefix) Iom_gadget.t
 end
 
-class ['msg, 'rpy, 'err, 'ans] chprofile:
-    msg:'msg -> rpy:'rpy -> err:'err -> ans:'ans ->
+type pad
+type chpad and 'p chfix
+type 'p cxstate and 'p sxstate
+type 'p cxfix and 'p sxfix
+type 'p chdrvstate
+
+class ['p] chform:
+    msg:#eform -> rpy:#eform -> err:#eform -> ans:#eform ->
     object
-        constraint 'msg = #eprofile
-        constraint 'rpy = #eprofile
-        constraint 'err = #eprofile
-        constraint 'ans = #eprofile
-
-        val msg_: 'msg
-        val rpy_: 'rpy
-        val err_: 'err
-        val ans_: 'ans
-
-        method msg: 'msg
-        method rpy: 'rpy
-        method err: 'err
-        method ans: 'ans
+        method core: pad -> int32 -> (chpad * 'p chfix) Iom_gadget.t
     end
 
-(** {5 Entity I/O} *)
+class type ['state] mxdrv = object
+    inherit Iom_gadget.next
+    inherit Iom_gadget.start
+    
+    val core_: 'state
+    method core: 'state
+    
+    method private abort: 'a. 'a Iom_gadget.t
+    method private complete: 'a.'a Iom_gadget.t
+    method private continue: 'a. 'state -> 'a Iom_gadget.t
+    method private guard: unit Iom_gadget.guard
+end
 
-type iefix = {
-  ief_flowTx : Iom_stream.readywait Iom_gadget.tx;
-  ief_eventRx : Mime_stream.entity_scan_event Iom_gadget.rx;
-}
+class ['p] cxdrv: ?cx:'p cxstate -> 'p cxfix -> object
+    inherit ['p cxstate] mxdrv
+        
+    method private failed: 'a. int -> exn -> 'a Iom_gadget.t
+    
+    method private msg_flow:
+        'p cxstate -> Iom_stream.readywait -> unit Iom_gadget.t
+    
+    method private msg_octets:
+        ?h:Mime_entity.basic_emit_adapter -> Iom_octet_stream.fragment ->
+        'p cxstate -> 'p cxstate Iom_gadget.t
+            
+    method private rpy_octets:
+        Mime_entity.basic_scan_adapter -> Iom_octet_stream.fragment ->
+        Iom_stream.readywait Iom_gadget.tx -> 'p cxstate -> unit Iom_gadget.t
+    
+    method private err_octets:
+        Mime_entity.basic_scan_adapter -> Iom_octet_stream.fragment ->
+        Iom_stream.readywait Iom_gadget.tx -> 'p cxstate -> unit Iom_gadget.t
+    
+    method private ans_octets:
+        int32 -> Mime_entity.basic_scan_adapter ->
+        Iom_octet_stream.fragment -> Iom_stream.readywait Iom_gadget.tx ->
+        'p cxstate -> unit Iom_gadget.t
+end
 
-type iepad and oestate
+class ['p] sxdrv: ?sx:'p sxstate -> 'p sxfix -> object
+    inherit ['p sxstate] mxdrv
+    
+    method private msg_octets:
+        Mime_entity.basic_scan_adapter -> Iom_octet_stream.fragment ->
+        Iom_stream.readywait Iom_gadget.tx -> 'p sxstate -> unit Iom_gadget.t
 
-type oefix = {
-  oef_flowRx : Iom_stream.readywait Iom_gadget.rx;
-  oef_eventTx : Mime_stream.entity_emit_event Iom_gadget.tx;
-}
+    method private rsp1_flow:
+        'p sxstate -> Iom_stream.readywait -> unit Iom_gadget.t
+    
+    method private ans_flow:
+        'p sxstate -> int32 -> Iom_stream.readywait -> unit Iom_gadget.t
 
-type oepad and iestate
+    method private rpy_octets:
+        ?h:Mime_entity.basic_emit_adapter -> Iom_octet_stream.fragment ->
+        'p sxstate -> 'p sxstate Iom_gadget.t
+        
+    method private err_octets:
+        ?h:Mime_entity.basic_emit_adapter -> Iom_octet_stream.fragment ->
+        'p sxstate -> 'p sxstate Iom_gadget.t
+    
+    method private ans_octets_start:
+        ?n:int32 -> ?h:Mime_entity.basic_emit_adapter ->
+        Iom_octet_stream.fragment -> 'p sxstate ->
+        ('p sxstate * int32) Iom_gadget.t
+    
+    method private ans_octets_append:
+        n:int32 -> Iom_octet_stream.fragment -> 'p sxstate ->
+        'p sxstate Iom_gadget.t
+end
 
-(** {5 Message Exchange} *)
+class ['p] chdrv: 'p chfix -> object('self)
+    inherit Iom_gadget.start
+    inherit Iom_gadget.next
+    
+    val core_: 'p chdrvstate
+    method core: 'p chdrvstate
+    
+    method private abort: 'a. 'a Iom_gadget.t
+    method private failed: int -> exn -> unit Iom_gadget.t
+    method private closing: 'p chdrvstate Iom_gadget.t
+    
+    method private close_msg: int -> unit Iom_gadget.t
+    method private close_ok: unit Iom_gadget.t
+    method private close_error: int -> unit Iom_gadget.t
+    
+    method private cx_start:
+        ?n:int32 -> ('p chdrvstate -> 'p cxfix ->
+        ('self * 'p cxdrv) Iom_gadget.t) ->'self Iom_gadget.t
+    
+    method private sx_start:
+        'p sxfix -> ('self * 'p sxdrv) Iom_gadget.t
+    
+    method private guard: unit Iom_gadget.guard
+end
 
-type cxrsp = private
-    | M_clt_rpy of iefix
-    | M_clt_err of iefix
-    | M_clt_ans of int32 * iefix
-    | M_clt_nul
+(** {5 Transport I/O} *)
+type role = R_initiator | R_listener
 
-type cxfix = private {
-    cxf_msgno : int32;
-    cxf_req : oefix;
-    cxf_rspRx : cxrsp Iom_gadget.rx;
-    cxf_failedRx : Iom_stream.failed Iom_gadget.rx;
-}
+type chstate
 
-type cxpad
+val ch_state0: chstate
 
-val cx_harness:
-    ('msg, 'rpy, 'err, 'ans) #chprofile -> int32 ->
-    (cxpad * cxfix) Iom_gadget.t
+val ch_rxseqno: chstate -> int32
 
-type sxrsp = private (* need functions for composing these *)
-    | M_srv_rpy of oepad
-    | M_srv_err of oepad
-    | M_srv_ans of int32 * oepad
-    | M_srv_nul
+val ch_channo: chpad -> int32
 
-type sxfix = private {
-    sxf_msgno : int32;
-    sxf_req : iefix;
-    sxf_rspTx : sxrsp Iom_gadget.tx;
-    sxf_failedRx : Iom_stream.failed Iom_gadget.rx;
-}
+val ch_failed: chstate -> chpad -> int -> exn -> unit Iom_gadget.t
 
-type sxpad
-
-(** {5 Channel Driver} *)
-
-type chnotify = Iom_stream.failed
-
-type chprofile_basic = (eprofile, eprofile, eprofile, eprofile) chprofile
-
-type chfix = private {
-    chf_notifyRx : chnotify Iom_gadget.rx;
-    chf_exchangeRx : sxfix Iom_gadget.rx;
-    chf_exchangeTx : cxpad Iom_gadget.tx;
-}
-
-type chpad = private {
-    chp_channo: int32;
-    chp_profile: chprofile_basic;
-    chp_notifyTx: chnotify Iom_gadget.tx;
-    chp_exchangeRx: cxpad Iom_gadget.rx;
-    chp_exchangeTx: sxfix Iom_gadget.tx;
-}
-
-type chsrxvariant and chstxvariant
-
-type chstate = {
-    chs_rxseqno: int32;
-    chs_rxv: chsrxvariant;
-    chs_rxq: cxpad Cf_deque.t;
-    chs_txseqno: int32;
-    chs_txv: chstxvariant;
-    chs_txq: sxpad Cf_deque.t;
-}
-
-val chs0: chstate
-
-val ch_harness:
-    int32 -> ('msg, 'rpy, 'err, 'ans) #chprofile ->
-    (chpad * chfix) Iom_gadget.t
-
-val ch_failed: chstate -> chpad -> error -> 'a Cf_exnopt.t Iom_gadget.t
-
-val ch_rxf_guard:
+val ch_rxw_guard:
     chstate -> (int * chstate -> unit Iom_gadget.t) -> unit Iom_gadget.guard
 
 val ch_txf_guard:
     unit Iom_gadget.guard
     
 val ch_rxf:
-    chstate -> chpad -> [< frame ] -> (int * chstate) Cf_exnopt.t Iom_gadget.t
+    chstate -> chpad -> [< frame ] ->
+    (int * chstate) Cf_exnopt.t Iom_gadget.t
 
 val ch_txr:
-    int -> chstate -> chpad -> (int * [> frame ] list * chstate) Iom_gadget.t
+    int -> chstate -> chpad ->
+    (int * [> frame ] list * chstate) Iom_gadget.t
 
-(** {5 Session Management} *)
-type scontrol = Iom_stream.stop
-type snotify = Iom_stream.failed
+type tag
 
-type sfix = {
-    sp_controlTx: scontrol Iom_gadget.tx;
-    sp_notifyRx: snotify Iom_gadget.rx;
-}
+class virtual ch0drv: pad -> object
+    inherit [tag] chdrv
 
-type spad = {
-    sp_controlRx: scontrol Iom_gadget.rx;
-    sp_notifyTx: snotify Iom_gadget.tx;
-}
+    method virtual private ch_stop: int32 -> unit Iom_gadget.t
 
-type sstate
+    method virtual private up_close: int32 -> int -> unit Iom_gadget.t
+    method virtual private up_ok: int32 -> unit Iom_gadget.t
+    method virtual private up_error: int32 -> int -> unit Iom_gadget.t
+    
+    method private cxgreeting: tag cxfix -> tag cxstate
+    method private sxgreeting: tag sxfix -> tag sxstate
+end
 
-val ss0: sstate
+val ch0_load: role -> tag #chform -> pad Iom_gadget.t
+val ch0_pad: pad -> chpad
+val ch0_role: pad -> role
+val ch0_initial: pad -> chstate Iom_gadget.t
 
 (*--- End of File [ beep_core.mli ] ---*)

File beep/beep_serial.ml

 
 (**)
 let errj = Cf_journal.stderr
+let _ = errj#setlimit `None
 (**)
 
-(*
-module IOg = Iom_gadget
-module IOs = Iom_stream
-module IO8 = Iom_octet_stream
-module IOn = Iom_sock_stream
-
-module C = Beep_core
-*)
-
-type seq_header = {
+type seq = {
     seq_channo: int32;
     seq_ackno: int32;
     seq_window: int32;
 }
 
-type seq = [ `SEQ of seq_header ]
+type error =
+    | X_seq_no_channel of seq
+    | X_seq_unexpected of seq
 
-type frame = [ Beep_core.frame | seq ]
+exception Error of error
 
 let parse_seq_frame =
     let bad_syntax = Beep_core.Error Beep_core.X_frame_bad_syntax in
                     if Int32.compare win int32_of_maxint > 0 then
                         raise too_large;
                     let m = Cf_message.shift ~pos:n m in
-                    let seq = `SEQ {
+                    let seq = {
                         seq_channo = ch;
                         seq_ackno = seq;
                         seq_window = win;
                     raise bad_syntax
             end
 
-let parse_frame ?limit m =
-    try
-        Beep_core.parse_frame ?limit m
-    with
-    | Beep_core.Error _ ->
-        parse_seq_frame m
+let parse_frames =
+    let rec loop ~seq ~data ?limit m =
+        match
+            try
+                let frame, m = Beep_core.parse_frame ?limit m in
+                let data = frame :: data in
+                Cf_exnopt.U (seq, data, m)
+            with
+            | End_of_file as x ->
+                Cf_exnopt.X x
+            | Beep_core.Error _ ->
+                try
+                    let frame, m = parse_seq_frame m in
+                    let seq = Cf_deque.A.push frame seq in
+                    Cf_exnopt.U (seq, data, m)
+                with
+                | End_of_file as x ->
+                    Cf_exnopt.X x
+                | Beep_core.Error _ as x ->
+                    Cf_exnopt.X x
+        with
+        | Cf_exnopt.U (seq, data, m) ->
+            loop ~seq ~data m
+        | Cf_exnopt.X End_of_file ->
+            seq, List.rev data, m, None
+        | Cf_exnopt.X x ->
+            seq, List.rev data, m, Some x
+    in
+    loop ~seq:Cf_deque.nil ~data:[]
+
+let channo_of_frame = function
+    | `MSG (h, _, _)
+    | `RPY (h, _, _)
+    | `ERR (h, _, _)
+    | `ANS (_, h, _, _)
+    | `NUL h ->
+        h
 
 (*
 let buffer_add_seq_frame b (`SEQ header) =
 module N32_map = Beep_core.N32_map
 module Int_map = Cf_rbtree.Map(Cf_ordered.Int_order)
 
-type cpad = {
-    cp_core: Beep_core.chpad;
-    cp_priority: int;
-    cp_rxwinlow: int;
-    cp_rxwinhigh: int;
+type ctraffic = {
+    ct_channo: int32;
+    ct_rxwinlow: int;
+    ct_rxwinhigh: int;
 }
 
+let mwindow0 = 4096 (* RFC 3081, section 3.1.1 *)
+let maxsupportedwindow = Int32.of_int max_int
+
 type cstate = {
-    cs_core: Beep_core.chstate;
+    cs_chs: Beep_core.chstate;
+    cs_chp: Beep_core.chpad;
+    cs_rxackno: int32;
     cs_rxwindow: int;
+    cs_txackno: int32;
     cs_txwindow: int;
 }
 
-let mwindow0 = 4096 (* RFC 3081, section 3.1.1 *)
-
-let cstate0 = {
-    cs_core = Beep_core.chs0;
+let cstate0 chp chs = {
+    cs_chs = chs;
+    cs_chp = chp;
+    cs_rxackno = 0l;
     cs_rxwindow = mwindow0;
+    cs_txackno = 0l;
     cs_txwindow = mwindow0;
 }
 
     }
 
 type sstate = {
-    ss_core: Beep_core.sstate;
-    ss_s8rxfin: bool;
-    ss_s8txblock: int option;
+    ss_rxbuffer: Cf_message.substring Cf_deque.t;
+    ss_s8txblock: (int * seq Cf_deque.t) option;
     ss_csm: cstate N32_map.t;
-    ss_cpqm: cpad Cf_deque.t Int_map.t;
+    ss_ctqm: ctraffic Cf_deque.t Int_map.t;
 }
 
-let ss0 = {
-    ss_core = Beep_core.ss0;
-    ss_s8rxfin = false;
-    ss_s8txblock = Some 0;
-    ss_csm = N32_map.nil;
-    ss_cpqm = Int_map.nil;