Commits

jhwoodyatt  committed 94f6773

Initial submit of OCaml NAE BEEP Core.

  • Participants
  • Parent commits a36b522

Comments (0)

Files changed (4)

File beep/beep_transport.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  beep_transport.ml
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(*
+module type X_tag = sig
+    val tag: string
+end
+
+module X_create(T: X_tag) = struct
+    let tag = Printf.sprintf "[%s] " T.tag
+
+    let printf fmt =
+        print_string tag;
+        Printf.printf fmt
+    
+    let sprintf (fmt : ('a, unit, string) format) =
+        Printf.sprintf (Obj.magic (tag ^ (Obj.magic fmt)))
+end
+
+module X = X_create(struct let tag = "Beep_transport" end)
+*)
+
+(**)
+module J = Cf_journal
+(**)
+
+open Iom_reactor
+open Cf_cmonad.Op
+
+module Err = Beep_error
+module F = Beep_frame
+module E = Beep_exchange
+module C = Beep_channel
+module EE = Beep_entity_exchange
+module E_xml = Beep_xml_exchange
+
+module N32_map = Cf_rbtree.Map(Int32)
+module N32_set = Cf_rbtree.Set(Int32)
+
+type reply1_t = [
+    | Beep_frame.phandle_t Beep_exchange.rpy_t
+    | Beep_frame.phandle_t Beep_exchange.err_t
+]
+
+type w_rx_progress_t = W_rx_p_pending | W_rx_p_in_progress
+
+type w_rx1_state_t =
+    | W_rx1_error of Err.t
+    | W_rx1_payload of F.payload_t
+
+type w_rx_state_t =
+    | W_rx_msg of F.phandle_t tx
+    | W_rx_rsp0 of F.phandle_t E.response_t tx
+    | W_rx_rsp1 of F.phandle_t tx * (F.t -> w_rx1_state_t)
+    | W_rx_rspN of F.phandle_t E.answer_t tx * F.phandle_t tx N32_map.t
+
+type w_tx_ans_state_t =
+    | W_tx_ans_more of int32 * F.phandle_t E.answer_t rx
+    | W_tx_ans_last of (unit, unit) t
+
+type w_tx_state_t =
+    | W_tx_msg of int32 * F.phandle_t rx * F.phandle_t E.response_t tx option
+    | W_tx_rsp0 of
+        int32 * F.phandle_t E.response_t rx * (unit, unit) t option
+    | W_tx_rsp1 of
+        int32 * F.phandle_t rx * (F.header_t -> F.payload_t -> F.t) *
+        (unit, unit) t option
+    | W_tx_rspN of
+        int32 * F.phandle_t rx N32_map.t * w_tx_ans_state_t *
+        (unit, unit) t option
+
+type ch_state_t =
+    | Ch_online
+    | Ch_close_rx
+    | Ch_close_tx of C.M_close.t option
+    | Ch_releasing
+    | Ch_finalizing
+
+type channel_t = {
+    c_state_: ch_state_t;
+    c_rx_seqno_: int32;
+    c_tx_seqno_: int32;
+    c_msgno_: int32;
+    c_rxQ_: (int32 * w_rx_progress_t * w_rx_state_t) Cf_deque.t;
+    c_txQ_: w_tx_state_t Cf_deque.t;
+}
+
+let channel0_ = {
+    c_state_ = Ch_online;
+    c_rx_seqno_ = Int32.zero;
+    c_tx_seqno_ = Int32.zero;
+    c_msgno_ = Int32.zero;
+    c_rxQ_ = Cf_deque.nil;
+    c_txQ_ = Cf_deque.nil;
+}
+
+type frame_rx_t =
+    | F_rx_data of Beep_frame.fhandle_t
+    | F_rx_final
+    | F_rx_error of exn
+
+type frame_tx_t =
+    | F_tx_data of Beep_frame.fhandle_t
+    | F_tx_final
+    | F_tx_abort
+
+type up_event_t =
+    | Up_close of int32 * C.M_close.t
+    | Up_error of int32 * C.M_error.t
+    | Up_sync of int32
+    | Up_final of int32
+    | Up_failed of int32 * exn
+    | Up_reset of int32
+
+type dn_event_t =
+    | Dn_close of C.M_close.t
+    | Dn_error of C.M_error.t
+    | Dn_release
+    | Dn_final
+    | Dn_abort
+
+let ch_state_ s = modify (fun c -> { c with c_state_ = s })
+
+let succ31_ n =
+    if n = Int32.max_int then Int32.zero else Int32.succ n
+
+(*
+let chCounter_ = ref 0
+*)
+
+let channel_:
+    ?c:channel_t -> int32 -> (frame_rx_t #rx * frame_tx_t #tx) ->
+    C.control_duplex_t -> (dn_event_t #rx * up_event_t #tx) -> ('s, unit) t
+= fun ?c n (frameRx, frameTx) (channelRx, channelTx) (dnRx, upTx) ->
+    (*
+    incr chCounter_;
+    let module X_input = struct
+        let tag = Printf.sprintf "Beep_transport.channel_ (%u) n=%lu"
+            !chCounter_ n
+    end in
+    let module X = X_create(X_input) in
+    *)
+
+    (*--- Normalize parameters ---*)
+    let frameRx = (frameRx :> frame_rx_t rx) in
+    let frameTx = (frameTx :> frame_tx_t tx) in
+    let channelRx = (channelRx :> C.control_tx_t rx) in
+    let channelTx = (channelTx :> C.control_rx_t tx) in
+    let dnRx = (dnRx :> dn_event_t rx) in
+    let upTx = (upTx :> up_event_t tx) in
+    
+    (*--- Main loop ---*)
+    let rec loop () =
+        load >>= fun c ->
+        guard begin
+            frameRx#get do_frameRx_ >>= fun () ->
+            gate_txQ_ c >>= fun () ->
+            channelRx#get do_channelRx_ >>= fun () ->
+            dnRx#get do_dnRx_
+        end
+    
+    (*--- Gate on the head of the transmit queue ---*)
+    and gate_txQ_ c =
+        match Cf_deque.B.pop c.c_txQ_ with
+        | None ->
+            Cf_cmonad.return ()
+        | Some (wtx, txQ) ->
+            match wtx with
+            | W_tx_msg (msgno, phRx, rspTxOpt) ->
+                get_txQ_msg_ c txQ msgno phRx rspTxOpt
+            | W_tx_rsp0 (msgno, rspRx, closeAck) ->
+                get_txQ_rsp0_ c txQ msgno rspRx closeAck
+            | W_tx_rsp1 (msgno, phRx, consF, closeAck) ->
+                get_txQ_rsp1_ c txQ msgno phRx consF closeAck
+            | W_tx_rspN (msgno, phRxMap, contOpt, closeAck) ->
+                let mapF (ansno, phRx) =
+                    get_txQ_rspN_cont_
+                      c txQ msgno phRxMap contOpt ansno phRx closeAck
+                in
+                let z = N32_map.to_seq_incr phRxMap in
+                let z = Cf_seq.map mapF z in
+                Cf_seq.C.sequence z >>= fun () ->
+                match contOpt with
+                | W_tx_ans_last _ ->
+                    Cf_cmonad.return ()
+                | W_tx_ans_more (ansno, ansOptRx) ->
+                    get_txQ_rspN_next_
+                      c txQ msgno phRxMap contOpt ansno ansOptRx closeAck
+    
+    (*--- Process a frame event ---*)
+    and do_frameRx_ = function
+        | F_rx_data fh ->
+            do_frameRx_data_ fh
+        | F_rx_final ->
+            do_frameRx_final_ ()
+        | F_rx_error x ->
+            channelTx#put C.C_rx_abort >>= fun () ->
+            upTx#put (Up_failed (n, x))
+            (* done processing *)
+    
+    (*--- Receive frame event: F_rx_data ---*)
+    and do_frameRx_data_ fh =
+        load >>= fun c ->
+        let frame = fh.F.h_data in
+        let header = F.header frame in
+        match c.c_state_ with
+        | Ch_finalizing ->
+            do_beep_error_ Err.X_channel_closed
+        | _ when header.F.channel <> n ->
+            do_beep_error_ Err.X_channel_closed
+        | _ ->
+            let seqno = header.F.seqno in
+            if seqno = c.c_rx_seqno_ then begin
+                let msgno = header.F.msgno in
+                let seqno = Int32.add seqno (Int32.of_int (F.size frame)) in
+                let c = { c with c_rx_seqno_ = seqno } in
+                match frame with
+                | `MSG (header, payload) ->
+                    let more, _ = payload in
+                    let ph = { F.h_data = payload; F.h_ack = fh.F.h_ack } in
+                    do_frameRx_data_msg_ c msgno more ph
+                | _ ->
+                    do_frameRx_data_rsp_ c msgno fh
+            end
+            else
+                do_beep_error_ Err.X_out_of_sequence
+    
+    (*--- Receive data frame: `MSG frame ---*)
+    and do_frameRx_data_msg_ c msgno more ph =
+        match Cf_deque.B.pop c.c_rxQ_ with
+        | None
+        | Some ((_, _, W_rx_rsp0 _), _) ->
+            do_frameRx_data_msgReady_ c msgno more ph
+        | Some ((msgno', _, W_rx_msg phTx), rxQ) when msgno = msgno' ->
+            do_frameRx_data_msgMore_ c msgno more ph phTx rxQ
+        | _ ->
+            do_beep_error_ Err.X_incomplete_entity
+    
+    (*--- Receive data frame: `MSG frame (first in entity) ---*)
+    and do_frameRx_data_msgReady_ c msgno more ph =
+        duplex >>= fun ((phRx, rspTx), (rspRx, phTx)) ->
+        let exch = { E.request = `MSG phRx; E.response = rspTx } in
+        channelTx#put (C.C_rx_exchange exch) >>= fun () ->
+        let rxQ =
+            match more with
+            | F.Last ->
+                c.c_rxQ_
+            | F.More ->
+                let wrx = msgno, W_rx_p_in_progress, W_rx_msg phTx in
+                Cf_deque.B.push wrx c.c_rxQ_
+        in
+        let wtx = W_tx_rsp0 (msgno, rspRx, None) in
+        let txQ = Cf_deque.A.push wtx c.c_txQ_ in
+        store { c with c_rxQ_ = rxQ; c_txQ_ = txQ } >>= fun () ->
+        let phTx = (phTx :> F.phandle_t tx) in
+        phTx#put ph >>= loop
+
+    (*--- Receive data frame: `MSG frame (not first in entity) ---*)
+    and do_frameRx_data_msgMore_ c msgno more ph phTx rxQ =
+        let rxQ =
+            match more with
+            | F.Last ->
+                c.c_rxQ_
+            | F.More ->
+                let wrx = msgno, W_rx_p_in_progress, W_rx_msg phTx in
+                Cf_deque.B.push wrx c.c_rxQ_
+        in
+        let c = { c with c_rxQ_ = rxQ } in
+        let phTx = (phTx :> F.phandle_t tx) in
+        phTx#put ph >>= fun () ->
+        store c >>= loop
+    
+    (*--- Receive data frame: first frame in response ---*)
+    and do_frameRx_data_rsp_ c msgno fh =
+        let { F.h_data = frame; F.h_ack = ack } = fh in
+        match Cf_deque.B.pop c.c_rxQ_ with
+        | None ->
+            do_beep_error_ Err.X_not_requested
+        | Some (wrx, rxQ) ->
+            match wrx with
+            | msgno', _, W_rx_rspN _ when msgno' <> msgno ->
+                do_beep_error_ Err.X_more_answers_expected
+            | msgno', _, _ when msgno' <> msgno ->
+                do_beep_error_ Err.X_incomplete_entity
+            | _, _, W_rx_msg _ ->
+                do_beep_error_ Err.X_incomplete_entity
+            | _, _, W_rx_rsp0 rspTx ->
+                do_frameRx_data_rsp0_ c ack rxQ rspTx frame
+            | _, _, W_rx_rsp1 (phTx, screenF) ->
+                do_frameRx_data_rsp1_ c ack rxQ phTx screenF msgno frame
+            | _, _, W_rx_rspN (ansTx, phTxMap) ->
+                do_frameRx_data_rspN_ c ack rxQ ansTx phTxMap frame
+    
+    (*--- Receive data frame: first *requested* frame in response ---*)
+    and do_frameRx_data_rsp0_ c ack rxQ rspTx = function
+        | `RPY (header, payload) ->
+            let consF phRx = `RPY phRx in
+            let screenF = function
+                | `RPY (_, p) -> W_rx1_payload p
+                | _ -> W_rx1_error Err.X_bad_frame_type
+            in
+            do_frameRx_data_rsp0_single_
+              c ack rxQ header payload consF screenF rspTx
+        | `ERR (header, payload) ->
+            let consF phRx = `ERR phRx in
+            let screenF = function
+                | `ERR (_, p) -> W_rx1_payload p
+                | _ -> W_rx1_error Err.X_bad_frame_type
+            in
+            do_frameRx_data_rsp0_single_
+              c ack rxQ header payload consF screenF rspTx
+        | `ANS (header, ansno, payload) ->
+            do_frameRx_data_rsp0_multi_ c ack rxQ header ansno payload rspTx
+        | `NUL header ->
+            store { c with c_rxQ_ = rxQ } >>= fun () ->
+            let rspTx = (rspTx :> [`NUL] tx) in
+            rspTx#put `NUL >>= loop
+        | `MSG (_, _) ->
+            assert (not true);
+            Cf_cmonad.return ()
+
+    (*--- Receive data frame: first *requested* RPY/ERR frame ---*)
+    and do_frameRx_data_rsp0_single_
+      c ack rxQ header payload consF screenF rspTx =
+        let rspTx = (rspTx :> reply1_t tx) in
+        let ph = { F.h_data = payload; F.h_ack = ack } in
+        let payload = () and ack = () in
+        simplex >>= fun (phRx, phTx) ->
+        let phTx = (phTx :> F.phandle_t tx) in
+        let rspTx = (rspTx :> reply1_t tx) in
+        let consF = (consF :> F.phandle_t rx -> reply1_t) in
+        rspTx#put (consF phRx) >>= fun () ->
+        let more, _ = ph.F.h_data in
+        let rxQ =
+            match more with
+            | F.Last ->
+                rxQ
+            | F.More ->
+                let wrxs = W_rx_rsp1 (phTx, screenF) in
+                let wrx = header.F.msgno, W_rx_p_in_progress, wrxs in
+                Cf_deque.B.push wrx rxQ
+        in
+        store { c with c_rxQ_ = rxQ } >>= fun () ->
+        phTx#put ph >>= loop
+
+    (*--- Receive data frame: first *requested* `RPY/`ERR frame ---*)
+    and do_frameRx_data_rsp0_multi_ c ack rxQ header ansno payload rspTx =
+        let ph = { F.h_data = payload; F.h_ack = ack } in
+        let rspTx = (rspTx :> F.phandle_t E.response_t tx) in
+        simplex >>= fun (phRx, phTx) ->
+        let phTx = (phTx :> F.phandle_t tx) in
+        let phTxMap = N32_map.replace (ansno, phTx) N32_map.nil in
+        simplex >>= fun (ansRx, ansTx) ->
+        let ansRx = (ansRx :> F.phandle_t E.answer_t rx) in
+        let ansTx = (ansTx :> F.phandle_t E.answer_t tx) in
+        rspTx#put (`ANS ansRx) >>= fun () ->
+        ansTx#put (E.Ans_entity phRx) >>= fun () ->
+        let msgno = header.F.msgno in
+        let wrx = W_rx_rspN (ansTx, phTxMap) in
+        let op = msgno, W_rx_p_in_progress, wrx in
+        let rxQ = Cf_deque.B.push op rxQ in
+        store { c with c_rxQ_ = rxQ } >>= fun () ->
+        phTx#put ph >>= loop
+    
+    (*--- Receive data frame: continued `ERR/`RPY frame in response ---*)
+    and do_frameRx_data_rsp1_ c ack rxQ phTx screenF msgno frame =
+        match screenF frame with
+        | W_rx1_error error ->
+            do_beep_error_ error
+        | W_rx1_payload payload ->
+            let rxQ =
+                match payload with
+                | F.Last, _ ->
+                    c.c_rxQ_
+                | F.More, _ ->
+                    let wrx = W_rx_rsp1 (phTx, screenF) in
+                    let op = msgno, W_rx_p_in_progress, wrx in
+                    Cf_deque.B.push op c.c_rxQ_
+            in
+            store { c with c_rxQ_ = rxQ } >>= fun () ->
+            let phTx = (phTx :> F.phandle_t tx) in
+            phTx#put { F.h_data = payload; F.h_ack = ack } >>= loop
+    
+    (*--- Receive data frame: first *requested* frame in response ---*)
+    and do_frameRx_data_rspN_ c ack rxQ ansTx phTxMap frame =
+        let ansTx = (ansTx :> F.phandle_t E.answer_t tx) in
+        match frame with
+        | `ANS (header, ansno, payload) ->
+            let msgno = header.F.msgno in
+            begin
+                match
+                    try Some (N32_map.search ansno phTxMap)
+                    with Not_found -> None
+                with
+                | Some phTx ->
+                    Cf_cmonad.return (phTxMap, phTx)
+                | None ->
+                    simplex >>= fun (phRx, phTx) ->
+                    ansTx#put (E.Ans_entity phRx) >>= fun () ->
+                    let phTxMap = N32_map.replace (ansno, phTx) phTxMap in
+                    Cf_cmonad.return (phTxMap, phTx)
+            end >>= fun (phTxMap, phTx) ->
+            let wrx = W_rx_rspN (ansTx, phTxMap) in
+            let op = msgno, W_rx_p_in_progress, wrx in
+            let rxQ = Cf_deque.B.push op c.c_rxQ_ in
+            store { c with c_rxQ_ = rxQ } >>= fun () ->
+            let phTx = (phTx :> F.phandle_t tx) in
+            phTx#put { F.h_data = payload; F.h_ack = ack } >>= loop
+        | `NUL header ->
+            if N32_map.empty phTxMap then begin
+                store c >>= fun () ->
+                let ack =
+                    match ack with
+                    | None -> Cf_cmonad.return ()
+                    | Some r -> r
+                in
+                ansTx#put (E.Ans_final ack) >>= loop
+            end
+            else
+                do_beep_error_ Err.X_incomplete_entity
+        | _ ->
+            do_beep_error_ Err.X_bad_frame_type
+
+    (*--- Receive frame event: F_rx_final ---*)
+    and do_frameRx_final_ () =
+        load >>= fun c ->
+        match c.c_state_ with
+        | Ch_finalizing ->
+            upTx#put (Up_final n)
+            (* done processing *)
+        | Ch_close_tx None ->
+            channelTx#put C.C_rx_release
+            (* done processing *)
+        | _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+
+    (*--- Get frame of MSG entity for transmit ---*)
+    and get_txQ_msg_ c txQ msgno phRx rspTxOpt =
+        let phRx = (phRx :> F.phandle_t rx) in
+        phRx#get begin fun ph ->
+            let more, body as payload = ph.F.h_data in
+            let size = Int32.of_int (Cf_message.length body) in
+            let header = {
+                F.channel = n;
+                F.msgno = msgno;
+                F.seqno = c.c_tx_seqno_;
+            } in
+            let fh = {
+                F.h_data = `MSG (header, payload);
+                F.h_ack = ph.F.h_ack;
+            } in
+            let rxQ =
+                match rspTxOpt with
+                | Some rspTx ->
+                    let wrx = msgno, W_rx_p_pending, W_rx_rsp0 rspTx in
+                    Cf_deque.A.push wrx c.c_rxQ_
+                | None ->
+                    c.c_rxQ_
+            in
+            let txQ =
+                match rspTxOpt, more with
+                | Some _, F.More ->
+                    Cf_deque.B.push (W_tx_msg (msgno, phRx, None)) txQ
+                | _, _ ->
+                    txQ
+            in
+            store {
+                c with
+                c_rxQ_ = rxQ;
+                c_txQ_ = txQ;
+                c_tx_seqno_ = Int32.add c.c_tx_seqno_ size;
+            } >>= fun () ->
+            frameTx#put (F_tx_data fh) >>= loop
+        end
+        
+    (*--- Get start of response to MSG for transmit ---*)
+    and get_txQ_rsp0_ c txQ msgno rspRx closeAck =
+        let rspRx = (rspRx :> F.phandle_t E.response_t rx) in
+        rspRx#get begin function
+            | `RPY phRx ->
+                let consF h p = `RPY (h, p) in
+                let op = W_tx_rsp1 (msgno, phRx, consF, closeAck) in
+                do_txQ_rsp0_aux_ c txQ op
+            | `ERR phRx ->
+                let consF h p = `ERR (h, p) in
+                let op = W_tx_rsp1 (msgno, phRx, consF, closeAck) in
+                do_txQ_rsp0_aux_ c txQ op
+            | `ANS ansRx ->
+                let ansRxOpt = W_tx_ans_more (Int32.zero, ansRx) in
+                let op = W_tx_rspN (msgno, N32_map.nil, ansRxOpt, closeAck) in
+                do_txQ_rsp0_aux_ c txQ op
+            | `NUL ->
+                do_txQ_nul_ { c with c_txQ_ = txQ } msgno closeAck
+        end
+    
+    (*--- Start message response ---*)
+    and do_txQ_rsp0_aux_ c txQ op =
+        let txQ = Cf_deque.B.push op txQ in
+        store begin
+            match c.c_state_ with
+            | Ch_close_tx (Some closeMsg) ->
+                let txQ = do_txQ_rsp0_auxTxQ_ (Up_close (n, closeMsg)) txQ in
+                { c with c_state_ = Ch_close_tx None; c_txQ_ = txQ }
+            | Ch_releasing ->
+                { c with c_txQ_ = do_txQ_rsp0_auxTxQ_ (Up_sync n) txQ }
+            | _ ->
+                { c with c_txQ_ = txQ }
+        end >>= loop
+    
+    (*--- Start message response (w/ synchronization ack) ---*)
+    and do_txQ_rsp0_auxTxQ_ upEvent txQ =
+        match Cf_deque.A.pop txQ with
+        | None ->
+            assert (not true);
+            Cf_deque.nil
+        | Some (wtx, txQ) ->
+            let ack = Some (upTx#put upEvent) in
+            let wtx =
+                match wtx with
+                | W_tx_rsp1 (msgno, phRx, consF, None) ->
+                    W_tx_rsp1 (msgno, phRx, consF, ack)
+                | W_tx_rspN (msgno, phRxMap, ansRxOpt, None) ->
+                    W_tx_rspN (msgno, phRxMap, ansRxOpt, ack)
+                | wtx ->
+                    assert (not true);
+                    wtx
+            in
+            Cf_deque.A.push wtx txQ
+        
+    (*--- Get frame of RPY/ERR entity for transmit ---*)
+    and get_txQ_rsp1_ c txQ msgno phRx consF closeAck =
+        let phRx = (phRx :> F.phandle_t rx) in
+        phRx#get begin fun ph ->
+            let more, body as payload = ph.F.h_data in
+            let size = Int32.of_int (Cf_message.length body) in
+            let header = {
+                F.channel = n;
+                F.msgno = msgno;
+                F.seqno = c.c_tx_seqno_;
+            } in
+            let frame = consF header payload in
+            let ack = F.join_ack ph.F.h_ack closeAck in
+            let fh = { F.h_data = frame; F.h_ack = ack } in
+            let txQ =
+                match more with
+                | F.More ->
+                    Cf_deque.B.push (W_tx_rsp1 (msgno, phRx, consF, None)) txQ
+                | F.Last ->
+                    txQ
+            in
+            let seqno = Int32.add c.c_tx_seqno_ size in
+            store { c with c_tx_seqno_ = seqno; c_txQ_ = txQ } >>= fun () ->
+            frameTx#put (F_tx_data fh) >>= loop
+        end
+    
+    (*--- Get frame of ANS entity for transmit ---*)
+    and get_txQ_rspN_cont_ c txQ msgno phRxMap ansRxOpt ansno phRx closeAck =
+        let phRx = (phRx :> F.phandle_t rx) in
+        phRx#get begin fun ph ->
+            let more, body as payload = ph.F.h_data in
+            let size = Int32.of_int (Cf_message.length body) in
+            let header = {
+                F.channel = n;
+                F.msgno = msgno;
+                F.seqno = c.c_tx_seqno_;
+            } in
+            let frame = `ANS (header, ansno, payload) in
+            let ack = F.join_ack ph.F.h_ack closeAck in
+            let fh = { F.h_data = frame; F.h_ack = ack } in
+            let txQ, putNul =
+                match more, ansRxOpt with
+                | F.Last, W_tx_ans_last ack ->
+                    txQ, Some ack
+                | F.Last, W_tx_ans_more _ ->
+                    let phRxMap = N32_map.delete ansno phRxMap in
+                    let op = W_tx_rspN (msgno, phRxMap, ansRxOpt, None) in
+                    Cf_deque.B.push op txQ, None
+                | F.More, _ ->
+                    let op = W_tx_rspN (msgno, phRxMap, ansRxOpt, None) in
+                    Cf_deque.B.push op txQ, None
+            in
+            let seqno = Int32.add c.c_tx_seqno_ size in
+            store { c with c_tx_seqno_ = seqno; c_txQ_ = txQ } >>= fun () ->
+            frameTx#put (F_tx_data fh) >>= fun () ->
+            match putNul with
+            | Some _ as ack->
+                let header = {
+                    F.channel = n;
+                    F.msgno = msgno;
+                    F.seqno = seqno;
+                } in
+                let fh = { F.h_data = `NUL header; F.h_ack = ack } in
+                frameTx#put (F_tx_data fh) >>= loop
+            | None ->
+                loop ()
+        end
+
+    (*--- Get NUL or start of answer for transmit ---*)
+    and get_txQ_rspN_next_ c txQ msgno phRxMap contOpt ansno ansRx closeAck =
+        let ansRx = (ansRx :> F.phandle_t E.answer_t rx) in
+        ansRx#get begin function
+            | E.Ans_entity phRx ->
+                let phRxMap = N32_map.replace (ansno, phRx) phRxMap in
+                let ansno = succ31_ ansno in
+                let ansS = W_tx_ans_more (ansno, ansRx) in
+                let wtx = W_tx_rspN (msgno, phRxMap, ansS, closeAck) in
+                store { c with c_txQ_ = Cf_deque.B.push wtx txQ } >>= loop
+            | E.Ans_final ack when N32_map.empty phRxMap ->
+                do_txQ_nul_ c msgno (F.join_ack (Some ack) closeAck)
+            | E.Ans_final ack ->
+                let wtx =
+                    W_tx_rspN (msgno, phRxMap, W_tx_ans_last ack, closeAck)
+                in
+                store { c with c_txQ_ = Cf_deque.B.push wtx txQ } >>= loop
+        end
+    
+    (*--- Transmit NUL frame ---*)
+    and do_txQ_nul_ c msgno ack =
+        store c >>= fun () ->
+        let header = {
+            F.channel = n;
+            F.msgno = msgno;
+            F.seqno = c.c_tx_seqno_;
+        } in
+        let fh = { F.h_data = `NUL header; F.h_ack = ack } in
+        frameTx#put (F_tx_data fh) >>= loop
+
+    (*--- Process a channel event ---*)
+    and do_channelRx_ = function
+        | C.C_tx_exchange exch -> do_channelRx_exchange_ exch
+        | C.C_tx_close closeMsg -> do_channelRx_close_ closeMsg
+        | C.C_tx_error errorMsg -> do_channelRx_error_ errorMsg
+        | C.C_tx_release -> do_channelRx_release_ ()
+        | C.C_tx_reset -> do_channelRx_reset_ ()
+
+    (*--- Process a channel event: C.C_tx_exchange ---*)
+    and do_channelRx_exchange_ exch =
+        load >>= fun c ->
+        if c.c_state_ = Ch_online then begin
+            let { E.request = `MSG phRx; E.response = rspTx } = exch in
+            let msgno = c.c_msgno_ in
+            let wtx = W_tx_msg (msgno, phRx, Some rspTx) in
+            let txQ = Cf_deque.A.push wtx c.c_txQ_ in
+            store { c with c_msgno_ = succ31_ msgno; c_txQ_ = txQ } >>= loop
+        end
+        else
+            do_control_error_ ()
+
+    (*--- Process a channel event: C.C_tx_close ---*)
+    and do_channelRx_close_ closeMsg =
+        load >>= fun c ->
+        if c.c_state_ = Ch_online then begin
+            begin
+                if Cf_deque.empty c.c_txQ_ then begin
+                    upTx#put (Up_close (n, closeMsg)) >>= fun () ->
+                    Cf_cmonad.return None
+                end
+                else
+                    Cf_cmonad.return (Some closeMsg)
+            end >>= fun s ->
+            store { c with c_state_ = Ch_close_tx s } >>= loop
+        end
+        else
+            do_control_error_ ()
+
+    (*--- Process a channel event: C.C_tx_error ---*)
+    and do_channelRx_error_ errorMsg =
+        load >>= fun c ->
+        if c.c_state_ = Ch_close_rx then begin
+            store { c with c_state_ = Ch_online } >>= fun () ->
+            upTx#put (Up_error (n, errorMsg)) >>= loop
+        end
+        else
+            do_control_error_ ()
+
+    (*--- Process a channel event: C.C_tx_release ---*)
+    and do_channelRx_release_ () =
+        load >>= fun c ->
+        if c.c_state_ = Ch_close_rx || n = Int32.zero then begin
+            store { c with c_state_ = Ch_finalizing } >>= fun () ->
+            frameTx#put F_tx_final >>= loop
+        end
+        else
+            do_control_error_ ()
+
+    (*--- Process a channel event: C.C_tx_reset ---*)
+    and do_channelRx_reset_ () =
+        load >>= fun c ->
+        if c.c_state_ = Ch_online then begin
+            frameTx#put F_tx_abort >>= fun () ->
+            upTx#put (Up_reset n)
+            (* done processing *)
+        end
+        else
+            do_control_error_ ()
+
+    (*--- Process a session event ---*)
+    and do_dnRx_ event =
+        load >>= fun c ->
+        match event with
+        | Dn_close closeMsg ->
+            ch_state_ Ch_close_rx >>= fun () ->
+            channelTx#put (C.C_rx_close closeMsg) >>= loop
+        | Dn_error errorMsg ->
+            ch_state_ Ch_online >>= fun () ->
+            channelTx#put (C.C_rx_error errorMsg) >>= loop
+        | Dn_release ->
+            ch_state_ Ch_releasing >>= loop
+        | Dn_final ->
+            frameTx#put F_tx_final >>= loop
+        | Dn_abort ->
+            frameTx#put F_tx_abort >>= fun () ->
+            channelTx#put C.C_rx_abort
+            (* done processing *)
+
+    (*--- Throw BEEP control error: Err.X_control_error ---*)
+    and do_control_error_ () =
+        do_beep_error_ Err.X_control_error
+    
+    (*--- Throw BEEP error ---*)
+    and do_beep_error_ error =
+        (*
+        X.printf "do_beep_error_: %s\n" (Err.to_string error);
+        flush stdout;
+        *)
+        channelTx#put C.C_rx_abort >>= fun () ->
+        upTx#put (Up_failed (n, (Err.X error)))
+        (* done processing *)
+    in
+    
+    (*--- Start main loop ---*)
+    start (loop ()) (match c with Some c -> c | None -> channel0_)
+
+class config =
+    object
+        method initial_timeout = 5
+        method release_timeout = 5
+        method new_locale localize = new Beep_locale.default localize
+    end
+
+type parameters_t = {
+    p_initialTimeout_: int;
+    p_releaseTimeout_: int;
+    p_newLocale_: string list -> Beep_locale.t;
+}
+
+let parameters_ cfg =
+    let cfg = (cfg :> config) in {
+        p_initialTimeout_ = cfg#initial_timeout;
+        p_releaseTimeout_ = cfg#release_timeout;
+        p_newLocale_ = cfg#new_locale;
+    }
+
+let rec emit_attribute_value_tokens_ pp = function
+    | [] ->
+        ()
+    | hd :: [] ->
+        Format.pp_print_string pp hd
+    | hd :: tl ->
+        Format.pp_print_string pp hd;
+        Format.pp_print_space pp ();
+        emit_attribute_value_tokens_ pp tl
+
+let xml_header_string_ = Cf_message.contents E.xml_header_text
+
+module E_profile = struct
+    open Cf_parser.Op
+    include C.M_profile
+        
+    let uri_required_ = "#REQUIRED"
+    let empty_content_ = ""
+    
+    let initial_ = {
+        uri = uri_required_;
+        encoding = E_none;
+        content = empty_content_;
+    }
+    
+    let invalid_ pos pair =
+        let event = Xml_event.T_element_start ("profile", [pair]), pos in
+        raise (Xml_parser.Invalid event)
+    
+    let parse =
+        let attr pos obj (k, v as pair) =
+            match k with
+            | "uri" ->
+                { obj with C.M_profile.uri = v }
+            | "encoding" ->
+                begin
+                    match v with
+                    | "none" -> { obj with encoding = E_none }
+                    | "base64" -> { obj with encoding = E_base64 }
+                    | _ -> invalid_ pos pair
+                end
+            | _ ->
+                invalid_ pos pair
+        in
+        let content obj =
+            if obj.uri == uri_required_ then
+                Xml_parser.invalid
+            else begin
+                Xml_parser.accumulated_character_data >>= fun msg ->
+                let data = Cf_message.contents msg in
+                let data =
+                    if obj.encoding = E_base64 then
+                        Mime_base64.D.atomic data
+                    else
+                        data
+                in
+                ~:{ obj with content = data }
+            end
+        in
+        let space = Xml_parser.S_preserve in
+        fun _ ->
+            Xml_parser.validated_element
+                ~tag:"profile" ~attr ~content ~space initial_
+    
+    let emit pp x =
+        Format.pp_print_string pp "<profile uri='";
+        Format.pp_print_string pp x.uri;
+        Format.pp_print_char pp '\'';
+        begin
+            match x.encoding with
+            | E_base64 ->
+                Format.pp_print_string pp " encoding='base64'"
+            | E_none ->
+                ()
+        end;
+        if x.content = "" then
+            Format.pp_print_string pp "/>"
+        else begin
+            Format.pp_print_char pp '>';
+            Format.pp_print_string pp x.content;
+            Format.pp_print_string pp "</profile>"
+        end
+end
+
+module M_profile = struct
+    type t = E_profile.t
+    let parse = E_profile.parse
+
+    let emit pp x =
+        Format.pp_print_string pp xml_header_string_;
+        E_profile.emit pp x;
+        E.xml_header
+end
+
+module M_ok = struct
+    open Cf_parser.Op
+    
+    type t = unit
+    
+    let invalid_ pos pair =
+        let event = Xml_event.T_element_start ("ok", [pair]), pos in
+        raise (Xml_parser.Invalid event)
+
+    let parse =
+        let attr pos () pair = invalid_ pos pair in
+        let content () = Xml_parser.optional_whitespace in
+        let space = Xml_parser.S_preserve in
+        fun _ ->
+            Xml_parser.validated_element ~tag:"ok" ~attr ~content ~space ()
+    
+    let emit pp () =
+        Format.pp_print_string pp xml_header_string_;
+        Format.pp_print_string pp "<ok/>";
+        E.xml_header
+end
+
+module M_error = struct
+    open Cf_parser.Op
+    include C.M_error
+        
+    let empty_content_ = ""
+    
+    let initial_ = {
+        code = -1;
+        lang = None;
+        content = empty_content_;
+    }
+    
+    let invalid_ pos pair =
+        let event = Xml_event.T_element_start ("error", [pair]), pos in
+        raise (Xml_parser.Invalid event)
+
+    let parse =
+        let attr pos obj (k, v as pair) =
+            match k with
+            | "code" ->
+                if String.length v <> 3 then invalid_ pos pair;
+                String.iter begin function
+                    | '0'..'9' -> ()
+                    | _ -> invalid_ pos pair
+                end v;
+                { obj with code = int_of_string v }
+            | "xml:lang" ->
+                { obj with lang = Some v }
+            | _ ->
+                invalid_ pos pair
+        in
+        let content obj =
+            if obj.code = -1 then
+                Xml_parser.invalid
+            else
+                Xml_parser.accumulated_character_data >>= fun msg ->
+                ~:{ obj with content = Cf_message.contents msg }
+        in
+        fun _ ->
+            Xml_parser.validated_element ~tag:"error" ~attr ~content initial_
+    
+    let emit pp x =
+        if x.code < 100 || x.code > 999 then
+            invalid_arg "Beep_transport.M_error.emit: invalid code number";
+        Format.pp_print_string pp xml_header_string_;
+        Format.pp_print_string pp "<error code='";
+        Format.pp_print_int pp x.code;
+        Format.pp_print_char pp '\'';
+        begin
+            match x.lang with
+            | None -> ()
+            | Some s ->
+                Format.pp_print_string pp " xml:lang='";
+                Format.pp_print_string pp s;
+                Format.pp_print_char pp '\'';
+        end;
+        if x.content = "" then
+            Format.pp_print_string pp "/>"
+        else begin
+            Format.pp_print_char pp '>';
+            Format.pp_print_string pp x.content;
+            Format.pp_print_string pp "</error>"
+        end;
+        E.xml_header
+end
+
+module M_start = struct
+    open Cf_parser.Op
+    
+    type t = {
+        number: int32;
+        serverName: string option;
+        profiles: E_profile.t * E_profile.t list;
+    }
+    
+    let number_required_ = -1l
+    
+    let initial_ = {
+        number = number_required_;
+        serverName = None;
+        profiles = E_profile.initial_, [];
+    }
+    
+    let invalid_ pos pair =
+        let event = Xml_event.T_element_start ("start", [pair]), pos in
+        raise (Xml_parser.Invalid event)
+
+    let parse =
+        let attr pos obj (k, v as pair) =
+            match k with
+            | "number" ->
+                let n = Int32.of_string v in
+                let v' = Int32.to_string n in
+                if not (v = v' && n >= Int32.zero) then invalid_ pos pair;
+                { obj with number = n }
+            | "serverName" ->
+                { obj with serverName = Some v }
+            | _ ->
+                invalid_ pos pair
+        in
+        let content header obj =
+            if obj.number == number_required_ then
+                Xml_parser.invalid
+            else begin
+                Xml_parser.optional_whitespace >>= fun () ->
+                ?+ begin
+                    E_profile.parse header >>= fun profile ->
+                    Xml_parser.optional_whitespace >>= fun () ->
+                    ~:profile
+                end >>= fun (hd, tl) ->
+                ~:{ obj with profiles = hd, tl }
+            end
+        in
+        fun header ->
+            let content = content header in
+            Xml_parser.validated_element ~tag:"start" ~attr ~content initial_
+    
+    let emit pp x =
+        if x.number < Int32.zero || x.number > Int32.max_int then
+            invalid_arg "Beep_transport.E_start.emit: invalid channel number";
+        Format.pp_open_vbox pp 2;
+        Format.pp_print_string pp xml_header_string_;
+        Format.pp_print_string pp "<start number='";
+        Format.pp_print_string pp (Int32.format "%u" x.number);
+        Format.pp_print_char pp '\'';
+        begin
+            match x.serverName with
+            | None -> ()
+            | Some s ->
+                Format.pp_print_string pp " serverName='";
+                Format.pp_print_string pp s;
+                Format.pp_print_char pp '\'';
+        end;
+        let hd, tl = x.profiles in
+        Format.pp_print_char pp '>';
+        List.iter begin fun profile ->
+            Format.pp_print_cut pp ();
+            E_profile.emit pp profile;
+        end (hd :: tl);
+        Format.pp_close_box pp ();
+        Format.pp_print_cut pp ();
+        Format.pp_print_string pp "</start>";
+        E.xml_header
+end
+
+module M_close = struct
+    open Cf_parser.Op
+    
+    type t = {
+        number: int32;
+        code: int;
+        lang: string option;
+        content: string;
+    }
+    
+    let empty_content_ = ""
+    
+    let initial_ = {
+        number = 0l;
+        code = -1;
+        lang = None;
+        content = empty_content_;
+    }
+    
+    let invalid_ pos pair =
+        let event = Xml_event.T_element_start ("close", [pair]), pos in
+        raise (Xml_parser.Invalid event)
+
+    let parse =
+        let attr pos obj (k, v as pair) =
+            match k with
+            | "number" ->
+                let n = Int32.of_string v in
+                let v' = Int32.to_string n in
+                if not (v = v' && n >= Int32.zero) then invalid_ pos pair;
+                { obj with number = n }
+            | "code" ->
+                if String.length v <> 3 then
+                    invalid_ pos pair;
+                String.iter begin function
+                    | '0'..'9' -> ()
+                    | _ -> invalid_ pos pair
+                end v;
+                { obj with code = int_of_string v }
+            | "xml:lang" ->
+                { obj with lang = Some v }
+            | _ ->
+                invalid_ pos pair
+        in
+        let content obj =
+            Xml_parser.accumulated_character_data >>= fun msg ->
+            ~:{ obj with content = Cf_message.contents msg }
+        in
+        fun _ ->
+            Xml_parser.validated_element ~tag:"close" ~attr ~content initial_
+    
+    let emit pp x =
+        if x.number < Int32.zero || x.number > Int32.max_int then
+            invalid_arg "Beep_transport.E_close.emit: invalid channel number";
+        if x.code < 100 || x.code > 999 then
+            invalid_arg "Beep_transport.E_close.emit: invalid code number";
+        Format.pp_print_string pp xml_header_string_;
+        Format.pp_print_string pp "<close";
+        if x.number <> Int32.zero then begin
+            Format.pp_print_string pp " number='";
+            Format.pp_print_string pp (Int32.format "%u" x.number);
+            Format.pp_print_char pp '\'';
+        end;
+        Format.pp_print_string pp " code='";
+        Format.pp_print_int pp x.code;
+        Format.pp_print_char pp '\'';
+        begin
+            match x.lang with
+            | None -> ()
+            | Some s ->
+                Format.pp_print_string pp " xml:lang='";
+                Format.pp_print_string pp s;
+                Format.pp_print_char pp '\'';
+        end;
+        if x.content = "" then
+            Format.pp_print_string pp "/>"
+        else begin
+            Format.pp_print_char pp '>';
+            Format.pp_print_string pp x.content;
+            Format.pp_print_string pp "</close>"
+        end;
+        E.xml_header
+end
+
+module M_greeting = struct
+    open Cf_parser.Op
+    
+    type t = {
+        features: string list;
+        localize: string list;
+        profiles: C.M_profile.t list;
+    }
+    
+    let default = {
+        features = [];
+        localize = [];
+        profiles = [];
+    }
+    
+    let invalid_ pos pair =
+        let event = Xml_event.T_element_start ("greeting", [pair]), pos in
+        raise (Xml_parser.Invalid event)
+
+    let parse =
+        let split v =
+            let vs = (Cf_seq.of_string v) in
+            match Xml_parser.split_attribute_value vs with
+            | None -> []
+            | Some (vs, _) -> vs
+        in
+        let attr pos obj (k, v as pair) =
+            match k with
+            | "features" -> { obj with features = split v }
+            | "localize" -> { obj with localize = split v }
+            | _ -> invalid_ pos pair
+        in
+        let content header obj =
+            ?* begin
+                E_profile.parse header >>= fun profile ->
+                if profile.E_profile.content <> "" then
+                    Xml_parser.invalid
+                else
+                    ~:profile
+            end >>= fun profiles ->
+            ~:{ obj with profiles = profiles }
+        in
+        let space = Xml_parser.S_preserve in
+        fun header ->
+            let content = content header in
+            Xml_parser.validated_element 
+                ~tag:"greeting" ~attr ~content ~space default
+    
+    let emit pp x =
+        Format.pp_open_vbox pp 2;
+        Format.pp_print_string pp xml_header_string_;
+        Format.pp_print_string pp "<greeting";
+        if x.features <> [] then begin
+            Format.pp_print_string pp " features='";
+            emit_attribute_value_tokens_ pp x.features;
+            Format.pp_print_char pp '\'';
+        end;
+        if x.localize <> [] then begin
+            Format.pp_print_string pp " localize='";
+            emit_attribute_value_tokens_ pp x.localize;
+            Format.pp_print_char pp '\'';
+        end;
+        if x.profiles = [] then begin
+            Format.pp_print_string pp "/>";
+            Format.pp_close_box pp ()
+        end
+        else begin
+            Format.pp_print_char pp '>';
+            List.iter begin fun profile ->
+                Format.pp_print_cut pp ();
+                E_profile.emit pp profile;
+            end x.profiles;
+            Format.pp_close_box pp ();
+            Format.pp_print_cut pp ();
+            Format.pp_print_string pp "</greeting>"
+        end;
+        E.xml_header
+end
+
+module E_request = struct
+    open Cf_parser.Op
+    
+    type t = Start of M_start.t | Close of M_close.t
+    
+    let parse h =
+        Cf_parser.alt [
+            (M_start.parse h >>= fun start -> ~:(Start start));
+            (M_close.parse h >>= fun close -> ~:(Close close));
+        ]
+    
+    let emit pp = function
+        | Start e -> M_start.emit pp e
+        | Close e -> M_close.emit pp e
+end
+
+module E_reply = struct
+    open Cf_parser.Op
+    
+    type t = Profile of E_profile.t | Ok of (unit, unit) Iom_reactor.t option
+    
+    let parse h = assert false  (* never called *)
+    
+    let emit pp = function
+        | Profile p -> M_profile.emit pp p
+        | Ok _ -> M_ok.emit pp ()
+end
+
+module P_start = struct
+    let header_processing = `H_leave
+    
+    module MSG = M_start
+    module RPY = M_profile
+    module ERR = M_error
+    module ANS = E_xml.Null_entity
+end
+
+module P_close = struct
+    let header_processing = `H_leave
+    
+    module MSG = M_close
+    module RPY = M_ok
+    module ERR = M_error
+    module ANS = E_xml.Null_entity
+end
+
+module P_union = struct
+    let header_processing = `H_leave
+    
+    module MSG = E_request
+    module RPY = E_reply
+    module ERR = M_error
+    module ANS = E_xml.Null_entity
+end
+
+module E_xml_start = E_xml.Create(P_start)
+module E_xml_close = E_xml.Create(P_close)
+module E_xml_union = E_xml.Create(P_union)
+
+type transport_rx_t =
+    | M_rx_start of int32 * (frame_rx_t rx * frame_tx_t tx)
+    | M_rx_final
+    | M_rx_unlink
+    | M_rx_failure of exn
+
+type transport_tx_t =
+    | M_tx_start of int32
+    | M_tx_final
+    | M_tx_unlink
+    | M_tx_abort
+    
+type role_t = R_initiator | R_listener
+
+type control_rx_t =
+    | C_rx_failure of exn
+    | C_rx_greeting of M_greeting.t
+    | C_rx_error of Beep_channel.M_error.t
+    | C_rx_close of Beep_channel.M_close.t
+    | C_rx_reset
+    | C_rx_final
+
+type control_tx_t =
+    | C_tx_greeting of M_greeting.t * Beep_channel.service list
+    | C_tx_error of Beep_channel.M_error.t
+    | C_tx_start of Beep_channel.client
+    | C_tx_close of Beep_channel.M_close.t
+    | C_tx_release
+    | C_tx_abort
+
+class initial_client_exchange cancelRx upTx =
+    let cancelRx = (cancelRx :> unit rx) in
+    let upTx = (upTx :> 'up tx) in
+    let hp = `H_leave in
+    object(self)
+        inherit EE.client ~initial:() [] hp
+        
+        method private entity dnCancelTx rspRx =
+            let dnCancelTx = (dnCancelTx :> EE.cancel_t tx) in
+            let rspRx = (rspRx :> EE.rx_event_t EE.response_t rx) in
+            let rec loop () =
+                guard begin
+                    cancelRx#get do_cancelRx_ >>= fun () ->
+                    rspRx#get do_rspRx_
+                end
+            and do_cancelRx_ () =
+                dnCancelTx#put EE.Cancel
+            and do_rspRx_ = function
+                | `RPY (`Entity (h, p)) ->
+                    upTx#put begin
+                        match E_xml.parse_aux ~hp ~f:M_greeting.parse h p with
+                        | Cf_exnopt.U u -> `Greeting u
+                        | Cf_exnopt.X x -> `Fail x
+                    end
+                | `ERR (`Entity (h, p)) ->
+                    upTx#put begin
+                        match E_xml.parse_aux ~hp ~f:M_error.parse h p with
+                        | Cf_exnopt.U u -> `Decline u
+                        | Cf_exnopt.X x -> `Fail x
+                    end
+                | `RPY (`Error x)
+                | `ERR (`Error x) ->
+                    upTx#put (`Fail x)
+                | _ ->
+                    upTx#put (`Fail (Err.X Err.X_bad_frame_type))
+            in
+            start (loop ()) ()
+    end
+
+class initial_service_exchange initRx =
+    let initRx = (initRx :> 'a rx) in
+    object(self)
+        inherit Beep_entity_exchange.service ~initial:() () `H_leave
+        
+        method private entity _ rspTx =
+            guard begin
+                initRx#get begin fun initMsg ->
+                    let rspMsg =
+                        match initMsg with
+                        | `Greeting m ->
+                            `RPY (EE.emit_aux ~f:M_greeting.emit m)
+                        | `Decline m ->
+                            `ERR (EE.emit_aux ~f:M_error.emit m)
+                    in
+                    let rspTx =
+                        (rspTx :> EE.entity_t EE.response_t F.handle_t tx)
+                    in
+                    rspTx#put { F.h_data = rspMsg; F.h_ack = None }
+                end
+            end
+    end
+
+class start_client_exchange (cancelRx, upTx) startAuxMsg =
+    let upTx = (upTx :> 'up tx) in
+    object(self)
+        inherit E_xml_start.client ~cancel:cancelRx startAuxMsg ()
+        
+        method private rpy v =
+            upTx#put begin
+                match v with
+                | Cf_exnopt.U u -> `Profile u
+                | Cf_exnopt.X x -> `Fail x
+            end
+        
+        method private err v =
+            upTx#put begin
+                match v with
+                | Cf_exnopt.U e -> `Error e
+                | Cf_exnopt.X x -> `Fail x
+            end
+        
+        method private ans _ = self#nul
+
+        method private nul =
+            upTx#put (`Fail (Err.X Err.X_bad_frame_type))
+    end
+
+class close_client_exchange (cancelRx, upTx) closeAuxMsg =
+    let upTx = (upTx :> 'up tx) in
+    let n = closeAuxMsg.M_close.number in
+    object(self)
+        inherit E_xml_close.client ~cancel:cancelRx closeAuxMsg ()
+        
+        method private rpy v =
+            upTx#put begin
+                match v with
+                | Cf_exnopt.U () -> `Ok
+                | Cf_exnopt.X x -> `Fail x
+            end
+        
+        method private err v =
+            upTx#put begin
+                match v with
+                | Cf_exnopt.U e -> `Error e
+                | Cf_exnopt.X x -> `Fail x
+            end
+        
+        method private ans _ = self#nul
+
+        method private nul =
+            upTx#put (`Fail (Err.X Err.X_bad_frame_type))
+    end
+
+class union_service_exchange (dnRx, upTx) =
+    let dnRx = (dnRx :> 'dn rx) in
+    let upTx = (upTx :> 'up tx) in
+    object(self)
+        inherit E_xml_union.service ()
+        
+        method private msg dnTx dnMsg =
+            let dnTx = (dnTx :> 'dnxml tx) in
+            upTx#put dnMsg >>= fun () ->
+            let f dnRpy =
+                dnTx#put begin
+                    match dnRpy with
+                    | `RPY (E_reply.Ok sync as v) -> self#rpy ?sync v
+                    | `RPY v -> self#rpy v
+                    | `ERR v -> self#err v
+                end
+            in
+            guard (dnRx#get f)
+    end
+
+type s0_tx_event_t = [ `Decline of M_error.t | `Greeting of M_greeting.t ]
+type s0_rx_event_t = [ s0_tx_event_t | `Fail of exn ]
+
+type s0_tx_t =
+    | S0_tx0 of s0_tx_event_t tx
+    | S0_tx1 of C.service list
+    | S0_txfin
+
+type s0_rx_t =
+    | S0_rx0 of s0_rx_event_t rx * unit tx
+    | S0_rx1 of Beep_locale.t
+
+type session_variant_t =
+    | Sv_ready
+    | Sv_tuning of int32
+    | Sv_unlinking
+    | Sv_close_tx of N32_set.t
+    | Sv_close_rx of N32_set.t option
+
+type s_rxq_entry_msg_t = [ `ERR of M_error.t | `RPY of E_reply.t ]
+
+type s_rxq_entry_t =
+    | C0_rx_close of int32 * s_rxq_entry_msg_t tx
+    | C0_rx_start0 of
+        int32 * s_rxq_entry_msg_t tx * C.service_start_t rx
+    | C0_rx_start1 of
+        int32 * s_rxq_entry_msg_t tx * C.M_profile.t * C.profile
+    | C0_rx_union of E_xml_union.P.MSG.t Cf_exnopt.t rx * s_rxq_entry_msg_t tx
+
+type s_txq_entry_start_t =
+    [ `Error of M_error.t | `Fail of exn | `Profile of C.M_profile.t ]
+
+type s_txq_entry_close_t =
+    [ `Error of M_error.t | `Fail of exn | `Ok ]
+
+type s_txq_entry_t =
+    | C0_tx_close of int32 * s_txq_entry_close_t rx * unit tx
+    | C0_tx_close_c0 of s_txq_entry_close_t rx * unit tx * E.t option
+    | C0_tx_start0 of C.M_start.t rx * C.client_start_t tx
+    | C0_tx_start1 of int32 * C.M_start.t * C.client_start_t tx
+    | C0_tx_start2 of
+        int32 * s_txq_entry_start_t rx * unit tx * C.client_start_t tx *
+        (frame_rx_t rx * frame_tx_t tx)
+    | C0_tx_start3 of int32 * C.profile rx * (frame_rx_t rx * frame_tx_t tx)
+
+type session_t = {
+    s_srvList_: C.service list;
+    s_locale_: Beep_locale.t;
+    s_channo_: int32;
+    s_variant_: session_variant_t;
+    s_rxQ_: s_rxq_entry_t Cf_deque.t;
+    s_txQ_: s_txq_entry_t Cf_deque.t;
+    s_cMap_: dn_event_t tx N32_map.t;
+}
+
+type state_t =
+  | S_initial
+  | S_greeting of dn_event_t tx * s0_rx_t * s0_tx_t
+  | S_online of session_t
+  | S_final
+
+let online_ dnTx role locale srvList =
+    S_online {
+        s_srvList_ = srvList;
+        s_locale_ = locale;
+        s_channo_ = (if role == R_initiator then Int32.one else 2l);
+        s_variant_ = Sv_ready;
+        s_rxQ_ = Cf_deque.nil;
+        s_txQ_ = Cf_deque.nil;
+        s_cMap_ = N32_map.replace (Int32.zero, dnTx) N32_map.nil;
+    }
+
+let ch_succ_ n =
+    if n = Int32.max_int then
+        Int32.one
+    else if Int32.succ n = Int32.max_int then
+        2l
+    else
+        Int32.succ (Int32.succ n)
+
+let cons_errorMsg_ s code =
+    let loc = s.s_locale_ in
+    `ERR {
+        C.M_error.code = code;
+        C.M_error.lang = loc#lang;
+        C.M_error.content = loc#content code;
+    }
+
+(*
+let cCounter_ = ref 0
+*)
+
+let core:
+    ?cfg:#config -> (transport_rx_t #rx * transport_tx_t #tx) ->
+    (control_tx_t #rx * control_rx_t #tx) -> role_t -> kernel_t -> ('s, unit) t
+= fun ?cfg (mapRx, mapTx) (ctrlRx, ctrlTx) role k ->
+    (*
+    incr cCounter_;
+    let module X_input = struct
+        let tag = Printf.sprintf "Beep_transport.core %u" !cCounter_
+    end in
+    let module X = X_create(X_input) in
+    *)
+
+    (*--- Normalize parameters ---*)
+    let p =
+        parameters_ begin
+            match cfg with
+            | None -> new config
+            | Some cfg -> (cfg :> config)
+        end
+    in
+    let mapRx = (mapRx :> transport_rx_t rx) in
+    let mapTx = (mapTx :> transport_tx_t tx) in
+    let ctrlRx = (ctrlRx :> control_tx_t rx) in
+    let ctrlTx = (ctrlTx :> control_rx_t tx) in
+    
+    (*--- Create the internal up_event_t simplex ---*)
+    simplex >>= fun (upRx, upTx) ->
+    let upRx = (upRx :> up_event_t rx) in
+    let upTx = (upTx :> up_event_t tx) in
+    
+    (*--- Create the management channel duplex ---*)
+    duplex >>= fun ((c0Rx, c0Tx), c0IO) ->
+    let c0Rx = (c0Rx :> C.control_rx_t rx) in
+    let c0Tx = (c0Tx :> C.control_tx_t tx) in
+
+    (*--- Main loop ---*)
+    let rec loop () =
+        load >>= fun s ->
+        guard begin
+            mapRx#get do_mapRx_ >>= fun () ->
+            upRx#get do_upRx_ >>= fun () ->
+            match s with
+            | S_initial
+            | S_final ->
+                Cf_cmonad.return ()
+            | S_greeting (dnTx, S0_rx0 (initMsgRx, cancelTx), txState) ->
+                ctrlRx#get do_ctrlRx_ >>= fun () ->
+                get_initMsgRx_ dnTx txState initMsgRx
+            | S_greeting (_, _, _) ->
+                ctrlRx#get do_ctrlRx_
+            | S_online s ->
+                ctrlRx#get do_ctrlRx_ >>= fun () ->
+                c0Rx#get do_c0Rx_ >>= fun () ->
+                gate_online_rxQ_ s >>= fun () ->
+                gate_online_txQ_ s
+        end
+    
+    (*--- Gate on S_online (receive)---*)
+    and gate_online_rxQ_ s =
+        match Cf_deque.B.pop s.s_rxQ_ with
+        | None ->
+            Cf_cmonad.return ()
+        | Some (op, rxQ) ->
+            let s = { s with s_rxQ_ = rxQ } in
+            match op with
+            | C0_rx_union (srvRx, srvTx) ->
+                get_c0SrvRx_ s srvRx srvTx
+            | C0_rx_start0 (n, srvTx, srvRspRx) ->
+                get_c0SrvRspRx_ s n srvTx srvRspRx
+            | C0_rx_start1 _
+            | C0_rx_close _ ->
+                Cf_cmonad.return ()
+    
+    (*--- Gate on S_online (transmit)---*)
+    and gate_online_txQ_ s =
+        match Cf_deque.B.pop s.s_txQ_ with
+        | None ->
+            Cf_cmonad.return ()
+        | Some (op, txQ) ->
+            let s = { s with s_txQ_ = txQ } in
+            match op with
+            | C0_tx_start0 (startRx, connectTx) ->
+                get_c0StartMsgRx_ s startRx connectTx
+            | C0_tx_start1 (n, startMsg, connectTx) ->
+                Cf_cmonad.return ()
+            | C0_tx_start2 (n, startRpyRx, cancelTx, connectTx, frameIO) ->
+               get_c0StartRpyRx_ s n startRpyRx cancelTx connectTx frameIO
+            | C0_tx_start3 (n, profileRx, frameIO) ->
+               get_c0ClientProfileRx_ s n profileRx frameIO
+            | C0_tx_close (n, closeRpyRx, cancelTx) ->
+                get_c0CloseRpyRx_ s n closeRpyRx cancelTx
+            | C0_tx_close_c0 (closeRpyRx, cancelTx, exchOpt) ->
+                get_c0CloseRpyRx_c0_ s closeRpyRx cancelTx exchOpt
+
+    (*--- Process transport event ---*)
+    and do_mapRx_ = function
+        | M_rx_start (n, frameIO) when n = Int32.zero ->
+            do_mapRx_start_ch0_ frameIO
+        | M_rx_start (n, frameIO) ->
+            do_mapRx_start_ n frameIO
+        | M_rx_final ->
+            do_mapRx_final_ ()
+        | M_rx_unlink ->
+            do_mapRx_unlink_ ()
+        | M_rx_failure x ->
+            do_mapRx_failure_ x
+    
+    (*--- Process transport event: M_rx_start ---*)
+    and do_mapRx_start_ch0_ frameIO =
+        load >>= fun s ->
+        assert (s = S_initial);
+        simplex >>= fun (dnRx, dnTx) ->
+        let dnRx = (dnRx :> dn_event_t rx) in
+        let dnTx = (dnTx :> dn_event_t tx) in
+        simplex >>= fun (cancelRx, cancelTx) ->
+        let cancelRx = (cancelRx :> unit rx) in
+        let cancelTx = (cancelTx :> unit tx) in
+        simplex >>= fun (srvRx, srvTx) ->
+        let srvRx = (srvRx :> 'srv rx) in
+        let srvTx = (srvTx :> 'srv tx) in
+        simplex >>= fun (initRspRx, initRspTx) ->
+        let sExchObj = new initial_service_exchange srvRx in
+        let sExch = {
+            E.request = `MSG (new rx null);
+            E.response = initRspTx;
+        } in
+        sExchObj#link sExch >>= fun () ->
+        simplex >>= fun (clntRx, clntTx) ->
+        let clntRx = (clntRx :> 'clnt rx) in
+        let clntTx = (clntTx :> 'clnt tx) in
+        let cExchObj = new initial_client_exchange cancelRx clntTx in
+        cExchObj#link >>= fun { E.response = rspTx } ->
+        let wrx = Int32.zero, W_rx_p_pending, W_rx_rsp0 rspTx in
+        let rxQ = Cf_deque.A.push wrx Cf_deque.nil in
+        let wtx = W_tx_rsp0 (Int32.zero, initRspRx, None) in
+        let txQ = Cf_deque.A.push wtx Cf_deque.nil in
+        let c0 = {
+            channel0_ with
+            c_msgno_ = Int32.one;
+            c_rxQ_ = rxQ;
+            c_txQ_ = txQ;
+        } in
+        channel_ ~c:c0 Int32.zero frameIO c0IO (dnRx, upTx) >>= fun () ->
+        let s = S_greeting (dnTx, S0_rx0 (clntRx, cancelTx), S0_tx0 srvTx) in
+        store s >>= loop
+    
+    (*--- Process transport event: M_rx_start ---*)
+    and do_mapRx_start_ n frameIO =
+        load >>= function
+        | S_initial
+        | S_final
+        | S_greeting _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+        | S_online s ->
+            match Cf_deque.B.pop s.s_rxQ_ with
+            | Some (C0_rx_start1 (n, srvTx, profileMsg, obj), rxQ) ->
+                srvTx#put (`RPY (E_reply.Profile profileMsg)) >>= fun () ->
+                simplex >>= fun (dnRx, dnTx) ->
+                let muxIO = dnRx, upTx in
+                let cMap = N32_map.replace (n, dnTx) s.s_cMap_ in
+                let s = { s with s_rxQ_ = rxQ; s_cMap_ = cMap } in
+                store (S_online s) >>= fun () ->
+                obj#profile >>= fun chIO ->
+                channel_ n frameIO chIO muxIO >>= loop
+            | _ ->
+                match Cf_deque.B.pop s.s_txQ_ with
+                | Some (C0_tx_start1 (n, startMsg, connectTx), tl) ->
+                    duplex >>= fun ((startRpyRx, cancelTx), exchIO) ->
+                    let startAux = {
+                        M_start.number = n;
+                        M_start.serverName = startMsg.C.M_start.serverName;
+                        M_start.profiles = startMsg.C.M_start.profiles;
+                    } in
+                    let exchObj = new start_client_exchange exchIO startAux in
+                    exchObj#link >>= fun exch ->
+                    c0Tx#put (C.C_tx_exchange exch) >>= fun () ->
+                    let txOp =
+                        C0_tx_start2
+                        (n, startRpyRx, cancelTx, connectTx, frameIO)
+                    in
+                    let txQ = Cf_deque.B.push txOp tl in
+                    store (S_online { s with s_txQ_ = txQ }) >>= loop
+                | _ ->
+                    assert (not true);
+                    Cf_cmonad.return ()
+    
+    (*--- Process transport event: M_rx_final ---*)
+    and do_mapRx_final_ () =
+        load >>= function
+        | S_online { s_variant_ = (Sv_close_rx _ | Sv_close_tx _) } ->
+            ctrlTx#put C_rx_final
+            (* done processing *)
+        | _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+    
+    (*--- Process transport event: M_rx_unlink ---*)
+    and do_mapRx_unlink_ () =
+        load >>= function
+        | S_online { s_variant_ = Sv_unlinking } ->
+            ctrlTx#put C_rx_reset
+            (* done processing *)
+        | _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+    
+    (*--- Process transport event: M_rx_failure ---*)
+    and do_mapRx_failure_ x =
+        do_abort_channels_ () >>= fun () ->
+        ctrlTx#put (C_rx_failure x)
+        (* done processing *)
+
+    (*--- Process session event ---*)
+    and do_upRx_ event =
+        load >>= function
+        | S_online s ->
+            begin
+                match event with
+                | Up_close (n, closeMsg) -> do_upRx_close_ s n closeMsg
+                | Up_error (n, errorMsg) -> do_upRx_error_ s n errorMsg
+                | Up_sync n -> do_upRx_sync_ s n
+                | Up_final n -> do_upRx_final_ s n
+                | Up_failed (n, x) -> do_upRx_failed_ s n x
+                | Up_reset n -> do_upRx_reset_ s n
+            end
+        | _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+
+    (*--- Process session event: Up_close ---*)
+    and do_upRx_close_ s n closeMsg =
+        duplex >>= fun ((closeRpyRx, cancelTx), exchIO) ->
+        let closeAux = {
+            M_close.number = n;
+            M_close.code = closeMsg.C.M_close.code;
+            M_close.lang = closeMsg.C.M_close.lang;
+            M_close.content = closeMsg.C.M_close.content;
+        } in
+        let exchObj = new close_client_exchange exchIO closeAux in
+        exchObj#link >>= fun exch ->
+        c0Tx#put (C.C_tx_exchange exch) >>= fun () ->
+        let txOp = C0_tx_close (n, closeRpyRx, cancelTx) in
+        let txQ = Cf_deque.A.push txOp s.s_txQ_ in
+        store (S_online { s with s_txQ_ = txQ }) >>= loop
+
+    (*--- Process session event: Up_error ---*)
+    and do_upRx_error_ s n errorMsg =
+        begin
+            match Cf_deque.B.pop s.s_rxQ_ with
+            | Some (C0_rx_close (n', srvTx), tl) when n = n' ->
+                srvTx#put (`ERR errorMsg) >>= fun () ->
+                Cf_cmonad.return tl
+            | _ ->
+                assert (not true);
+                Cf_cmonad.return s.s_rxQ_
+        end >>= fun rxQ ->
+        assert (N32_map.member n s.s_cMap_);
+        let s = { s with s_rxQ_ = rxQ } in
+        store (S_online s) >>= loop
+
+    (*--- Process session event: Up_sync ---*)
+    and do_upRx_sync_ s n =
+        let u =
+            match s.s_variant_ with
+            | Sv_close_tx u -> u
+            | Sv_close_rx (Some u) -> u
+            | _ -> assert (not true); N32_set.nil
+        in
+        if n <> Int32.zero then begin
+            assert (N32_set.member n u);
+            let u = N32_set.clear n u in
+            let s = { s with s_variant_ = Sv_close_rx (Some u) } in
+            store (S_online s) >>= loop
+        end
+        else begin
+            assert (N32_set.empty u);
+            c0Tx#put C.C_tx_release >>= loop
+        end
+
+    (*--- Process session event: Up_final ---*)
+    and do_upRx_final_ s n =
+        if n <> Int32.zero then begin
+            begin
+                match Cf_deque.B.pop s.s_rxQ_ with
+                | Some (C0_rx_close (n', srvTx), tl) when n = n' ->
+                    srvTx#put (`RPY (E_reply.Ok None)) >>= fun () ->
+                    Cf_cmonad.return tl
+                | _ ->
+                    assert (not true);
+                    Cf_cmonad.return s.s_rxQ_
+            end >>= fun rxQ ->
+            assert (N32_map.member n s.s_cMap_);
+            let s = {
+                s with
+                s_rxQ_ = rxQ;
+                s_cMap_ = N32_map.delete n s.s_cMap_;
+            } in
+            store (S_online s) >>= loop
+        end
+        else
+            mapTx#put M_tx_final >>= loop
+
+    (*--- Process session event: Up_failed ---*)
+    and do_upRx_failed_ s n x =
+        let s = { s with s_cMap_ = N32_map.delete n s.s_cMap_ } in
+        store (S_online s) >>= fun () ->
+        do_abort_channels_ () >>= fun () ->
+        mapTx#put M_tx_abort >>= fun () ->
+        ctrlTx#put (C_rx_failure x)
+        (* done processing *)
+
+    (*--- Process session event: Up_reset ---*)
+    and do_upRx_reset_ s n =
+        assert (match s.s_variant_ with Sv_tuning _ -> true | _ -> false);
+        let s = { s with s_cMap_ = N32_map.delete n s.s_cMap_ } in
+        store (S_online s) >>= fun () ->
+        do_abort_channels_ () >>= fun () ->
+        mapTx#put M_tx_unlink >>= loop
+
+    (*--- Process management channel event ---*)
+    and do_c0Rx_ event =
+        load >>= function
+        | S_online s ->
+            begin
+                match event with
+                | C.C_rx_close _
+                | C.C_rx_error _
+                | C.C_rx_release
+                | C.C_rx_abort ->
+                    assert (not true);
+                    Cf_cmonad.return ()
+                | C.C_rx_exchange exch ->
+                    duplex >>= fun (srvIO, (srvRx, srvTx)) ->
+                    let srv = new union_service_exchange srvIO in
+                    srv#link exch >>= fun () ->
+                    let crx = C0_rx_union (srvRx, srvTx) in
+                    let rxQ = Cf_deque.A.push crx s.s_rxQ_ in
+                    store (S_online { s with s_rxQ_ = rxQ }) >>= loop
+            end
+        | _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+
+    (*--- Process control event ---*)
+    and do_ctrlRx_ = function
+        | C_tx_greeting (greetingMsg, srvList) ->
+            do_ctrlRx_greeting_ greetingMsg srvList
+        | C_tx_error errorMsg ->
+            do_ctrlRx_error_ errorMsg
+        | C_tx_start client ->
+            do_ctrlRx_start_ client
+        | C_tx_close closeMsg ->
+            do_ctrlRx_close_ closeMsg
+        | C_tx_release ->
+            do_ctrlRx_release_ ()
+        | C_tx_abort ->
+            do_ctrlRx_abort_ ()
+    
+    (*--- Process control event: C_tx_greeting ---*)
+    and do_ctrlRx_greeting_ greetingMsg srvList =
+        load >>= function
+        | S_greeting (dnTx, rxState, S0_tx0 srvTx) ->
+            srvTx#put (`Greeting greetingMsg) >>= fun () ->
+            store begin
+                match rxState with
+                | S0_rx1 loc -> online_ dnTx role loc srvList
+                | _ -> S_greeting (dnTx, rxState, S0_tx1 srvList)
+            end >>= loop
+        | _ ->
+            do_control_error_ ()
+    
+    (*--- Process control event: C_tx_error ---*)
+    and do_ctrlRx_error_ errorMsg =
+        load >>= function
+        | S_greeting (dnTx, rxState, S0_tx0 srvTx) ->
+            srvTx#put (`Decline errorMsg) >>= fun () ->
+            store (S_greeting (dnTx, rxState, S0_txfin)) >>= loop
+        | S_online ({ s_variant_ = Sv_close_rx None } as s) ->
+            begin
+                match Cf_deque.B.pop s.s_rxQ_ with
+                | Some (C0_rx_close (n, srvTx), tl) ->
+                    assert (n = Int32.zero);
+                    srvTx#put (`ERR errorMsg) >>= fun () ->
+                    Cf_cmonad.return tl
+                | _ ->
+                    assert (not true);
+                    Cf_cmonad.return s.s_rxQ_
+            end >>= fun rxQ ->
+            let s = { s with s_rxQ_ = rxQ } in
+            store (S_online s) >>= loop
+        | _ ->
+            do_control_error_ ()
+    
+    (*--- Process control event: C_tx_start ---*)
+    and do_ctrlRx_start_ client =
+        let client = (client :> C.client) in
+        load >>= function
+        | S_online ({ s_variant_ = Sv_ready } as s) ->
+            simplex >>= fun (startRx, startTx) ->
+            let startRx = (startRx :> C.M_start.t rx) in
+            let startTx = (startTx :> C.M_start.t tx) in
+            simplex >>= fun (connectRx, connectTx) ->
+            let connectRx = (connectRx :> C.client_start_t rx) in
+            let connectTx = (connectTx :> C.client_start_t tx) in
+            client#connect startTx connectRx >>= fun () ->
+            let op = C0_tx_start0 (startRx, connectTx) in
+            let txQ = Cf_deque.A.push op s.s_txQ_ in
+            let s = { s with s_txQ_ = txQ } in
+            store (S_online s) >>= loop
+        | _ ->
+            do_control_error_ ()
+    
+    (*--- Process control event: C_tx_close ---*)
+    and do_ctrlRx_close_ closeMsg =
+        load >>= function
+        | S_online ({ s_variant_ = Sv_ready } as s) ->
+            duplex >>= fun ((closeRpyRx, cancelTx), exchIO) ->
+            let closeAux = {
+                M_close.number = Int32.zero;
+                M_close.code = closeMsg.C.M_close.code;
+                M_close.lang = closeMsg.C.M_close.lang;
+                M_close.content = closeMsg.C.M_close.content;
+            } in
+            let exchObj = new close_client_exchange exchIO closeAux in
+            exchObj#link >>= fun exch ->
+            do_release_channels_ s >>= fun u ->
+            begin
+                if N32_set.empty u then begin
+                    c0Tx#put (C.C_tx_exchange exch) >>= fun () ->
+                    Cf_cmonad.return None
+                end
+                else
+                    Cf_cmonad.return (Some exch)
+            end >>= fun exchOpt ->
+            let txOp = C0_tx_close_c0 (closeRpyRx, cancelTx, exchOpt) in
+            let txQ = Cf_deque.A.push txOp s.s_txQ_ in
+            let s = { s with s_txQ_ = txQ; s_variant_ = Sv_close_tx u } in
+            store (S_online s) >>= loop
+        | _ ->
+            do_control_error_ ()
+
+    (*--- Process control event: C_tx_release ---*)
+    and do_ctrlRx_release_ () =
+        load >>= function
+        | S_online ({ s_variant_ = Sv_close_rx None } as s) ->
+            do_release_channels_ s >>= fun u ->
+            let s = { s with s_variant_ = Sv_close_rx (Some u) } in
+            store (S_online s) >>= fun () ->
+            if N32_set.empty u then begin
+                let srvTx =
+                    match Cf_deque.B.pop s.s_rxQ_ with
+                    | Some (C0_rx_close (n, srvTx), tl) ->
+                        assert (n = Int32.zero);
+                        assert (Cf_deque.empty tl);
+                        srvTx
+                    | _ ->
+                        assert (not true);
+                        new tx null
+                in
+                let sync = upTx#put (Up_sync Int32.zero) in
+                srvTx#put (`RPY (E_reply.Ok (Some sync))) >>= loop
+            end
+            else
+                loop ()
+        | _ ->
+            do_control_error_ ()
+
+    (*--- Process control event: C_tx_abort ---*)
+    and do_ctrlRx_abort_ () =
+        do_abort_channels_ () >>= fun () ->
+        mapTx#put M_tx_abort
+        (* done processing *)
+
+    (*--- Get initial service message ---*)
+    and get_initMsgRx_ dnTx txState initMsgRx =
+        let initMsgRx = (initMsgRx :> 'initMsg rx) in
+        initMsgRx#get begin function
+            | `Fail x ->
+                do_abort_channels_ () >>= fun () ->
+                ctrlTx#put (C_rx_failure x)
+                (* done processing *)
+            | `Decline errorMsg ->
+                do_abort_channels_ () >>= fun () ->
+                store S_final >>= fun () ->
+                ctrlTx#put (C_rx_error errorMsg) >>= loop
+            | `Greeting greetingMsg ->
+                let loc = p.p_newLocale_ greetingMsg.M_greeting.localize in
+                let ctrlPut = ctrlTx#put (C_rx_greeting greetingMsg) in
+                begin
+                    match txState with
+                    | S0_tx0 _ ->
+                        store (S_greeting (dnTx, S0_rx1 loc, txState))
+                            >>= fun () ->
+                        ctrlPut
+                    | S0_tx1 srvList ->
+                        store (online_ dnTx role loc srvList) >>= fun () ->
+                        ctrlPut
+                    | S0_txfin ->
+                        store S_final
+                end >>= loop
+        end
+
+    (*--- Get response from service ---*)
+    and get_c0SrvRspRx_ s n srvTx srvRspRx =
+        let srvRspRx = (srvRspRx :> C.service_start_t rx) in
+        let srvTx = (srvTx :> 'c0srvtx tx) in
+        srvRspRx#get begin function
+            | C.SS_profile (profileMsg, obj) ->
+                mapTx#put (M_tx_start n) >>= fun () ->
+                let rxOp = C0_rx_start1 (n, srvTx, profileMsg, obj) in
+                let rxQ = Cf_deque.B.push rxOp s.s_rxQ_ in
+                let sv = if obj#tuning then Sv_tuning n else Sv_ready in
+                let s = { s with s_rxQ_ = rxQ; s_variant_ = sv; } in
+                store (S_online s) >>= loop
+           | C.SS_error errorMsg ->
+                srvTx#put (`ERR errorMsg) >>= fun () ->
+                store (S_online s) >>= loop
+        end
+
+    (*--- Get message for management channel ---*)
+    and get_c0SrvRx_ s srvRx srvTx =
+        let srvRx = (srvRx :> 'c0srvrx rx) in
+        let srvTx = (srvTx :> 'c0srvtx tx) in
+        srvRx#get begin fun v ->
+            match v with
+            | Cf_exnopt.U (E_request.Start startAuxMsg) ->
+                do_c0SrvRx_start_ s srvTx startAuxMsg
+            | Cf_exnopt.U (E_request.Close closeAuxMsg) ->
+                do_c0SrvRx_close_ s srvTx closeAuxMsg
+            | Cf_exnopt.X x ->
+                let code = Err.Std_code.general_syntax_error in
+                srvTx#put (cons_errorMsg_ s code) >>= fun () ->
+                store (S_online s) >>= loop
+        end
+    
+    (*--- Do channel management service request: E_request.Start ---*)
+    and do_c0SrvRx_start_ s srvTx startAuxMsg =
+        let srvTx = (srvTx :> 'c0srvtx tx) in
+        let n = startAuxMsg.M_start.number in
+        if n > Int32.zero && N32_map.member n s.s_cMap_ then begin
+            let code = Err.Std_code.parameter_syntax_error in
+            srvTx#put (cons_errorMsg_ s code) >>= fun () ->
+            store (S_online s) >>= loop
+        end
+        else if s.s_variant_ <> Sv_ready then begin
+            let code = Err.Std_code.channel_request_declined in
+            srvTx#put (cons_errorMsg_ s code) >>= fun () ->
+            store (S_online s) >>= loop
+        end
+        else begin
+            let startMsg = {
+                C.M_start.serverName = startAuxMsg.M_start.serverName;
+                C.M_start.profiles = startAuxMsg.M_start.profiles;
+            } in
+            let rec foldF = function
+                | [] ->
+                    let code = Err.Std_code.channel_request_declined in
+                    srvTx#put (cons_errorMsg_ s code) >>= fun () ->
+                    Cf_cmonad.return s
+                | hd :: tl ->
+                    let hd = (hd :> C.service) in
+                    hd#accept startMsg >>= function
+                    | None ->
+                        foldF tl
+                    | Some srvRspRx ->
+                        let rxOp = C0_rx_start0 (n, srvTx, srvRspRx) in
+                        let rxQ = Cf_deque.B.push rxOp s.s_rxQ_ in
+                        Cf_cmonad.return { s with s_rxQ_ = rxQ }
+            in
+            foldF s.s_srvList_ >>= fun s ->
+            store (S_online s) >>= loop
+        end
+    
+    (*--- Do channel management service request: E_request.Close ---*)
+    and do_c0SrvRx_close_ s srvTx closeAuxMsg =
+        let n = closeAuxMsg.M_close.number in
+        let closeMsg = {
+            C.M_close.code = closeAuxMsg.M_close.code;
+            C.M_close.lang = closeAuxMsg.M_close.lang;
+            C.M_close.content = closeAuxMsg.M_close.content;
+        } in
+        if n > Int32.zero then
+            do_c0SrvRx_close_aux_ s srvTx n closeMsg
+        else
+            do_c0SrvRx_close_c0_ s srvTx closeMsg
+    
+    (*--- Do channel management service request: E_request.Close n=0 ---*)
+    and do_c0SrvRx_close_c0_ s srvTx closeMsg =
+        let srvTx = (srvTx :> 'c0srvtx tx) in
+        match s.s_variant_ with
+        | Sv_close_tx _ ->
+            srvTx#put (`RPY (E_reply.Ok None)) >>= loop
+        | Sv_close_rx _
+        | Sv_unlinking ->
+            assert (not true);
+            Cf_cmonad.return ()
+        | Sv_tuning _ ->
+            srvTx#put (cons_errorMsg_ s 550) >>= loop
+        | Sv_ready ->
+            ctrlTx#put (C_rx_close closeMsg) >>= fun () ->
+            let rxOp = C0_rx_close (Int32.zero, srvTx) in
+            let rxQ = Cf_deque.A.push rxOp s.s_rxQ_ in
+            let s = { s with s_variant_ = Sv_close_rx None; s_rxQ_ = rxQ } in
+            store (S_online s) >>= loop
+
+    (*--- Do channel management service request: E_request.Close n>0 ---*)