Anonymous avatar Anonymous committed 6bb35fc

Split Iom_reactor into Iom_gadget.

Comments (0)

Files changed (3)

iom/iom_sock_stream.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  iom_sock_stream.ml
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(*
+let xtag = "[Iom_sock_stream] "
+
+let xprintf fmt =
+    print_string xtag;
+    Printf.printf fmt
+
+let xsprintf (fmt : ('a, unit, string) format) =
+    Printf.sprintf (Obj.magic (xtag ^ (Obj.magic fmt)))
+*)
+
+type endpoint_tx_t =
+    | IO_tx_data of Cf_message.t	(* send data to peer *)
+    | IO_tx_release                 (* initiate orderly release *)
+    | IO_tx_extend of int           (* extend application receive window *)
+    | IO_tx_window of int           (* set application receive window *)
+ (* | IO_tx_peek of (Cf_message.t -> int option) peek until extend permitted *)
+    | IO_tx_reset                   (* abortive close *)
+    | IO_tx_unlink                  (* flush output and stop input *)
+
+type endpoint_rx_t =
+    | IO_rx_error of exn            (* socket closed on error condition *)
+    | IO_rx_data of Cf_message.t	(* data received from peer *)
+    | IO_rx_release                 (* peer initiated orderly release *)
+    | IO_rx_blocked                 (* transmit flow control *)
+    | IO_rx_ready                   (* ready for transmit *)
+    | IO_rx_closed                  (* orderly close complete *)
+    | IO_rx_unlinked                (* output flushed and input stopped *)
+
+module type T = sig
+    module S: Cf_sock_stream.T
+    
+    type connection_t = S.t * S.address_t * S.address_t
+
+    type initiator_rx_t =
+        | I_connect of connection_t
+        | I_error of exn
+    
+    val initiator:
+        c:initiator_rx_t #Iom_gadget.tx -> ?cancel:unit #Iom_gadget.rx ->
+        ?sock:S.t -> ?src:S.address_t -> S.address_t -> Iom_gadget.kernel_t ->
+        ('s, unit) Iom_gadget.t
+    
+    type listener_tx_t =
+        | L_extend of int
+        | L_limit of int
+        | L_close
+    
+    type listener_rx_t =
+        | L_bind of S.address_t
+        | L_connect of connection_t
+        | L_error of exn
+        
+    val listener:
+        c:(listener_tx_t #Iom_gadget.rx * listener_rx_t #Iom_gadget.tx) ->
+        ?sock:S.t -> ?qlen:int -> S.address_t -> Iom_gadget.kernel_t ->
+        ('s, unit) Iom_gadget.t
+    
+    val endpoint:
+        c:(endpoint_tx_t #Iom_gadget.rx * endpoint_rx_t #Iom_gadget.tx) -> 
+        S.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
+end
+
+let is_blocked_ x = List.memq x [ Unix.EAGAIN; Unix.EWOULDBLOCK ]
+let is_blocked_connect_ x = List.memq x [ Unix.EALREADY; Unix.EINPROGRESS ]
+
+module Create(S: Cf_sock_stream.T): (T with module S = S) = struct
+    module G = Iom_gadget
+    open Cf_cmonad.Op
+    
+    module S = S
+    
+    type 's service_aux_t =
+        | Aux_load
+        | Aux_unload
+        | Aux_working of ('s, unit) G.t
+        | Aux_final of ('s, unit) G.t
+    
+    class virtual file_aux rwx e =
+        let e = (e :> S.basic) in
+        object(self:'self)
+            inherit [(unit, unit) G.t] Cf_poll.file rwx e#fd
+
+            val mutable limit_ = 0
+            
+            method virtual private service_aux: 's. 's service_aux_t
+
+            method private service_ p =
+                match self#service_aux with
+                | Aux_final v -> `Final v
+                | Aux_working v -> `Working (p, v)
+                | _ -> assert (not true); state_
+
+            method limit:
+                's 'c. c:(G.control_t #G.tx as 'c) -> int ->
+                    ('s, unit) G.t =
+                fun ~c n ->
+                    let c = (c :> G.control_t G.tx) in
+                    let n0 = limit_ in
+                    limit_ <- n;
+                    if n <= 0 && n0 > 0 then
+                        c#put G.C_unload
+                    else if n > 0 && n0 <= 0 then begin
+                        match self#service_aux with
+                        | Aux_load ->
+                            c#put G.C_load
+                        | Aux_unload ->
+                            c#put G.C_unload
+                        | Aux_working v
+                        | Aux_final v ->
+                            if limit_ <= 0 then
+                                v
+                            else
+                                v >>= fun () ->
+                                c#put G.C_load
+                    end
+                    else
+                        Cf_cmonad.return ()
+            
+            method extend:
+                's 'c. c:(G.control_t #G.tx as 'c) -> int ->
+                    ('s, unit) G.t =
+                fun ~c n ->
+                    self#limit ~c (limit_ + n)
+        end
+    
+    type connection_t = S.t * S.address_t * S.address_t
+
+    type initiator_rx_t =
+        | I_connect of connection_t
+        | I_error of exn
+
+    type listener_tx_t =
+        | L_extend of int
+        | L_limit of int
+        | L_close
+    
+    type listener_rx_t =
+        | L_bind of S.address_t
+        | L_connect of connection_t
+        | L_error of exn
+    
+    type try_signaling_t =
+        | Try_sig_connect of connection_t
+        | Try_sig_error of exn
+        | Try_sig_pending
+    
+    let i_cons_connection_ ~s =
+        let laddr = s#getsockname in
+        let sock = s#socket in
+        let raddr = S.P.AF.of_sockaddr (Cf_socket.getpeername sock) in
+        Try_sig_connect (sock, laddr, raddr)
+
+    let i_try_connect_ ~s =
+        let sock = s#socket in
+        try
+            s#connect;
+            i_cons_connection_ ~s
+        with
+        | Unix.Unix_error (code, _, _) when is_blocked_connect_ code ->
+            let error = Unix.error_message code in
+            Try_sig_pending
+        | error ->
+            Try_sig_error error
+    
+    let i_try_connect_0_ ~s =
+        match i_try_connect_ ~s with
+        | Try_sig_error (Unix.Unix_error (Unix.EISCONN, _, _)) ->
+            (try i_cons_connection_ ~s with x -> Try_sig_error x)
+        | v ->
+            v
+    
+    class i_poll ~ic s =
+        let ic = (ic :> initiator_rx_t G.tx) in
+        let s = (s :> S.initiator) in
+        object
+            inherit [(unit, unit) G.t] Cf_poll.file `W s#fd
+                                    
+            method private service_ _ =
+                match i_try_connect_0_ ~s with
+                | Try_sig_connect connection ->
+                    `Final (ic#put (I_connect connection))
+                | Try_sig_error error ->
+                    `Final (ic#put (I_error error))
+                | Try_sig_pending ->
+                    state_
+        end
+    
+    let i_cancel_rx_ c () =
+        let c = (c :> G.control_t G.tx) in
+        c#put G.C_unload >>= fun () ->
+        c#put G.C_final
+
+    let i_cancel_ ~cancel ~c =
+        let cancel = (cancel :> unit G.rx) in
+        let c = (c :> G.control_t G.tx) in
+        G.start (c#put G.C_load) () >>= fun () ->
+        G.start (G.guard (cancel#get (i_cancel_rx_ c))) ()
+    
+    let initiator ~c ?cancel ?sock ?src addr k =
+        let ic = (c :> initiator_rx_t G.tx) in
+        try
+            let s = new S.initiator ?sock ?src addr in
+            Unix.set_nonblock s#fd;
+            match i_try_connect_ ~s with
+            | Try_sig_error error ->
+                ic#put (I_error error)
+            | Try_sig_connect connection ->
+                ic#put (I_connect connection)
+            | Try_sig_pending ->
+                let e = new i_poll ~ic s in
+                match cancel with
+                | None ->
+                    G.poll ~e k
+                | Some cancel ->
+                    G.simplex >>= fun (rx, tx) ->
+                    G.poll ~c:rx ~e k >>= fun () ->
+                    i_cancel_ ~cancel ~c:tx
+        with
+        | x ->
+            G.start (ic#put (I_error x)) ()
+        
+    let l_try_accept_ ~s =
+        let s = (s :> S.listener) in
+        try
+            let snew, raddr = s#accept in
+            let laddr = S.P.AF.of_sockaddr (Cf_socket.getsockname snew) in
+            Try_sig_connect (snew, laddr, raddr)
+        with
+        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
+            Try_sig_pending
+        | error ->
+            Try_sig_error error
+            
+    class l_poll ~lr s =
+        let lr = (lr :> listener_rx_t G.tx) in
+        let s = (s :> S.listener) in
+        object
+            inherit file_aux `R s
+                        
+            method private service_aux =
+                if limit_ <= 0 then
+                    Aux_unload
+                else
+                    match l_try_accept_ ~s with
+                    | Try_sig_pending ->
+                        Aux_load
+                    | Try_sig_error error ->
+                        limit_ <- 0;
+                        Aux_final (lr#put (L_error error))
+                    | Try_sig_connect connection ->
+                        limit_ <- pred limit_;
+                        let m = lr#put (L_connect connection) in
+                        if limit_ > 0 then Aux_working m else Aux_final m
+        end
+        
+    let l_control_loop_ ~c ~rc =
+        let lc, _ = c in
+        let lc = (lc :> listener_tx_t G.rx) in
+        let rc = (rc :> G.control_t G.tx) in
+        let rec loop () = G.guard (lc#get cmd_rx)
+        and cmd_rx msg =
+            G.load >>= fun (socket, poll) ->
+            let socket = (socket :> S.listener) in
+            let poll = (poll :> l_poll) in
+            match msg with
+            | L_extend n ->
+                poll#extend ~c:rc n >>= loop
+            | L_limit n ->
+                poll#limit ~c:rc n >>= loop
+            | L_close ->
+                socket#close;
+                rc#put G.C_unload >>= fun () ->
+                rc#put G.C_final
+        in
+        fun s ->
+            G.start (loop ()) s
+        
+    let listener ~c ?sock ?qlen addr k =
+        let _, lr = c in
+        let lr = (lr :> listener_rx_t G.tx) in
+        try
+            let s = new S.listener ?sock addr in
+            begin
+                match qlen, sock with
+                | Some n, _ -> s#listen n
+                | None, None -> s#listen 1
+                | _, _ -> ()
+            end;
+            let address = s#getsockname in
+            Unix.set_nonblock s#fd;
+            let e = new l_poll ~lr s in
+            lr#put (L_bind address) >>= fun () ->
+            G.simplex >>= fun (rcRx, rcTx) ->
+            l_control_loop_ ~c ~rc:rcTx (s, e) >>= fun () ->
+            G.poll ~c:rcRx ~e k
+        with
+        | error ->
+            G.start (lr#put (L_error error)) ()
+        
+    type e_poll_t = PX_txr | PX_rxf | PX_err of exn | PX_close
+    
+    type e_try_data_t =
+        | Try_data_working of int
+        | Try_data_error of exn
+        | Try_data_pending
+    
+    let e_try_recv_ ~s buf pos len =
+        let s = (s :> S.endpoint) in
+        try
+            let n = s#recv buf pos len in
+            Try_data_working n
+        with
+        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
+            Try_data_pending
+        | error ->
+            Try_data_error error
+        
+    class e_rx_poll ~s ~er ~px =
+        let s = (s :> S.endpoint) in
+        let er = (er :> endpoint_rx_t G.tx) in
+        let px = (px :> e_poll_t G.tx) in
+        object(self)
+            inherit file_aux `R s
+            
+            val mutable buffer_ = String.create 0
+            
+            method private service_aux =
+                if limit_ <= 0 then
+                    Aux_unload
+                else begin
+                    if String.length buffer_ < limit_ then
+                        buffer_ <- String.create limit_;
+                    match e_try_recv_ ~s buffer_ 0 limit_ with
+                    | Try_data_pending ->
+                        Aux_load
+                    | Try_data_error error ->
+                        Aux_final (px#put (PX_err error))
+                    | Try_data_working n ->
+                        if n = 0 then begin
+                            buffer_ <- "";
+                            limit_ <- 0;
+                            Aux_final (px#put PX_rxf)
+                        end
+                        else begin
+                            let msg = [ String.sub buffer_ 0 n, 0, n ] in
+                            let v = er#put (IO_rx_data msg) in
+                            if n < limit_ then begin
+                                limit_ <- limit_ - n;
+                                Aux_working v
+                            end
+                            else begin
+                                limit_ <- 0;
+                                Aux_final v
+                            end
+                        end
+                end
+        end
+    
+    let rec e_msg_buf_pos_len_ = function
+        | _ :: _ :: _ as m ->
+            assert (not true);
+            e_msg_buf_pos_len_ (Cf_message.copy m)
+        | hd :: [] ->
+            hd
+        | [] ->
+            "", 0, 0
+            
+    let e_try_send_ ~s msg =
+        let s = (s :> S.endpoint) in
+        let buf, pos, len = e_msg_buf_pos_len_ msg in
+        try
+            Try_data_working (s#send buf pos len)
+        with
+        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
+            Try_data_pending
+        | error ->
+            Try_data_error error
+            
+    class e_tx_poll ~s ~px =
+        let s = (s :> S.endpoint) in
+        let px = (px :> e_poll_t G.tx) in
+        object(self:'self)
+            inherit [(unit, unit) G.t] Cf_poll.file `W s#fd
+            
+            val mutable working_ = []
+
+            method private service_aux: 's. 's service_aux_t =
+                if working_ = [] then
+                    Aux_unload
+                else
+                    let working = Cf_message.flatten working_ in
+                    match e_try_send_ ~s working with
+                    | Try_data_pending ->
+                        Aux_load
+                    | Try_data_error error ->
+                        Aux_final (px#put (PX_err error))
+                    | Try_data_working n ->
+                        working_ <- Cf_message.shift ~pos:n working;
+                        let v = Cf_cmonad.return () in
+                        match working_ with
+                        | [] -> Aux_final v
+                        | _ -> Aux_working v
+
+            method private service_ p =
+                assert (working_ <> []);
+                let working = Cf_message.flatten working_ in
+                match e_try_send_ ~s working with
+                | Try_data_pending ->
+                    assert (not true);
+                    state_
+                | Try_data_error error ->
+                    `Final (px#put (PX_err error))
+                | Try_data_working n ->
+                    working_ <- Cf_message.shift ~pos:n working;
+                    match working_ with
+                    | [] -> `Final (px#put PX_txr)
+                    | _ -> `Working (p, Cf_cmonad.return ())
+            
+            method push:
+                's 'c. c:(G.control_t #G.tx as 'c) -> Cf_message.t ->
+                    ('s, unit) G.t =
+                fun ~c m ->
+                    assert (m <> []);
+                    let c = (c :> G.control_t G.tx) in
+                    working_ <- working_ @ m;
+                    match self#service_aux with
+                    | Aux_load ->
+                        c#put G.C_load
+                    | Aux_unload ->
+                        assert (not true);
+                        Cf_cmonad.return ()
+                    | Aux_working v ->
+                        v >>= fun () ->
+                        c#put G.C_load
+                    | Aux_final v ->
+                        v
+            
+            method release: 's. ('s, unit) G.t =
+                match working_ with
+                | [] ->  px#put PX_txr
+                | _ -> Cf_cmonad.return ()
+        end
+    
+    type e_state_t =
+        | ES_duplex of S.endpoint * e_rx_poll * e_tx_poll
+        | ES_rsimplex of S.endpoint * e_rx_poll
+        | ES_wsimplex of S.endpoint * e_tx_poll
+        | ES_closing of S.endpoint
+        | ES_unlinking of S.endpoint
+        
+    let e_control_loop_ ~c ~rc ~wc ~px =
+        let ec, er = c in
+        let ec = (ec :> endpoint_tx_t G.rx) in
+        let er = (er :> endpoint_rx_t G.tx) in
+        let rc = (rc :> G.control_t G.tx) in
+        let wc = (wc :> G.control_t G.tx) in
+        let px = (px :> e_poll_t G.wire_t) in
+        let pxRx = new G.rx px and pxTx = new G.tx px in
+        let rec loop () =
+            G.guard begin
+                ec#get cmd_rx >>= fun () ->
+                pxRx#get poll_rx
+            end
+        and cmd_release state txp =
+            let txp = (txp :> e_tx_poll) in
+            G.store state >>= fun () ->
+            txp#release >>= fun () -> 
+            loop ()
+        and cmd_finalize_poll xc =
+            let xc = (xc :> G.control_t G.tx) in
+            xc#put G.C_unload >>= fun () ->
+            xc#put G.C_final
+        and cmd_reset e =
+            G.store (ES_closing e) >>= fun () ->
+            e#shutdown Unix.SHUTDOWN_ALL;
+            e#close;
+            Cf_cmonad.return ()
+        and cmd_rx = function
+            | IO_tx_release ->
+                G.load >>= begin function
+                | ES_duplex (e, rxp, txp) ->
+                    cmd_release (ES_rsimplex (e, rxp)) txp
+                | ES_wsimplex (e, txp) ->
+                    cmd_release (ES_closing e) txp
+                | _ ->
+                    loop ()
+                end
+            | IO_tx_data msg ->
+                if msg <> [] then
+                    G.load >>= begin function
+                    | ES_duplex (e, _, txp)
+                    | ES_wsimplex (e, txp) ->
+                        txp#push ~c:wc msg >>= loop
+                    | _ ->
+                        loop ()
+                    end
+                else
+                    loop ()
+            | IO_tx_window n ->
+                G.load >>= begin function
+                | ES_duplex (e, rxp, _)
+                | ES_rsimplex (e, rxp) ->
+                    rxp#limit ~c:rc n >>= loop
+                | _ ->
+                    loop ()
+                end
+            | IO_tx_extend n ->
+                G.load >>= begin function
+                | ES_duplex (e, rxp, _)
+                | ES_rsimplex (e, rxp) ->
+                    rxp#extend ~c:rc n >>= loop
+                | _ ->
+                    loop ()
+                end
+            | IO_tx_unlink ->
+                G.load >>= begin function
+                | ES_duplex (e, rxp, txp) ->
+                    rxp#limit ~c:rc 0 >>= fun () ->
+                    txp#release >>= fun () ->
+                    G.store (ES_unlinking e) >>= loop
+                | ES_rsimplex (e, rxp) ->
+                    rxp#limit ~c:rc 0 >>= fun () ->
+                    er#put IO_rx_unlinked
+                | ES_wsimplex (e, txp) ->
+                    txp#release >>= fun () ->
+                    G.store (ES_unlinking e) >>= loop
+                | _ ->
+                    loop ()
+                end
+            | IO_tx_reset ->
+                G.load >>= begin function
+                | ES_duplex (e, _, _) ->
+                    cmd_finalize_poll rc >>= fun () ->
+                    cmd_finalize_poll wc >>= fun () ->
+                    cmd_reset e
+                | ES_rsimplex (e, _) ->
+                    cmd_finalize_poll rc >>= fun () ->
+                    cmd_reset e
+                | ES_wsimplex (e, _) ->
+                    cmd_finalize_poll wc >>= fun () ->
+                    cmd_reset e
+                | ES_unlinking _ ->
+                    loop ()
+                | ES_closing _ ->
+                    pxTx#put PX_close >>= loop
+                end
+        and poll_rx event =
+            match event with
+            | PX_txr ->
+                G.load >>= begin function
+                | ES_duplex _
+                | ES_wsimplex _ ->
+                    er#put IO_rx_blocked >>= loop
+                | ES_rsimplex (e, _) ->
+                    e#shutdown Unix.SHUTDOWN_SEND;
+                    loop ()
+                | ES_unlinking _ ->
+                    er#put IO_rx_unlinked
+                | ES_closing _ ->
+                    loop ()
+                end
+            | PX_rxf ->
+                G.load >>= begin function
+                | ES_duplex (e, _, txs) ->
+                    G.store (ES_wsimplex (e, txs)) >>= fun () ->
+                    er#put IO_rx_release >>= loop
+                | ES_rsimplex (e, _) ->
+                    G.store (ES_closing e) >>= fun () ->
+                    pxTx#put PX_close >>= loop
+                | _ ->
+                    assert (not true);
+                    loop ()
+                end
+            | PX_err error ->
+                G.load >>= begin function
+                | ES_closing e
+                | ES_unlinking e
+                | ES_duplex (e, _, _)
+                | ES_rsimplex (e, _)
+                | ES_wsimplex (e, _) ->
+                    e#close;
+                    er#put (IO_rx_error error)
+                end
+            | PX_close ->
+                G.load >>= begin function
+                | ES_unlinking e
+                | ES_duplex (e, _, _)
+                | ES_rsimplex (e, _)
+                | ES_wsimplex (e, _) ->
+                    assert (not true);
+                    Cf_cmonad.return ()
+                | ES_closing e ->
+                    e#close;
+                    er#put IO_rx_closed
+                end
+        in
+        loop ()
+    
+    let endpoint ~c s k =
+        let ec, er = c in
+        let ec = (ec :> endpoint_tx_t G.rx) in
+        let er = (er :> endpoint_rx_t G.tx) in
+        let s = new S.endpoint s in
+        Unix.set_nonblock s#fd;
+        G.duplex >>= fun ((rcRx, wcTx), (wcRx, rcTx)) ->
+        G.wire >>= fun px ->
+        let pxTx = new G.tx px in
+        let rxp = new e_rx_poll ~s ~er ~px:pxTx in
+        let txp = new e_tx_poll ~s ~px:pxTx in
+        let state = ES_duplex (s, rxp, txp) in
+        G.start (e_control_loop_ ~c ~rc:rcTx ~wc:wcTx ~px) state >>= fun () ->
+        G.poll ~c:rcRx ~e:rxp k >>= fun () ->
+        G.poll ~c:wcRx ~e:txp k
+end
+
+(*--- End of File [ iom_sock_stream.ml ] ---*)

iom/iom_sock_stream.mli

+(*---------------------------------------------------------------------------*
+  INTERFACE  iom_sock_stream.mli
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+type endpoint_tx_t =
+    | IO_tx_data of Cf_message.t	(* send data to peer *)
+    | IO_tx_release                 (* initiate orderly release *)
+    | IO_tx_extend of int           (* extend application receive window *)
+    | IO_tx_window of int           (* set application receive window *)
+ (* | IO_tx_peek of (Cf_message.t -> int option) peek until extend permitted *)
+    | IO_tx_reset                   (* abortive close *)
+    | IO_tx_unlink                  (* flush output and stop input *)
+
+type endpoint_rx_t =
+    | IO_rx_error of exn            (* socket closed on error condition *)
+    | IO_rx_data of Cf_message.t	(* data received from peer *)
+    | IO_rx_release                 (* peer initiated orderly release *)
+    | IO_rx_blocked                 (* transmit flow control *)
+    | IO_rx_ready                   (* ready for transmit *)
+    | IO_rx_closed                  (* orderly close complete *)
+    | IO_rx_unlinked                (* output flushed and input stopped *)
+
+module type T = sig
+    module S: Cf_sock_stream.T
+    
+    type connection_t = S.t * S.address_t * S.address_t
+
+    type initiator_rx_t =
+        | I_connect of connection_t
+        | I_error of exn
+    
+    val initiator:
+        c:initiator_rx_t #Iom_gadget.tx -> ?cancel:unit #Iom_gadget.rx ->
+        ?sock:S.t -> ?src:S.address_t -> S.address_t -> Iom_gadget.kernel_t ->
+        ('s, unit) Iom_gadget.t
+    
+    type listener_tx_t =
+        | L_extend of int
+        | L_limit of int
+        | L_close
+    
+    type listener_rx_t =
+        | L_bind of S.address_t
+        | L_connect of connection_t
+        | L_error of exn
+        
+    val listener:
+        c:(listener_tx_t #Iom_gadget.rx * listener_rx_t #Iom_gadget.tx) ->
+        ?sock:S.t -> ?qlen:int -> S.address_t -> Iom_gadget.kernel_t ->
+        ('s, unit) Iom_gadget.t
+    
+    val endpoint:
+        c:(endpoint_tx_t #Iom_gadget.rx * endpoint_rx_t #Iom_gadget.tx) -> 
+        S.t -> Iom_gadget.kernel_t -> ('s, unit) Iom_gadget.t
+end
+
+module Create(S: Cf_sock_stream.T): T with module S = S
+
+(*--- End of File [ iom_sock_stream.mli ] ---*)
+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  t_iom.ml
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+Random.self_init ();;
+
+let signalF n = failwith (Printf.sprintf "signal %d" n);;
+for i = 1 to 31 do
+    ignore (Sys.signal Sys.sigpipe (Sys.Signal_handle signalF))
+done;;
+
+(*
+Gc.set {
+    (Gc.get ()) with
+    Gc.verbose = 0x3ff;
+    (* Gc.verbose = 0x14; *)
+};;
+
+Gc.create_alarm begin fun () ->
+    let min, pro, maj = Gc.counters () in
+    Printf.printf "[Gc] minor=%f promoted=%f major=%f\n" min pro maj;
+    flush stdout
+end
+*)
+
+module T1 = struct
+    module G = Iom_gadget
+    open Cf_cmonad.Op
+
+    module SR = Iom_sock_stream
+    module TCP_s = Cf_tcp4_socket
+    module TCP_r = Iom_tcp4_socket
+    module INADDR = Cf_ip4_addr
+    let any_address = Cf_ip4_addr.any
+    
+    (*
+    let xtag = "[T1.test] "
+    
+    let xprintf fmt =
+        print_string xtag;
+        Printf.printf fmt
+    
+    let xsprintf (fmt : ('a, unit, string) format) =
+        Printf.sprintf (Obj.magic (xtag ^ (Obj.magic fmt)))
+    *)
+    
+    type top_event_t = [
+        | `Error of exn
+        | `Listen of TCP_s.address_t
+        | `Connect of TCP_s.t
+        | `Receive
+    ]
+        
+    let initiator_ ~top addr k =
+        let top = (top :> top_event_t G.tx) in
+        G.simplex >>= fun (rx, tx) ->
+        TCP_r.initiator ~c:tx addr k >>= fun () ->
+        let rx = (rx :> TCP_r.initiator_rx_t G.rx) in
+        let loop () =
+            G.guard begin
+                rx#get begin function
+                | TCP_r.I_connect (socket, _, _) -> top#put (`Connect socket)
+                | TCP_r.I_error error -> top#put (`Error error)
+                end
+            end
+        in
+        G.start (loop ()) ()
+    
+    let listener_ ~top k =
+        let top = (top :> top_event_t G.tx) in
+        let address = (any_address, 0 :> TCP_s.address_t) in
+        G.duplex >>= fun ((lr, lc), c) ->
+        TCP_r.listener ~c address k >>= fun () ->
+        let lr = (lr :> TCP_r.listener_rx_t G.rx) in
+        let lc = (lc :> TCP_r.listener_tx_t G.tx) in
+        let rec loop () =
+            G.guard begin
+                lr#get begin function
+                | TCP_r.L_error error ->
+                    top#put (`Error error)
+                | TCP_r.L_connect (socket, _, _) ->
+                    lc#put TCP_r.L_close >>= fun () ->
+                    top#put (`Connect socket)
+                | TCP_r.L_bind address ->
+                    lc#put (TCP_r.L_limit 1) >>= fun () ->
+                    top#put (`Listen address) >>= loop
+                end
+            end
+        in
+        G.start (loop ()) ()
+    
+    let exchange_ ~top msg socket k =
+        let top = (top :> top_event_t G.tx) in
+        G.duplex >>= fun ((er, ec), c) ->
+        TCP_r.endpoint ~c socket k >>= fun () ->
+        let ec = (ec :> SR.endpoint_tx_t G.tx) in
+        let rxmsg, txmsg = msg in
+        let initM =
+            ec#put (SR.IO_tx_extend (succ (String.length rxmsg))) >>= fun () ->
+            ec#put (SR.IO_tx_data (Cf_message.create txmsg))
+        in
+        G.start initM () >>= fun () ->
+        let er = (er :> SR.endpoint_rx_t G.rx) in
+        let rec loop () = G.guard (er#get rx_endpoint)
+        and rx_endpoint = function
+            | SR.IO_rx_data m ->
+                let str = Cf_message.contents m in
+                if str <> rxmsg then
+                    failwith "Unexpected message received."
+                else
+                    ec#put SR.IO_tx_release >>= loop
+            | SR.IO_rx_error error ->
+                top#put (`Error error)
+            | SR.IO_rx_closed ->
+                top#put `Receive
+            | _ ->
+                loop ()
+        in
+        G.start (loop ()) 0
+    
+    type state_t =
+        | S_init
+        | S_connecting
+        | S_receiving of int
+        | S_done
+    
+    let okay = ref false
+    
+    let reactor_ k =
+        G.wire >>= fun top ->
+        let topRx = new G.rx top in
+        let topTx = new G.tx top in
+        listener_ ~top:topTx k >>= fun () ->
+        let rec loop () = G.guard begin
+            topRx#get begin function
+            | `Error error ->
+                raise error
+            | `Listen addr ->
+                initiator_ ~top:topTx addr k >>= loop
+            | `Connect socket ->
+                G.load >>= begin function
+                | S_init ->
+                    G.store S_connecting >>= fun () ->
+                    exchange_ ~top:topTx ("foo", "bar") socket k >>= loop
+                | S_connecting ->
+                    G.store (S_receiving 2) >>= fun () ->
+                    exchange_ ~top:topTx ("bar", "foo") socket k >>= loop
+                | _ ->
+                    failwith "toploop_: `Connect state error"
+                end
+            | `Receive ->
+                G.load >>= begin function
+                | S_receiving n when n > 0 ->
+                    let n = pred n in
+                    let s =
+                        if n > 0 then
+                            S_receiving n
+                        else begin
+                            okay := true;
+                            S_done
+                        end
+                    in
+                    G.store s >>= loop
+                | _ ->
+                    failwith "toploop_: `Receive state error"
+                end
+            end
+        end in
+        G.start (loop ()) S_init
+    
+    let test () =
+        try
+            G.run reactor_ ();
+            if not !okay then failwith "reactor: processing completed early."
+        with
+        | Unix.Unix_error (error, fname, arg) ->
+            let msg =
+                let error = Unix.error_message error in
+                Printf.sprintf "Unix error \"%s\" in %s(%s).\n" error fname arg
+            in
+            failwith msg
+        | x ->
+            raise x
+end
+
+module T2 = struct
+    module G = Iom_gadget
+    open Cf_cmonad.Op
+
+    module SR = Iom_sock_stream
+    module TCP_s = Cf_tcp6_socket
+    module TCP_r = Iom_tcp6_socket
+    module INADDR = Cf_ip6_addr
+    let any_address = Cf_ip6_addr.unspecified
+    
+    (*
+    let xtag = "[T2.test] "
+    
+    let xprintf fmt =
+        print_string xtag;
+        Printf.printf fmt
+    
+    let xsprintf (fmt : ('a, unit, string) format) =
+        Printf.sprintf (Obj.magic (xtag ^ (Obj.magic fmt)))
+    *)
+    
+    type top_event_t = [
+        | `Error of exn
+        | `Listen of TCP_s.address_t
+        | `Connect of TCP_s.t
+        | `Receive
+    ]
+        
+    let initiator_ ~top addr k =
+        let top = (top :> top_event_t G.tx) in
+        G.simplex >>= fun (rx, tx) ->
+        TCP_r.initiator ~c:tx addr k >>= fun () ->
+        let rx = (rx :> TCP_r.initiator_rx_t G.rx) in
+        let loop () = G.guard begin
+            rx#get begin function
+            | TCP_r.I_connect (socket, _, _) -> top#put (`Connect socket)
+            | TCP_r.I_error error -> top#put (`Error error)
+            end
+        end in
+        G.start (loop ()) ()
+    
+    let listener_ ~top k =
+        let top = (top :> top_event_t G.tx) in
+        let address = (any_address, 0 :> TCP_s.address_t) in
+        G.duplex >>= fun ((lr, lc), c) ->
+        TCP_r.listener ~c address k >>= fun () ->
+        let lr = (lr :> TCP_r.listener_rx_t G.rx) in
+        let lc = (lc :> TCP_r.listener_tx_t G.tx) in
+        let rec loop () = G.guard begin
+            lr#get begin function
+            | TCP_r.L_error error ->
+                top#put (`Error error)
+            | TCP_r.L_connect (socket, _, _) ->
+                lc#put TCP_r.L_close >>= fun () ->
+                top#put (`Connect socket)
+            | TCP_r.L_bind address ->
+                lc#put (TCP_r.L_limit 1) >>= fun () ->
+                top#put (`Listen address) >>= loop
+            end
+        end in
+        G.start (loop ()) ()
+    
+    let exchange_ ~top msg socket k =
+        let top = (top :> top_event_t G.tx) in
+        G.duplex >>= fun ((er, ec), c) ->
+        TCP_r.endpoint ~c socket k >>= fun () ->
+        let ec = (ec :> SR.endpoint_tx_t G.tx) in
+        let rxmsg, txmsg = msg in
+        let initM =
+            ec#put (SR.IO_tx_extend (succ (String.length rxmsg))) >>= fun () ->
+            ec#put (SR.IO_tx_data (Cf_message.create txmsg))
+        in
+        G.start initM () >>= fun () ->
+        let er = (er :> SR.endpoint_rx_t G.rx) in
+        let rec loop () = G.guard (er#get rx_endpoint)
+        and rx_endpoint = function
+            | SR.IO_rx_data m ->
+                let str = Cf_message.contents m in
+                if str <> rxmsg then
+                    failwith "Unexpected message received."
+                else
+                    ec#put SR.IO_tx_release >>= loop
+            | SR.IO_rx_error error ->
+                top#put (`Error error)
+            | SR.IO_rx_closed ->
+                top#put `Receive
+            | _ ->
+                loop ()
+        in
+        G.start (loop ()) 0
+    
+    type state_t =
+        | S_init
+        | S_connecting
+        | S_receiving of int
+        | S_done
+    
+    let okay = ref false
+    
+    let reactor_ k =
+        G.wire >>= fun top ->
+        let topRx = new G.rx top in
+        let topTx = new G.tx top in
+        listener_ ~top:topTx k >>= fun () ->
+        let rec loop () = G.guard begin
+            topRx#get begin function
+            | `Error error ->
+                raise error
+            | `Listen addr ->
+                initiator_ ~top:topTx addr k >>= loop
+            | `Connect socket ->
+                G.load >>= begin function
+                | S_init ->
+                    G.store S_connecting >>= fun () ->
+                    exchange_ ~top:topTx ("foo", "bar") socket k >>= loop
+                | S_connecting ->
+                    G.store (S_receiving 2) >>= fun () ->
+                    exchange_ ~top:topTx ("bar", "foo") socket k >>= loop
+                | _ ->
+                    failwith "toploop_: `Connect state error"
+                end
+            | `Receive ->
+                G.load >>= begin function
+                | S_receiving n when n > 0 ->
+                    let n = pred n in
+                    let s =
+                        if n > 0 then
+                            S_receiving n
+                        else begin
+                            okay := true;
+                            S_done
+                        end
+                    in
+                    G.store s >>= loop
+                | _ ->
+                    failwith "toploop_: `Receive state error"
+                end
+            end
+        end in
+        G.start (loop ()) S_init
+    
+    let test () =
+        try
+            G.run reactor_ ();
+            if not !okay then failwith "reactor: processing completed early."
+        with
+        | Unix.Unix_error (error, fname, arg) ->
+            let msg =
+                let error = Unix.error_message error in
+                Printf.sprintf "Unix error \"%s\" in %s(%s).\n" error fname arg
+            in
+            failwith msg
+        | x ->
+            raise x
+end
+
+let main () =
+    let tests = [
+        T1.test;
+        T2.test;
+    ] in
+    Printf.printf "1..%d\n" (List.length tests);
+    flush stdout;
+        
+    let test i f =
+        begin
+            try
+                f ();
+                Printf.printf "ok %d\n" i
+            with
+            | Failure(s) ->
+                Printf.printf "not ok %d (Failure \"%s\")\n" i s
+            | x ->
+                Printf.printf "not ok %d\n" i;
+                flush stdout;
+                raise x
+        end;
+        flush stdout;
+        succ i
+    in
+    let _ = List.fold_left test 1 tests in
+    exit 0
+;;
+
+main ();;
+
+
+(*--- End of File [ t_iom.ml ] ---*)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.