Commits

Anonymous committed 0ffedc5

New files.

Comments (0)

Files changed (2)

iom/t/t_mirrorc.ml

+(*---------------------------------------------------------------------------*
+  PROGRAM  t_mirrorc.ml
+
+  Copyright (c) 2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+let jout = Cf_journal.stdout
+let _ = jout#setlimit `None
+
+module G = Iom_gadget
+module SR = Iom_sock_stream
+
+open Cf_cmonad.Op
+
+module type Discipline_T = sig
+    val create:
+        SR.endpoint_rx_t #G.rx -> SR.endpoint_rx_t #G.tx -> ('s, unit) G.t
+end
+
+module Discipline: Discipline_T = struct
+    let search_crlf_ =
+        let rec loop n s =
+            match Lazy.force s with
+            | Cf_seq.P ('\r', tl) ->
+                let n = succ n in begin
+                    match Lazy.force tl with
+                    | Cf_seq.P ('\n', _) ->
+                        succ n
+                    | s ->
+                        loop n tl
+                end
+            | Cf_seq.P (_, tl) ->
+                loop (succ n) tl
+            | Cf_seq.Z ->
+                0
+        in
+        fun m ->
+            let s = Cf_message.to_seq m in
+            let pos = loop 0 s in
+            Cf_message.split ~pos m
+
+    module SC = Cf_scmonad
+
+    module type SM = sig
+        type ('s, 'i, 'o, 'a) t = ('s, ('i, 'o) Cf_flow.t, 'a) SC.t
+
+        val read: ('s, 'i, 'o, 'i) t
+        val write: 'o -> ('s, 'i, 'o, unit) t
+        val finish: ('s, 'i, 'o, unit) t
+    end
+
+    module SM: SM = struct
+        type ('s, 'i, 'o, 'a) t = ('s, ('i, 'o) Cf_flow.t, 'a) SC.t
+
+        let read f w = lazy (Cf_flow.Q (fun a -> Lazy.force (f a w)))
+        let write o a b = SC.cont (fun w -> lazy (Cf_flow.P (o, w))) a b
+        let finish a b = SC.init (Lazy.lazy_from_val Cf_flow.Z) a b
+    end
+
+    module C = Cf_cmonad
+    open SC.Op
+
+    let create =
+        let rec loop () =
+            SM.read >>= function
+            | (SR.IO_rx_release | SR.IO_rx_closed as event) ->
+                SC.load >>= begin function
+                | [] ->
+                    SM.write event >>= fun () ->
+                    SM.finish
+                | msg ->
+                    SM.write (SR.IO_rx_data msg) >>= fun () ->
+                    SM.write event >>= fun () ->
+                    SM.finish
+                end
+            | SR.IO_rx_data m1 ->
+                SC.load >>= begin fun m0 ->
+                let m0, m1 = search_crlf_ (m0 @ m1) in
+                match m0 with
+                | [] ->
+                    SC.store m1 >>= loop
+                | _ ->
+                    SM.write (SR.IO_rx_data m0) >>= fun () ->
+                    SC.store m1 >>= loop
+                end
+            | event ->
+                SM.write event
+        in
+        fun rx tx ->
+            let w =
+                let m =
+                    C.Op.( >>= ) (SC.down (loop ()) []) (fun _ -> C.return ())
+                in
+                C.eval m (Lazy.lazy_from_val Cf_flow.Z)
+            in
+            G.wrap rx tx w
+end
+
+module TCP = Cf_tcp4_socket
+
+module Opt = struct
+    open Arg
+    
+    module NI = Cf_nameinfo
+    
+    let host_ = ref "localhost"
+    let port_ = ref None
+    let sessions_ = ref 1
+    let count_ = ref 1
+    
+    let specs_ = Arg.align [
+        "-n", Set_int sessions_, " Number of sessions";
+        "-j", Set_int count_, " Number of messages to send per session";
+        "-h", Set_string host_, " Remote hostname (or IP address)";
+    ]
+    
+    let anon_ p =
+        if !port_ <> None then jout#fail "unexpected anonymous argument";
+        port_ := (Some p)
+    
+    let usage_ =
+        Printf.sprintf "Usage: %s [-n count] [-h host] port"
+            Sys.executable_name
+    
+    let () =
+        parse specs_ anon_ usage_
+    
+    let server =
+        let port =
+            match !port_ with
+            | None ->
+                usage specs_ usage_;
+                jout#fail "remote service is required"
+            | Some p ->
+                p
+        in
+        let rec loop = function
+            | [] ->
+                assert (not true);
+                TCP.P.AF.unspecified
+            | ai :: tl ->
+                match NI.specialize_sockaddr ai.NI.ai_addr TCP.P.AF.domain with
+                | None ->
+                    loop tl
+                | Some addr ->
+                    addr
+        in
+        let ais =
+            try NI.to_address (NI.A_bothnames (!host_, port)) with
+            | NI.Unresolved _ as x ->
+                jout#warn "unresolved remote address";
+                raise x
+        in
+        let addr = TCP.P.AF.of_sockaddr (loop ais) in
+        let host, port = addr in
+        let hostname = Cf_ip4_addr.ntop host in
+        jout#info "server at %s/%u" hostname port;
+        addr
+    
+    let sessions =
+        let n = !sessions_ in
+        if n < 1 then begin
+            usage specs_ usage_;
+            jout#fail "invalid number of sessions"
+        end;
+        let rec loop i =
+            lazy begin
+                if i < n then
+                    Cf_seq.P (i, loop (succ i))
+                else
+                    Cf_seq.Z
+            end
+        in
+        loop 0
+    
+    let count =
+        let n = !count_ in
+        if n < 1 then begin
+            usage specs_ usage_;
+            jout#fail "invalid number of messages to send per session"
+        end;
+        n
+end
+
+module type Main_T = sig
+    val reactor: G.kernel_t -> (unit, unit) G.t
+end
+
+module Main = struct
+    module M_tcp = Iom_sock_stream.Create(TCP)
+    
+    module N_map = Cf_rbtree.Map(Cf_ordered.Int_order)
+    
+    type mux_event_t =
+        | Mux_set of int * unit G.tx
+        | Mux_clr of int
+
+    let interrupt_ k =
+        G.simplex >>= fun (muxRx, muxTx) ->
+        let muxRx = (muxRx :> mux_event_t G.rx) in
+        G.simplex >>= fun (sigRx, sigTx) ->
+        let sigRx = (sigRx :> [ `Signal of int ] G.rx) in
+        G.simplex >>= fun (ctrlRx, ctrlTx) ->
+        let ctrlRx = (ctrlRx :> G.control_t G.rx) in
+        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
+        Iom_reactor.signal ~c:ctrlRx ~r:sigTx ~n:Sys.sigint k >>= fun () ->
+        ctrlTx#put G.C_load >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                sigRx#get do_sigRx_ >>= fun () ->
+                muxRx#get do_muxRx_
+            end
+        and do_sigRx_ (`Signal _) =
+            do_finish_ () >>= fun () ->
+            G.load >>= fun m ->
+            let z = N_map.to_seq_incr m in
+            let z = Cf_seq.second z in
+            let z = Cf_seq.map begin fun termTx ->
+                let termTx = (termTx :> unit G.tx) in
+                termTx#put ()
+            end z in
+            Cf_seq.C.sequence z
+        and do_muxRx_ cmd =
+            G.load >>= fun m ->
+            match cmd with
+            | Mux_set (n, termTx) ->
+                G.store (N_map.replace (n, termTx) m) >>= loop
+            | Mux_clr n ->
+                let m = N_map.delete n m in
+                if N_map.empty m then
+                    do_finish_ ()
+                else
+                    loop ()
+        and do_finish_ () =
+            ctrlTx#put G.C_unload >>= fun () ->
+            ctrlTx#put G.C_final
+        in
+        G.start (loop ()) N_map.nil >>= fun () ->
+        Cf_cmonad.return muxTx
+    
+    let session_ muxTx n sock k =
+        let muxTx = (muxTx :> mux_event_t G.tx) in
+        G.duplex >>= fun ((upRx, dnTx), ctrlIO) ->
+        let upRx = (upRx :> SR.endpoint_rx_t G.rx) in
+        let dnTx = (dnTx :> SR.endpoint_tx_t G.tx) in
+        M_tcp.endpoint ~c:ctrlIO sock k >>= fun () ->
+        G.simplex >>= fun (lineRx, lineTx) ->
+        let lineRx = (lineRx :> SR.endpoint_rx_t G.rx) in
+        Discipline.create upRx lineTx >>= fun () ->
+        let window = 256 in
+        dnTx#put (SR.IO_tx_window window) >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                lineRx#get do_lineRx_
+            end
+        and do_lineRx_ = function
+            | SR.IO_rx_error x ->
+                jout#fail "Main.session_/do_lineRx_: incomplete! [IO_rx_error]"
+            | SR.IO_rx_data msg ->
+                let str = Cf_message.contents msg in
+                if String.length str > 0 && str.[0] = '[' then
+                    loop ()
+                else begin
+                    G.load >>= fun n ->
+                    if n < Opt.count then begin
+                        let n = succ n in
+                        putMessage_ n >>= fun () ->
+                        G.store n >>= fun () ->
+                        dnTx#put (SR.IO_tx_window window) >>= loop
+                    end
+                    else if n = Opt.count then begin
+                        let n = succ n in
+                        G.store n >>= fun () ->
+                        dnTx#put SR.IO_tx_release >>= loop
+                    end
+                    else
+                        loop ()
+                end
+            | SR.IO_rx_release ->
+                dnTx#put SR.IO_tx_release >>= fun () ->
+                muxTx#put (Mux_clr n)
+            | SR.IO_rx_blocked ->
+                dnTx#put (SR.IO_tx_window 0) >>= loop
+            | SR.IO_rx_ready ->
+                dnTx#put (SR.IO_tx_window window) >>= loop
+            | SR.IO_rx_closed ->
+                muxTx#put (Mux_clr n)
+            | SR.IO_rx_unlinked ->
+                jout#fail "Main.session_/do_lineRx_: incomplete! [IO_rx_unlinked]"
+        and putMessage_ n =
+            let str = Printf.sprintf "%u\r\n" n in
+            let msg = Cf_message.create str in
+            dnTx#put (SR.IO_tx_data msg)
+        in
+        G.start (putMessage_ 0) 0 >>= fun () ->
+        G.start (loop ()) 0
+
+    let connect_ muxTx n k =
+        let muxTx = (muxTx :> mux_event_t G.tx) in
+        G.simplex >>= fun (termRx, termTx) ->
+        let termRx = (termRx :> unit G.rx) in
+        muxTx#put (Mux_set (n, termTx)) >>= fun () ->
+        G.simplex >>= fun (inRx, inTx) ->
+        let inRx = (inRx :> M_tcp.initiator_rx_t G.rx) in
+        G.simplex >>= fun (cancelRx, cancelTx) ->
+        let cancelRx = (cancelRx :> unit G.rx) in
+        M_tcp.initiator ~c:inTx ~cancel:cancelRx Opt.server k >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                inRx#get do_inRx_ >>= fun () ->
+                termRx#get do_termRx_
+            end
+        and do_inRx_ = function
+            | M_tcp.I_connect (sock, _, _) ->
+                session_ muxTx n sock k
+            | M_tcp.I_error x ->
+                muxTx#put (Mux_clr n) >>= fun () ->
+                begin
+                    try raise x with
+                    | Unix.Unix_error (error, _, _) ->
+                        let str = Unix.error_message error in
+                        jout#warn "unable to connect session %u (%s)" n str;
+                    | x ->
+                        let str = Printexc.to_string x in
+                        jout#warn "unable to connect session %u (%s)" n str;
+                end;
+                Cf_cmonad.return ()
+        and do_termRx_ () =
+            jout#fail "Main.connect_/do_termRx_: unimplemented!"
+        in
+        G.start (loop ()) ()
+
+    let reactor k =
+        interrupt_ k >>= fun muxTx ->
+        Cf_seq.C.sequence begin
+            Cf_seq.map begin fun i ->
+                connect_ muxTx i k
+            end Opt.sessions
+        end
+end
+
+let () =
+    Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
+    begin
+        try G.run Main.reactor () with
+        | Unix.Unix_error (error, where, arg) ->
+            let error = Unix.error_message error in
+            jout#fail "Unix error: '%s' in %s(%s)\n" error where arg
+    end;
+    jout#info "orderly shutdown"
+
+(*--- End of Program [ t_mirrorc.ml ] ---*)
+(*---------------------------------------------------------------------------*
+  PROGRAM  t_perf.ml
+
+  Copyright (c) 2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+let jout = Cf_journal.stdout
+let _ = jout#setlimit `None
+let _ = Random.self_init ()
+
+module Opt: sig
+    type mode_t = Client | Server
+
+    val mode: mode_t
+    val addr: Cf_tcp6_socket.address_t
+    val frameSize: int
+    val windowSize: int
+    val maxEndpoints: int
+    val txSize: int
+    val sessionSeq: unit Cf_seq.t
+end = struct
+    open Arg
+
+    type mode_t = Client | Server
+    
+    let mode_ = ref Client
+    let hostName_ = ref (Cf_ip6_addr.ntop Cf_ip6_addr.unspecified)
+    let portName_ = ref None
+    let frameSize_ = ref 100
+    let windowSize_ = ref 1000
+    let maxEndpoints_ = ref 100
+    let txSize_ = ref 100000
+    let numSessions_ = ref None
+    
+    let specs_ = Arg.align [
+        "-s", Unit (fun () -> mode_ := Server), " Server mode.";
+        "-h", String (fun s -> hostName_ := s), " Host name (address).";
+        "-p", String (fun s -> portName_ := Some s), " Port name (number).";
+        "-z", Set_int frameSize_, " Frame size (default 100).";
+        "-w", Set_int windowSize_, " Window size (default 1000).";
+        "-m", Set_int maxEndpoints_, " Maximum simultaneous endpoints.";
+        "-t", Set_int txSize_, " Client transmit size (default 100000).";
+        "-n", Int (fun n -> numSessions_ := Some n),
+            " Number of client sessions.";
+    ]
+    
+    let anon_ p =
+        jout#error "unexpected anonymous argument"
+    
+    let usage_ =
+        Printf.sprintf "Usage: %s [-s] [-h host] [-p port]"
+            Sys.executable_name
+    
+    let () =
+        parse specs_ anon_ usage_
+    
+    let mode = !mode_
+    
+    module NI = Cf_nameinfo
+    module TCP4 = Cf_tcp4_socket
+    module TCP6 = Cf_tcp6_socket
+    
+    let addr =
+        let portName =
+            match mode, !portName_ with
+            | Client, None ->
+                jout#error "remote service is required";
+                usage specs_ usage_;
+                exit (-1)
+            | Server, None ->
+                "0"
+            | _, Some p ->
+                p
+        in
+        let hostName =
+            match mode with
+            | Server ->
+                !hostName_
+            | Client ->
+                if !hostName_ = "::" then "::1" else !hostName_
+        in
+        let rec loop = function
+            | [] ->
+                assert (not true);
+                jout#fail "resolver error"
+            | ai :: tl ->
+                match
+                    NI.specialize_sockaddr ai.NI.ai_addr TCP6.P.AF.domain
+                with
+                | Some addr ->
+                    TCP6.P.AF.of_sockaddr addr
+                | None ->
+                    match
+                        NI.specialize_sockaddr ai.NI.ai_addr TCP4.P.AF.domain
+                    with
+                    | None ->
+                        loop tl
+                    | Some addr ->
+                        try
+                            let host, port = TCP4.P.AF.of_sockaddr addr in
+                            let host = Cf_ip4_addr.is_unicast host in
+                            let host = Cf_ip6_addr.to_v4mapped host in
+                            (host, port, 0l :> TCP6.address_t)
+                        with
+                        | Failure _ ->
+                            loop tl
+        in
+        let hint =
+            if mode = Server then {
+                NI.addrinfo_default_hint with
+                NI.ai_flags = {
+                    NI.to_address_default_flags with
+                    NI.ai_passive = true
+                }
+            }
+            else
+                NI.addrinfo_default_hint
+        in
+        let niArg =
+            if mode = Server && hostName = "::" then
+                NI.A_servicename portName
+            else
+                NI.A_bothnames (hostName, portName)
+        in
+        loop begin
+            try NI.to_address ~hint (NI.A_bothnames (hostName, portName)) with
+            | NI.Unresolved _ as x ->
+                jout#warn "unresolved remote address";
+                raise x
+        end
+    
+    let frameSize =
+        if !frameSize_ < 1 then begin
+            jout#error "frame size must be positive integer";
+            usage specs_ usage_;
+            exit (-1)
+        end;
+        !frameSize_
+    
+    let windowSize =
+        if !windowSize_ < 1 then begin
+            jout#error "window size must be positive integer";
+            usage specs_ usage_;
+            exit (-1)
+        end;
+        !windowSize_
+
+    let maxEndpoints =
+        if !maxEndpoints_ < 1 then begin
+            jout#error "max simultaneous endpoints must be positive integer";
+            usage specs_ usage_;
+            exit (-1)
+        end;
+        !maxEndpoints_
+
+    let txSize =
+        if !txSize_ < 1 then begin
+            jout#error "client transmit size must be positive integer";
+            usage specs_ usage_;
+            exit (-1)
+        end;
+        !txSize_
+    
+    let rec sessionSeq_ n =
+        lazy begin
+            match n with
+            | Some n when n > 0 ->
+                Cf_seq.P ((), sessionSeq_ (Some (pred n)))
+            | None ->
+                Cf_seq.P ((), sessionSeq_ None)
+            | _ ->
+                Cf_seq.Z
+        end
+    
+    let sessionSeq =
+        match !numSessions_ with
+        | Some n when n < 1 ->
+            jout#error "number of client sessions must be positive integer";
+            usage specs_ usage_;
+            exit (-1)
+        | n ->
+            sessionSeq_ n
+end
+
+module Common = struct
+    open Cf_cmonad.Op
+
+    module G = Iom_gadget
+    module SR = Iom_sock_stream
+    module TCP = Iom_tcp6_socket
+    module IP6 = Cf_ip6_addr
+    module IP4 = Cf_ip6_addr
+    
+    module N_map = Cf_rbtree.Map(Cf_ordered.Int_order)
+    
+    let socket_ntop_ addr =
+        let host, port, scope = addr in
+        let hostname = Cf_ip6_addr.ntop host in
+        let ifname =
+            try "%" ^ (Cf_netif.indextoname (Int32.to_int scope))
+            with Not_found -> ""
+        in
+        Printf.sprintf "%s%s/%u" hostname ifname port
+
+    let interrupt_ k =
+        G.simplex >>= fun (sigRx, sigTx) ->
+        let sigTx = (sigTx :> [ `Signal of int ] G.tx) in
+        G.simplex >>= fun (ctrlRx, ctrlTx) ->
+        let ctrlRx = (ctrlRx :> G.control_t G.rx) in
+        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
+        Iom_reactor.signal ~c:ctrlRx ~r:sigTx ~n:Sys.sigint k >>= fun () ->
+        ctrlTx#put G.C_load >>= fun () ->
+        Cf_cmonad.return (ctrlTx, sigRx)
+end
+
+module Client: sig
+    val reactor: Iom_gadget.kernel_t -> (unit, unit) Iom_gadget.t
+end = struct
+    include Common
+    open Cf_cmonad.Op
+    
+    let session_ topTx ep frame k =
+        let fd: int = Obj.magic (Cf_socket.to_unix_file_descr ep) in
+        let topTx = (topTx :> 'top G.tx) in
+        G.duplex >>= fun ((dnRx, dnTx), c) ->
+        TCP.endpoint ~c ep k >>= fun () ->
+        let dnRx = (dnRx :> SR.endpoint_rx_t G.rx) in
+        let dnTx = (dnTx :> SR.endpoint_tx_t G.tx) in
+        dnTx#put (SR.IO_tx_window Opt.windowSize) >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                dnRx#get do_dnRx_
+            end
+        and do_dnRx_ = function
+            | SR.IO_rx_error x ->
+                raise x
+            | SR.IO_rx_data msg ->
+                let len = Cf_message.length msg in
+                dnTx#put (SR.IO_tx_extend len) >>= fun () ->
+                begin G.load >>= function
+                    | None ->
+                        loop ()
+                    | Some txSize ->
+                        if txSize > 0 then begin
+                            let txSize = txSize - len in
+                            let txSize = if txSize < 0 then 0 else txSize in
+                            do_put_frame_ len >>= fun () ->
+                            G.store (Some txSize) >>= loop
+                        end
+                        else begin
+                            dnTx#put SR.IO_tx_release >>= fun () ->
+                            G.store None >>= loop
+                        end
+                end
+            | SR.IO_rx_release -> 
+                jout#warn "Client.session_: \
+                    unexpected SR.IO_rx_release event!";
+                loop ()
+            | SR.IO_rx_blocked ->
+                assert begin
+                    jout#debug "Client.session_: \
+                        unexpected SR.IO_rx_blocked event!"
+                end;
+                loop ()
+            | SR.IO_rx_ready ->
+                assert begin
+                    jout#debug "Client.session_: \
+                        unexpected SR.IO_rx_ready event!"
+                end;
+                loop ()
+            | SR.IO_rx_closed ->
+                topTx#put `Done
+            | SR.IO_rx_unlinked ->
+                assert (not true);
+                jout#fail "Client.session_: \
+                    unexpected SR.IO_rx_unlinked event!"
+        and do_put_frame_ len =
+            if len > 0 then begin
+                dnTx#put (SR.IO_tx_data frame) >>= fun () ->
+                do_put_frame_ (len - Opt.frameSize)
+            end
+            else
+                Cf_cmonad.return ()
+        in
+        G.start (do_put_frame_ Opt.windowSize) None >>= fun () ->
+        G.start (loop ()) (Some (Opt.txSize - Opt.frameSize))
+    
+    let connector_ topTx k =
+        let topTx = (topTx :> 'top G.tx) in
+        G.simplex >>= fun (ctrlRx, c) ->
+        TCP.initiator ~c Opt.addr k >>= fun () ->
+        let ctrlRx = (ctrlRx :> TCP.initiator_rx_t G.rx) in
+        let rec loop () =
+            G.guard begin
+                ctrlRx#get do_ctrlRx_
+            end
+        and do_ctrlRx_ = function
+            | TCP.I_connect (ep, locAddr, remAddr) ->
+                jout#info "client at %s" (socket_ntop_ locAddr);
+                Cf_socket.setsockopt ep Cf_ip_common.tcp_nodelay true;
+                let m = String.create Opt.frameSize in
+                for i = 0 to Opt.frameSize - 1 do
+                    m.[i] <- Char.chr (32 + (Random.int 95))
+                done;
+                let m = Cf_message.create m in
+                session_ topTx ep m k
+            | TCP.I_error x ->
+                topTx#put (`Error x)
+        in
+        G.start (loop ()) ()
+    
+    let reactor k =
+        G.simplex >>= fun (topRx, topTx) ->
+        let topRx = (topRx :> 'top G.rx) in
+        let startSeq = Cf_seq.limit Opt.maxEndpoints Opt.sessionSeq in
+        let startSeq = Cf_seq.map (fun () -> connector_ topTx k) startSeq in
+        Cf_seq.C.sequence startSeq >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                topRx#get do_topRx_
+            end
+        and do_topRx_ = function
+            | `Error x ->
+                raise x
+            | `Done ->
+                G.load >>= fun (current, remainSeq) ->
+                match Lazy.force remainSeq with
+                | Cf_seq.P ((), remainSeq) ->
+                    connector_ topTx k >>= fun () ->
+                    G.store (current, remainSeq) >>= loop
+                | Cf_seq.Z when current > 1 ->
+                    G.store (pred current, remainSeq) >>= loop
+                | _ ->
+                    Cf_cmonad.return ()
+        in
+        let remainSeq = Cf_seq.shift Opt.maxEndpoints Opt.sessionSeq in
+        G.start (loop ()) (Opt.maxEndpoints, remainSeq)
+end
+
+module Server: sig
+    val reactor: Iom_gadget.kernel_t -> (unit, unit) Iom_gadget.t
+end = struct
+    include Common
+    open Cf_cmonad.Op
+    
+    type client_state_t =
+        | C_running of Cf_message.t * Cf_message.t Cf_deque.t option
+        | C_draining of Cf_message.t Cf_deque.t option
+    
+    let blockedQ_ = Some Cf_deque.nil
+
+    let client_ clientNo ep k =
+        G.duplex >>= fun ((dnRx, dnTx), c) ->
+        let dnRx = (dnRx :> SR.endpoint_rx_t G.rx) in
+        let dnTx = (dnTx :> SR.endpoint_tx_t G.tx) in
+        TCP.endpoint ~c ep k >>= fun () ->
+        G.duplex >>= fun ((upRx, upTx), upIO) ->
+        let upRx = (upRx :> 'upRx G.rx) in
+        let upTx = (upTx :> 'upTx G.tx) in
+        dnTx#put (SR.IO_tx_window Opt.windowSize) >>= fun () ->
+        let rec loop () =
+            G.guard begin
+                dnRx#get do_dnRx_ >>= fun () ->
+                upRx#get do_upRx_
+            end
+        and do_dnRx_ = function
+            | SR.IO_rx_error x -> upTx#put (`Error x)
+            | SR.IO_rx_data msg -> do_dnRx_data_ msg
+            | SR.IO_rx_release -> do_dnRx_release_ ()
+            | SR.IO_rx_blocked -> do_dnRx_blocked_ ()
+            | SR.IO_rx_ready -> do_dnRx_ready_ ()
+            | SR.IO_rx_closed -> do_dnRx_closed_ ()
+            | SR.IO_rx_unlinked ->
+                assert (not true);
+                jout#fail "Server.client_[%u]: \
+                    unexpected SR.IO_rx_unlinked event!" clientNo
+        and do_dnRx_data_ msg =
+            G.load >>= function
+            | C_draining _ ->
+                assert (not true);
+                loop ()
+            | C_running (inMsg, blockedQ) ->
+                let rec aux msg =
+                    let len = Cf_message.length msg in
+                    let pos = Opt.frameSize in
+                    if len > pos then begin
+                        let m1, m2 = Cf_message.split ~pos msg in
+                        Cf_seq.writeC (upTx#put (`Frame m1)) >>= fun () ->
+                        aux m2
+                    end
+                    else if len = pos then begin
+                        Cf_seq.writeC (upTx#put (`Frame msg)) >>= fun () ->
+                        Cf_seq.writeC (G.store (C_running ([], blockedQ)))
+                    end
+                    else
+                        Cf_seq.writeC (G.store (C_running (msg, blockedQ)))
+                in
+                let z = Cf_seq.evalC (aux (inMsg @ msg)) in
+                Cf_seq.C.sequence z >>= fun () ->
+                dnTx#put (SR.IO_tx_window Opt.windowSize) >>= loop
+        and do_dnRx_release_ () =
+            G.load >>= function
+            | C_draining _ ->
+                assert (not true);
+                loop ()
+            | C_running (inMsg, blockedQ) ->
+                dnTx#put SR.IO_tx_release >>= fun () ->
+                upTx#put `Done >>= fun () ->
+                do_dnRx_release_aux_ inMsg >>= fun () ->
+                G.store (C_draining blockedQ) >>= loop
+        and do_dnRx_release_aux_ = function
+            | [] -> Cf_cmonad.return ()
+            | m -> upTx#put (`Frame m)
+        and do_dnRx_blocked_ () =
+            G.modify begin fun s ->
+                match s with
+                | C_running (inMsg, None) -> C_running (inMsg, blockedQ_)
+                | C_draining None -> C_draining blockedQ_
+                | s -> assert (not true); s
+            end
+        and do_dnRx_ready_ () =
+            jout#info "Server.client_[%u]: msg received." clientNo;
+            G.load >>= function
+            | C_running (inMsg, (Some outMsgQ)) ->
+                do_dnRx_ready_drain_ outMsgQ >>= fun () ->
+                G.store (C_running (inMsg, None)) >>= loop
+            | C_draining (Some outMsgQ) ->
+                do_dnRx_ready_drain_ outMsgQ >>= fun () ->
+                dnTx#put SR.IO_tx_release >>= fun () ->
+                G.store (C_draining None) >>= loop
+            | _ ->
+                assert (not true);
+                loop ()
+        and do_dnRx_ready_drain_ outMsgQ =
+            let z = Cf_deque.A.to_seq outMsgQ in
+            let z = Cf_seq.map (fun m -> dnTx#put (SR.IO_tx_data m)) z in
+            Cf_seq.C.sequence z
+        and do_dnRx_closed_ () =
+            upTx#put `Done
+        and do_upRx_ = function
+            | `Frame msg -> do_upRx_frame_ msg
+            | `Abort -> dnTx#put SR.IO_tx_reset
+        and do_upRx_frame_ msg =
+            G.load >>= function
+            | C_draining (Some q) ->
+                let q = Some (Cf_deque.A.push msg q) in
+                G.store (C_draining q) >>= loop
+            | C_running (inMsg, Some q) ->
+                let q = Some (Cf_deque.A.push msg q) in
+                G.store (C_running (inMsg, q)) >>= loop
+            | _ ->
+                dnTx#put (SR.IO_tx_data msg) >>= loop
+        in
+        G.start (loop ()) (C_running ([], None)) >>= fun () ->
+        Cf_cmonad.return upIO
+    
+    let reactor k =
+        interrupt_ k >>= fun (sigTx, sigRx) ->
+        let sigRx = (sigRx :> [ `Signal of int ] G.rx) in
+        let sigTx = (sigTx :> G.control_t G.tx) in
+        G.duplex >>= fun ((listenRx, listenTx), c) ->
+        let listenRx = (listenRx :> TCP.listener_rx_t G.rx) in
+        let listenTx = (listenTx :> TCP.listener_tx_t G.tx) in
+        TCP.listener ~c ~qlen:10 Opt.addr k >>= fun () ->
+        listenTx#put (TCP.L_limit Opt.maxEndpoints) >>= fun () ->
+        let rec loop () =
+            G.load >>= fun (_, _, dnIOQ as s) ->
+            G.guard begin
+                let z = Cf_deque.A.to_seq dnIOQ in
+                let z = Cf_seq.map begin fun (n, dnRx, _) -> 
+                    let dnRx = (dnRx :> 'dnRx G.rx) in
+                    dnRx#get (do_dnRx_ s n)
+                end z in
+                Cf_seq.C.sequence z >>= fun () ->
+                listenRx#get do_listenRx_ >>= fun () ->
+                sigRx#get do_sigRx_
+            end
+        and do_dnRx_ (nClient, dnIOMap, dnIOQ as s) n = function
+            | `Frame msg ->
+                do_dnRx_frame_ msg dnIOQ >>= fun dnIOQ ->
+                G.store (nClient, dnIOMap, dnIOQ) >>= loop
+            | `Done ->
+                listenTx#put (TCP.L_extend 1) >>= fun () ->
+                let dnIOMap = N_map.delete n dnIOMap in
+                let dnIOQ =
+                    Cf_deque.filter (fun (n', _, _) -> n <> n') dnIOQ
+                in
+                G.store (nClient, dnIOMap, dnIOQ) >>= loop
+            | `Error x ->
+                raise x
+        and do_dnRx_frame_ msg dnIOQ =
+            match Cf_deque.A.pop dnIOQ with
+            | Some (hd, tl) ->
+                let dnIOQ = Cf_deque.B.push hd tl in
+                let _, _, dnTx = hd in
+                let dnTx = (dnTx :> 'dnTx G.tx) in
+                dnTx#put (`Frame msg) >>= fun () ->
+                Cf_cmonad.return dnIOQ
+            | None ->
+                Cf_cmonad.return dnIOQ
+        and do_listenRx_ = function
+            | TCP.L_bind addr ->
+                jout#info "server at %s" (socket_ntop_ addr);
+                loop ()
+            | TCP.L_connect (ep, lAddr, rAddr) ->
+                Cf_socket.setsockopt ep Cf_ip_common.tcp_nodelay true;
+                jout#info "client connected from %s" (socket_ntop_ rAddr);
+                G.load >>= fun (nClient, dnIOMap, dnIOQ) ->
+                client_ nClient ep k >>= fun dnIO ->
+                let dnIOMap = N_map.replace (nClient, dnIO) dnIOMap in
+                let dnRx, dnTx = dnIO in
+                let dnIOQ = Cf_deque.B.push (nClient, dnRx, dnTx) dnIOQ in
+                let nClient = succ nClient in
+                G.store (nClient, dnIOMap, dnIOQ) >>= loop
+            | TCP.L_error x ->
+                loop ()
+        and do_sigRx_ (`Signal _) =
+            listenTx#put TCP.L_close
+        in
+        G.start (loop ()) (0, N_map.nil, Cf_deque.nil)
+end
+
+let () =
+    let reactor =
+        match Opt.mode with
+        | Opt.Client -> Client.reactor
+        | Opt.Server -> Server.reactor
+    in
+    Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
+    begin
+        try Iom_gadget.run reactor () with
+        | Unix.Unix_error (error, where, arg) ->
+            let error = Unix.error_message error in
+            jout#fail "Unix error: '%s' in %s(%s)\n" error where arg
+    end;
+    jout#info "ok"
+
+(*--- End of Program [ t_perf.ml ] ---*)