Commits

Anonymous committed 905f3a0

Initial submit of OCaml NAE BEEP Core.

Comments (0)

Files changed (1)

beep/beep_tcp_mapping.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  beep_tcp_mapping.ml
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(*
+module type X_tag = sig
+    val tag: string
+end
+
+module X_create(T: X_tag) = struct
+    let tag = Printf.sprintf "[%s] " T.tag
+
+    let printf fmt =
+        print_string tag;
+        Printf.printf fmt
+    
+    let sprintf (fmt : ('a, unit, string) format) =
+        Printf.sprintf (Obj.magic (tag ^ (Obj.magic fmt)))
+end
+
+module X = X_create(struct let tag = "Beep_tcp_mapping" end)
+*)
+
+module Err = Beep_error
+module F = Beep_frame
+module F_tcp = Beep_tcp_frame
+module U = Beep_transport
+module IO = Iom_sock_stream
+
+open Iom_reactor
+open Cf_cmonad.Op
+
+class config =
+    object
+        inherit Beep_transport.config
+        
+        method stream_max_window = 4096
+        method max_payload_size = 1000
+        method min_fragment_size = 1000
+    end
+
+type parameters_t = {
+    p_streamMaxWindow: int;
+    p_minSeqWindow_: int32;
+    p_maxPayloadSize: int;
+    p_minFragmentSize: int;
+}
+
+let parameters_ cfg =
+    let cfg = (cfg :> config) in
+    let maxWindow = cfg#stream_max_window in {
+        p_streamMaxWindow = maxWindow;
+        p_minSeqWindow_ = Int32.of_int (maxWindow / 2);
+        p_maxPayloadSize = cfg#max_payload_size;
+        p_minFragmentSize = cfg#min_fragment_size;
+    }
+
+module N32_map = Cf_rbtree.Map(Int32)
+
+type dn_event_t =
+    | Dn_frame of F.fhandle_t
+    | Dn_seq of F_tcp.seq_header_t
+    | Dn_final of int32
+
+type up_event_t =
+    | Up_frame of F.t * int
+    | Up_seq of F_tcp.seq_header_t
+    | Up_final
+
+type channel_t = {
+    c_rx_limit_: int32;
+    c_rx_ackno_: int32;
+    c_tx_limit_: int32;
+    c_tx_fhQ_: F.fhandle_t Cf_deque.t;
+}
+
+let channel0_ = {
+    c_rx_limit_ = 4096l;
+    c_rx_ackno_ = Int32.zero;
+    c_tx_limit_ = 4096l;
+    c_tx_fhQ_ = Cf_deque.nil;
+}
+
+type ack_t = {
+    ack_ackno_: int32;
+    ack_size_: int;
+}
+
+(*
+let chCounter_ = ref 0
+*)
+
+let channel_:
+    parameters_t -> int32 -> (U.frame_tx_t #rx * U.frame_rx_t #tx) ->
+    (up_event_t #rx * dn_event_t #tx) -> ('s, unit) t
+= fun p n (frameRx, frameTx) (upRx, dnTx) ->
+    (*
+    incr chCounter_;
+    let module X_input = struct
+        let tag = Printf.sprintf "Beep_tcp_mapping.channel_ (%u) n=%lu"
+            !chCounter_ n
+    end in
+    let module X = X_create(X_input) in
+    *)
+
+    (*--- Normalize parameters ---*)
+    let frameRx = (frameRx :> U.frame_tx_t rx) in
+    let frameTx = (frameTx :> U.frame_rx_t tx) in
+    let upRx = (upRx :> up_event_t rx) in
+    let dnTx = (dnTx :> dn_event_t tx) in
+    
+    (*--- Create acknowledgement wire ---*)
+    simplex >>= fun (ackRx, ackTx) ->
+    let ackRx = (ackRx :> ack_t rx) in
+    let ackTx = (ackTx :> ack_t tx) in
+    
+    (*--- Main loop ---*)
+    let rec loop () =
+        guard begin
+            frameRx#get do_frameRx_ >>= fun () ->
+            upRx#get do_upRx_ >>= fun () ->
+            ackRx#get do_ackRx_
+        end
+    
+    (*--- Process transmit event ---*)    
+    and do_frameRx_ = function
+        | U.F_tx_data fh -> do_frameRx_data_ fh
+        | U.F_tx_final -> do_frameRx_final_ ()
+        | U.F_tx_abort -> do_frameRx_abort_ ()
+    
+    (*--- Process transmit event: U.F_tx_data ---*)    
+    and do_frameRx_data_ fh =
+        load >>= fun c ->
+        let frame = fh.F.h_data in
+        assert (let header = F.header frame in header.F.channel = n);
+        let fhQ = c.c_tx_fhQ_ in
+        let fhQ =
+            match Cf_deque.A.pop fhQ with
+            | None -> Cf_deque.A.push fh Cf_deque.nil
+            | Some (hd, tl) ->
+                match F.join_fhandle hd fh with
+                | None -> Cf_deque.A.push fh fhQ
+                | Some fh -> Cf_deque.A.push fh tl
+        in
+        do_drain_fhQ_ c.c_tx_limit_ fhQ >>= fun fhQ ->
+        store { c with c_tx_fhQ_ = fhQ } >>= loop
+    
+    (*--- Put a data frame to the main reactor ---*)
+    and do_drain_fhQ_ limit fhQ =
+        match Cf_deque.B.pop fhQ with
+        | None ->
+            Cf_cmonad.return Cf_deque.nil
+        | Some (hd, tl) ->
+            let frame = hd.F.h_data in
+            let size = F.size frame in
+            if F.eom frame || size >= p.p_minFragmentSize then begin
+                let header = F.header frame in
+                let window = Int32.sub limit header.F.seqno in
+                let maxsize =
+                    if window > Int32.of_int p.p_maxPayloadSize then
+                        p.p_maxPayloadSize
+                    else
+                        size
+                in
+                match F.split maxsize frame with
+                | None ->
+                    dnTx#put (Dn_frame hd) >>= fun () ->
+                    do_drain_fhQ_ limit tl
+                | Some (f1, f2) ->
+                    let fh1 = { F.h_data = f1; F.h_ack = None } in
+                    let fh2 = { F.h_data = f2; F.h_ack = hd.F.h_ack } in
+                    let fhQ = Cf_deque.B.push fh2 tl in
+                    dnTx#put (Dn_frame fh1) >>= fun () ->
+                    do_drain_fhQ_ limit fhQ
+            end
+            else
+                Cf_cmonad.return fhQ
+    
+    (*--- Process transmit event: U.F_tx_final ---*)    
+    and do_frameRx_final_ () =
+        load >>= fun c ->
+        assert (Cf_deque.empty c.c_tx_fhQ_);
+        dnTx#put (Dn_final n) >>= loop
+
+    (*--- Process transmit event: U.F_tx_abort ---*)    
+    and do_frameRx_abort_ () =
+        Cf_cmonad.return ()
+        (* done processing *)
+
+    (*--- Process receive event ---*)    
+    and do_upRx_ = function
+        | Up_frame (frame, size) -> do_upRx_frame_ frame size
+        | Up_seq seq -> do_upRx_seq_ seq
+        | Up_final -> do_upRx_final_ ()
+
+    (*--- Process receive event: Up_frame ---*)    
+    and do_upRx_frame_ frame size =
+        assert (size = F.size frame);
+        let header = F.header frame in
+        assert (header.F.channel = n);
+        let ackno = Int32.add header.F.seqno (Int32.of_int size) in
+        let ack = Some (ackTx#put { ack_ackno_ = ackno; ack_size_ = size }) in
+        let fh = { F.h_data = frame; F.h_ack = ack } in
+        frameTx#put (U.F_rx_data fh) >>= loop
+
+    (*--- Process receive event: Up_seq ---*)    
+    and do_upRx_seq_ seq =
+        assert (seq.F_tcp.seq_channel = n);
+        load >>= fun c ->
+        let limit = Int32.add seq.F_tcp.seq_ackno seq.F_tcp.seq_window in
+        let dlim = Int32.sub limit c.c_tx_limit_ in
+        if dlim > Int32.zero then begin
+            do_drain_fhQ_ limit c.c_tx_fhQ_ >>= fun fhQ ->
+            store { c with c_tx_limit_ = limit; c_tx_fhQ_ = fhQ } >>= loop
+        end
+        else
+            store { c with c_tx_limit_ = limit } >>= loop
+    
+    (*--- Process receive event: Up_final ---*)
+    and do_upRx_final_ () =
+        frameTx#put U.F_rx_final
+        (* done processing *)
+
+    (*--- Process frame acknowledgement ---*)
+    and do_ackRx_ ack =
+        load >>= fun c ->
+        let { ack_ackno_ = ackno; ack_size_ = size } = ack in
+        let dack = Int32.sub ackno c.c_rx_ackno_ in
+        let ackno = if dack < Int32.zero then c.c_rx_ackno_ else ackno in
+        let limit = Int32.add c.c_rx_limit_ (Int32.of_int size) in
+        let window = Int32.sub limit ackno in
+        store { c with c_rx_limit_ = limit; c_rx_ackno_ = ackno } >>= fun () ->
+        if dack > Int32.zero && window >= p.p_minSeqWindow_ then begin
+            let seq = {
+                F_tcp.seq_channel = n;
+                F_tcp.seq_ackno = ackno;
+                F_tcp.seq_window = window;
+            } in
+            dnTx#put (Dn_seq seq) >>= loop
+        end
+        else
+            loop ()
+    in
+    
+    (*--- Start main loop ---*)
+    start (loop ()) channel0_
+
+type s_tx_variant_t =
+    | S_ready
+    | S_pending
+    | S_blocked
+    | S_draining
+    | S_unlinking
+
+type s_rx_link_t = {
+    rl_upTx_: up_event_t tx;
+    rl_window_: int32;
+    rl_final_: bool;
+}
+
+let rlInit_ = {
+    rl_upTx_ = new tx null;
+    rl_window_ = 4096l;
+    rl_final_ = false;
+}
+
+type state_t = {
+    s_rxBuffer_: Cf_message.t;
+    s_rlMap_: s_rx_link_t N32_map.t;
+    s_seqHdrMap_: F_tcp.seq_header_t N32_map.t;
+    s_fhQMap_: F.fhandle_t Cf_deque.t N32_map.t;
+    s_txVariant_: s_tx_variant_t;
+}
+
+let state0_ = {
+    s_rxBuffer_ = [];
+    s_rlMap_ = N32_map.nil;
+    s_seqHdrMap_ = N32_map.nil;
+    s_fhQMap_ = N32_map.nil;
+    s_txVariant_ = S_ready;
+}
+
+let header_check_ ~cmap ~chan ~size =
+    try
+        let rl = N32_map.search chan cmap in
+        if rl.rl_window_ < Int32.of_int size then
+            raise (Beep_error.X Beep_error.X_window_exceeded)
+    with
+    | Not_found ->
+        raise (Beep_error.X Beep_error.X_channel_closed)
+
+(*
+let sCounter_ = ref 0
+*)
+
+let session:
+    ?cfg:#config ->
+    (Beep_transport.transport_tx_t #rx * Beep_transport.transport_rx_t #tx) ->
+    (Iom_sock_stream.endpoint_rx_t #rx * Iom_sock_stream.endpoint_tx_t #tx) ->
+    kernel_t -> ('s, unit) t
+= fun ?cfg (mapRx, mapTx) (fdRx, fdTx) k ->
+    (*
+    incr sCounter_;
+    let module X_input = struct
+        let tag = Printf.sprintf "Beep_tcp_mapping.session %u" !sCounter_
+    end in
+    let module X = X_create(X_input) in
+    *)
+
+    (*--- Normalize parameters ---*)
+    let p =
+        match cfg with
+        | None -> parameters_ (new config)
+        | Some cfg -> parameters_ cfg
+    in
+    let mapRx = (mapRx :> U.transport_tx_t rx) in
+    let mapTx = (mapTx :> U.transport_rx_t tx) in
+    let fdRx = (fdRx :> IO.endpoint_rx_t rx) in
+    let fdTx = (fdTx :> IO.endpoint_tx_t tx) in
+    
+    (*--- Create mux down wire ---*)
+    simplex >>= fun (dnRx, dnTx) ->
+    let dnRx = (dnRx :> dn_event_t rx) in
+    let dnTx = (dnTx :> dn_event_t tx) in
+
+    (*--- Create idle reactor ---*)
+    duplex >>= fun ((idleRx, idleTx), (idleRx', idleTx')) ->
+    let idleRx = (idleRx :> e_idle_t rx) in
+    let idleTx = (idleTx :> control_t tx) in
+    idle ~c:idleRx' ~r:idleTx' k >>= fun () ->
+    
+    (*--- Main loop ---*)
+    let rec loop () =
+        load >>= fun s ->
+        guard begin
+            mapRx#get do_mapRx_ >>= fun () ->
+            fdRx#get do_fdRx_ >>= fun () ->
+            match s.s_txVariant_ with
+            | S_ready ->
+                dnRx#get do_dnRx_
+            | S_draining
+            | S_unlinking
+            | S_pending ->
+                dnRx#get do_dnRx_ >>= fun () ->
+                idleRx#get do_idleRx_
+            | S_blocked ->
+                Cf_cmonad.return ()
+        end
+    
+    (*--- Process BEEP transport mapping event ---*)
+    and do_mapRx_ = function
+        | U.M_tx_start n -> do_mapRx_start_ n
+        | U.M_tx_final -> do_mapRx_final_ ()
+        | U.M_tx_unlink -> do_mapRx_unlink_ ()
+        | U.M_tx_abort -> do_mapRx_abort_ ()
+    
+    (*--- Process BEEP transport mapping event: M_tx_start ---*)
+    and do_mapRx_start_ n =
+        duplex >>= fun (frameIO, frameIO') ->
+        simplex >>= fun (upRx, upTx) ->
+        let muxIO = upRx, dnTx in
+        channel_ p n frameIO muxIO >>= fun () ->
+        modify begin fun s ->
+            assert (not (N32_map.member n s.s_rlMap_));
+            let rl = { rlInit_ with rl_upTx_ = upTx } in
+            { s with s_rlMap_ = N32_map.replace (n, rl) s.s_rlMap_ }
+        end >>= fun () ->
+        mapTx#put (U.M_rx_start (n, frameIO')) >>= loop
+    
+    (*--- Process BEEP transport mapping event: M_tx_final ---*)
+    and do_mapRx_final_ () =
+        load >>= fun s ->
+        assert (N32_map.empty s.s_rlMap_);
+        match s.s_txVariant_ with
+        | S_ready ->
+            assert (N32_map.empty s.s_seqHdrMap_);
+            assert (N32_map.empty s.s_fhQMap_);
+            store { s with s_txVariant_ = S_draining } >>= fun () ->
+            fdTx#put IO.IO_tx_release >>= loop
+        | S_blocked
+        | S_pending ->
+            store {
+                s with
+                s_txVariant_ = S_draining;
+                s_seqHdrMap_ = N32_map.nil;
+            } >>= loop
+        | S_draining
+        | S_unlinking->
+            assert (not true);
+            Cf_cmonad.return ()
+    
+    (*--- Process BEEP transport mapping event: M_tx_unlink ---*)
+    and do_mapRx_unlink_ () =
+        load >>= fun s ->
+        assert (N32_map.empty s.s_rlMap_);
+        match s.s_txVariant_ with
+        | S_ready ->
+            assert (N32_map.empty s.s_seqHdrMap_);
+            assert (N32_map.empty s.s_fhQMap_);
+            fdTx#put IO.IO_tx_unlink >>= loop
+            (* done processing *)
+        | S_blocked
+        | S_pending ->
+            store {
+                s with
+                s_txVariant_ = S_unlinking;
+                s_seqHdrMap_ = N32_map.nil;
+            } >>= loop
+        | S_draining
+        | S_unlinking ->
+            assert (not true);
+            Cf_cmonad.return ()
+    
+    (*--- Process BEEP transport mapping event: M_tx_abort ---*)
+    and do_mapRx_abort_ () =
+        fdTx#put IO.IO_tx_reset
+        (* done processing *)
+    
+    (*--- Process SOCK_STREAM I/O event ---*)
+    and do_fdRx_ event =
+        load >>= fun s ->
+        match event with
+        | IO.IO_rx_error x -> do_fdRx_error_ s x
+        | IO.IO_rx_data data -> do_fdRx_data_ s data
+        | IO.IO_rx_release -> do_fdRx_release_ s
+        | IO.IO_rx_blocked -> do_fdRx_blocked_ s
+        | IO.IO_rx_ready -> do_fdRx_ready_ s
+        | IO.IO_rx_closed -> do_fdRx_closed_ s
+        | IO.IO_rx_unlinked -> do_fdRx_unlinked_ s
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_error ---*)
+    and do_fdRx_error_ s x =
+        mapTx#put (U.M_rx_failure x)
+        (* done processing *)
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_data ---*)
+    and do_fdRx_data_ s data =
+        let len = Cf_message.length data in
+        fdTx#put (IO.IO_tx_extend len) >>= fun () ->
+        let data = s.s_rxBuffer_ @ data in
+        let checkF = header_check_ ~cmap:s.s_rlMap_ in
+        match
+            try Cf_exnopt.U (F_tcp.parse#parselist ~check:checkF data)
+            with e -> Cf_exnopt.X e
+        with
+        | Cf_exnopt.X (Beep_error.X e) ->
+            do_beep_error_ e
+        | Cf_exnopt.X _ ->
+            do_beep_error_ Beep_error.X_bad_frame_syntax
+        | Cf_exnopt.U (flist, data) ->
+            do_fdRx_data_aux_ s.s_rlMap_ flist >>= function
+            | Cf_either.A rlMap ->
+                store {
+                    s with
+                    s_rxBuffer_ = data;
+                    s_rlMap_ = rlMap;
+                } >>= loop
+            | Cf_either.B x ->
+                do_beep_error_ x
+    
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_data (each frame) ---*)
+    and do_fdRx_data_aux_ rlMap = function
+        | [] ->
+            Cf_cmonad.return (Cf_either.A rlMap)
+        | hd :: tl ->
+            (*
+            X.printf "(RX) %s\n" (F_tcp.header_to_string hd);
+            flush stdout;
+            *)
+            match hd with
+            | `SEQ seq ->
+                do_fdRx_data_auxSeq_ rlMap seq tl >>= begin function
+                    | Cf_either.A rlMap ->
+                        do_fdRx_data_aux_ rlMap tl
+                    | v ->
+                        Cf_cmonad.return v
+                end
+            | #F.t as frame ->
+                let frame = (frame :> F.t) in
+                do_fdRx_data_auxData_ rlMap frame tl >>= begin function
+                    | Cf_either.A rlMap ->
+                        do_fdRx_data_aux_ rlMap tl
+                    | v ->
+                        Cf_cmonad.return v
+                end
+    
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_data (each `SEQ frame) ---*)
+    and do_fdRx_data_auxSeq_ rlMap seq tl =
+        let n = seq.F_tcp.seq_channel in
+        match
+            try Some (N32_map.search n rlMap) with Not_found -> None
+        with
+        | Some { rl_upTx_ = upTx; rl_final_ = false } ->
+            upTx#put (Up_seq seq) >>= fun () ->
+            Cf_cmonad.return (Cf_either.A rlMap)
+        | _ ->
+            Cf_cmonad.return (Cf_either.B Err.X_channel_closed)
+    
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_data (each data frame) ---*)
+    and do_fdRx_data_auxData_ rlMap frame tl =
+        let header = F.header frame in
+        let n = header.F.channel in
+        match
+            try Some (N32_map.search n rlMap) with Not_found -> None
+        with
+        | Some ({ rl_upTx_ = upTx; rl_final_ = false } as rl) ->
+            let size = F.size frame in
+            upTx#put (Up_frame (frame, size)) >>= fun () ->
+            let window = Int32.sub rl.rl_window_ (Int32.of_int size) in
+            assert (window >= Int32.zero);
+            let rl = { rl with rl_window_ = window } in
+            let rlMap = N32_map.replace (n, rl) rlMap in
+            Cf_cmonad.return (Cf_either.A rlMap)
+        | _ ->
+            Cf_cmonad.return (Cf_either.B Err.X_channel_closed)
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_release ---*)
+    and do_fdRx_release_ s =
+        if s.s_txVariant_ = S_draining then begin
+            assert (N32_map.empty s.s_rlMap_);
+            loop ()
+        end
+        else begin
+            do_beep_error_ Err.X_transport_error
+        end
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_blocked ---*)
+    and do_fdRx_blocked_ s =
+        assert (s.s_txVariant_ = S_ready);
+        store { s with s_txVariant_ = S_blocked } >>= fun () ->
+        idleTx#put C_unload >>= loop
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_ready ---*)
+    and do_fdRx_ready_ s =
+        assert (s.s_txVariant_ = S_blocked);
+        store { s with s_txVariant_ = S_ready } >>= loop
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_closed ---*)
+    and do_fdRx_closed_ s =
+        mapTx#put U.M_rx_final
+        (* done processing *)
+
+    (*--- Process SOCK_STREAM I/O event: IO.IO_rx_unlinked ---*)
+    and do_fdRx_unlinked_ s =
+        mapTx#put U.M_rx_unlink
+        (* done processing *)
+    
+    (*--- Process down mux event ---*)
+    and do_dnRx_ event =
+        load >>= fun s ->
+        assert (s.s_txVariant_ <> S_blocked);
+        match event with
+        | Dn_frame fh -> do_dnRx_frame_ s fh
+        | Dn_seq seq -> do_dnRx_seq_ s seq
+        | Dn_final n -> do_dnRx_final_ s n
+    
+    (*--- Process down mux event: Dn_frame ---*)
+    and do_dnRx_frame_ s fh =
+        let frame = fh.F.h_data in
+        let header = F.header frame in
+        let channo = header.F.channel in
+        assert (N32_map.member channo s.s_rlMap_);
+        let fhQ =
+            try
+                let fhQ = N32_map.search channo s.s_fhQMap_ in
+                assert (not (Cf_deque.empty fhQ));
+                fhQ
+            with
+            | Not_found ->
+                Cf_deque.nil
+        in
+        let fhQ = Cf_deque.A.push fh fhQ in
+        let fhQMap = N32_map.replace (channo, fhQ) s.s_fhQMap_ in
+        match s.s_txVariant_ with
+        | S_ready ->
+            store {
+                s with
+                s_fhQMap_ = fhQMap;
+                s_txVariant_ = S_pending;
+            } >>= fun () ->
+            idleTx#put C_load >>= loop
+        | v ->
+            store { s with s_fhQMap_ = fhQMap } >>= loop
+    
+    (*--- Process down mux event: Dn_seq ---*)
+    and do_dnRx_seq_ s seq =
+        let channo = seq.F_tcp.seq_channel in
+        assert (N32_map.member channo s.s_rlMap_);
+        let seqHdrMap = N32_map.replace (channo, seq) s.s_seqHdrMap_ in
+        match s.s_txVariant_ with
+        | S_ready ->
+            store {
+                s with
+                s_seqHdrMap_ = seqHdrMap;
+                s_txVariant_ = S_pending;
+            } >>= fun () ->
+            idleTx#put C_load >>= loop
+        | _ ->
+            store { s with s_seqHdrMap_ = seqHdrMap } >>= loop
+    
+    (*--- Process down mux event: Dn_final ---*)
+    and do_dnRx_final_ s n =
+        match
+            try Some (N32_map.search n s.s_rlMap_)
+            with Not_found -> None
+        with
+        | Some ({ rl_upTx_ = upTx; rl_final_ = false } as rl) ->
+            let seqHdrMap = N32_map.delete n s.s_seqHdrMap_ in
+            begin
+                if N32_map.member n s.s_fhQMap_ then begin
+                    let rl = { rl with rl_final_ = true } in
+                    Cf_cmonad.return (N32_map.replace (n, rl) s.s_rlMap_)
+                end
+                else begin
+                    upTx#put Up_final >>= fun () ->
+                    Cf_cmonad.return (N32_map.delete n s.s_rlMap_)
+                end
+            end >>= fun rlMap ->
+            let s = { s with s_seqHdrMap_ = seqHdrMap; s_rlMap_ = rlMap } in
+            store s >>= loop
+        | _ ->
+            assert (not true);
+            Cf_cmonad.return ()
+    
+    (*--- Process idle reactor event ---*)
+    and do_idleRx_ (`Idle tai) =
+        load >>= fun s ->
+        assert begin
+            match s.s_txVariant_ with
+            | (S_blocked | S_ready) -> false
+            | _ -> true
+        end;
+        let seqHdrMap = s.s_seqHdrMap_ in
+        let fhQMap = s.s_fhQMap_ in
+        let rlMap = s.s_rlMap_ in
+        let z = N32_map.to_seq_incr seqHdrMap in
+        let z = Cf_seq.map do_idleRx_seqHdrMap_ z in
+        Cf_seq.C.sequence z >>= fun () ->
+        begin
+            match
+                try Some (N32_map.extract Int32.zero fhQMap)
+                with Not_found -> None
+            with
+            | None ->
+                Cf_cmonad.return fhQMap
+            | Some (fhQ0, fhQMap) ->
+                let fhQ0Q = Cf_deque.A.push fhQ0 Cf_deque.nil in
+                do_idleRx_fhQQ_ fhQ0Q >>= fun () ->
+                Cf_cmonad.return fhQMap
+        end >>= fun fhQMap ->
+        let z = N32_map.to_seq_incr fhQMap in
+        let foldF fhQQ (_, fhQ) = Cf_deque.A.push fhQ fhQQ in
+        let fhQQ = Cf_seq.fold foldF Cf_deque.nil z in
+        do_idleRx_fhQQ_ fhQQ >>= fun () ->
+        do_idleRx_upFinal_ s rlMap (Cf_seq.first z)
+    
+    (*--- Process idle reactor event: transmit SEQ frame ---*)
+    and do_idleRx_seqHdrMap_ (channo, seq) =
+        let frame = `SEQ seq in
+        (*
+        X.printf "(TX) %s\n" (F_tcp.header_to_string frame);
+        flush stdout;
+        *)
+        fdTx#put (IO.IO_tx_data (F_tcp.emit frame))
+
+    (*--- Process idle reactor event: transmit BEEP data frame ---*)
+    and do_idleRx_fhQQ_ fhQQ =
+        match Cf_deque.B.pop fhQQ with
+        | None ->
+            Cf_cmonad.return ()
+        | Some (fhQ, fhQQ) ->
+            match Cf_deque.B.pop fhQ with
+            | None ->
+                do_idleRx_fhQQ_ fhQQ
+            | Some ({ F.h_data = frame; F.h_ack = ack }, fhQ) ->
+                (*
+                X.printf "(TX) %s\n" (F.header_to_string frame);
+                flush stdout;
+                *)
+                fdTx#put (IO.IO_tx_data (F.emit frame)) >>= fun () ->
+                begin
+                    match ack with
+                    | None -> Cf_cmonad.return ()
+                    | Some ack -> start ack ()
+                end >>= fun () ->
+                do_idleRx_fhQQ_ fhQQ
+    
+    (*--- Process idle reactor event: send Up_final signal ---*)
+    and do_idleRx_upFinal_ s rlMap z =
+        match Lazy.force z with
+        | Cf_seq.Z when s.s_txVariant_ = S_draining ->
+            fdTx#put IO.IO_tx_release >>= fun () ->
+            store {
+                s with
+                s_seqHdrMap_ = N32_map.nil;
+                s_fhQMap_ = N32_map.nil;
+            } >>= loop
+        | Cf_seq.Z ->
+            assert (s.s_txVariant_ = S_pending);
+            store {
+                s with
+                s_rlMap_ = rlMap;
+                s_txVariant_ = S_ready;
+                s_seqHdrMap_ = N32_map.nil;
+                s_fhQMap_ = N32_map.nil;
+            } >>= loop
+        | Cf_seq.P (hd, tl) ->
+            match
+                try Some (N32_map.search hd rlMap)
+                with Not_found -> None
+            with
+            | Some { rl_upTx_ = upTx; rl_final_ = true } ->
+                upTx#put Up_final >>= fun () ->
+                let rlMap = N32_map.delete hd s.s_rlMap_ in
+                do_idleRx_upFinal_ s rlMap tl
+            | _ ->
+                do_idleRx_upFinal_ s rlMap tl
+
+    (*--- Throw BEEP control error: Err.X_control_error ---*)
+    and do_control_error_ () =
+        do_beep_error_ Err.X_control_error
+    
+    (*--- Throw BEEP error ---*)
+    and do_beep_error_ error =
+        (*
+        X.printf "do_beep_error_: %s\n" (Err.to_string error);
+        flush stdout;
+        *)
+        fdTx#put IO.IO_tx_reset >>= fun () ->
+        mapTx#put (U.M_rx_failure (Err.X error))
+        (* done processing *)
+    in
+    
+    (*--- Start main loop ---*)
+    fdTx#put (IO.IO_tx_window p.p_streamMaxWindow) >>= fun () ->
+    start (loop ()) state0_
+
+module type T = sig
+    module S: Cf_sock_stream.T
+    
+    val create:
+        ?cfg:#config -> S.t -> Beep_transport.role_t -> kernel_t ->
+        ('s, Beep_transport.control_rx_t rx * Beep_transport.control_tx_t tx) t
+end
+
+module Create(S: Cf_sock_stream.T) = struct
+    module S = S
+    module IO = IO.Create(S)
+
+    let create ?cfg fd role k =
+        duplex >>= fun (fdA, fdB) ->
+        IO.endpoint ~c:fdB fd k >>= fun () ->
+        duplex >>= fun (mapA, mapB) ->
+        session ?cfg mapA fdA k >>= fun () ->
+        duplex >>= fun (ctrlA, ctrlB) ->
+        U.core ?cfg mapB ctrlA role k >>= fun () ->
+        Cf_cmonad.return ctrlB
+end
+
+(*--- End of File [ beep_tcp_mapping.ml ] ---*)