Commits

Anonymous committed 75d27db

Rewrote the [endpoint] reactor to use less consing, i.e. the state is now a
couple of mutable objects derived from [Cf_poll.event]. Events are plumbed
around the inside of the reactor in shorter paths.

Comments (0)

Files changed (1)

iom/iom_sock_stream.ml

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
-(*
-let xtag = "[Iom_sock_stream] "
-
-let xprintf fmt =
-    print_string xtag;
-    Printf.printf fmt
-
-let xsprintf (fmt : ('a, unit, string) format) =
-    Printf.sprintf (Obj.magic (xtag ^ (Obj.magic fmt)))
-*)
+(**)
+let jout = Cf_journal.stdout
+(* let () = jout#setlimit `None *)
+(**)
 
 type endpoint_tx_t =
     | IO_tx_data of Cf_message.t	(* send data to peer *)
         | Aux_working of ('s, unit) G.t
         | Aux_final of ('s, unit) G.t
     
-    class virtual file_aux rwx e =
+    class virtual file_aux ctrlTx rwx e =
+        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
         let e = (e :> S.basic) in
         object(self:'self)
             inherit [(unit, unit) G.t] Cf_poll.file rwx e#fd
 
             val mutable limit_ = 0
+            val mutable finalized_ = false
             
-            method virtual private service_aux: 's. 's service_aux_t
+            method virtual private service_aux_: 's. 's service_aux_t
 
             method private service_ p =
-                match self#service_aux with
+                match self#service_aux_ with
                 | Aux_final v -> `Final v
                 | Aux_working v -> `Working (p, v)
                 | _ -> assert (not true); state_
 
-            method limit:
-                's 'c. c:(G.control_t #G.tx as 'c) -> int ->
-                    ('s, unit) G.t =
-                fun ~c n ->
-                    let c = (c :> G.control_t G.tx) in
+            method limit: 's. int -> ('s, unit) G.t = fun n ->
+                if finalized_ then
+                    Cf_cmonad.return ()
+                else begin
                     let n0 = limit_ in
                     limit_ <- n;
                     if n <= 0 && n0 > 0 then
-                        c#put G.C_unload
+                        ctrlTx#put G.C_unload
                     else if n > 0 && n0 <= 0 then begin
-                        match self#service_aux with
+                        match self#service_aux_ with
                         | Aux_load ->
-                            c#put G.C_load
+                            ctrlTx#put G.C_load
                         | Aux_unload ->
-                            c#put G.C_unload
+                            ctrlTx#put G.C_unload
                         | Aux_working v
                         | Aux_final v ->
-                            if limit_ <= 0 then
+                            if n <= 0 then
                                 v
                             else
                                 v >>= fun () ->
-                                c#put G.C_load
+                                ctrlTx#put G.C_load
                     end
                     else
                         Cf_cmonad.return ()
+                end
             
-            method extend:
-                's 'c. c:(G.control_t #G.tx as 'c) -> int ->
-                    ('s, unit) G.t =
-                fun ~c n ->
-                    self#limit ~c (limit_ + n)
+            method extend: 's. int -> ('s, unit) G.t = fun n ->
+                self#limit (limit_ + n)
+            
+            method finalize: 's. unit -> ('s, unit) G.t = fun () ->
+                finalized_ <- true;
+                ctrlTx#put G.C_unload >>= fun () ->
+                ctrlTx#put G.C_final
+            
+            method is_finished = finalized_
         end
     
     type connection_t = S.t * S.address_t * S.address_t
         Try_sig_connect (sock, laddr, raddr)
 
     let i_try_connect_ ~s =
-        let sock = s#socket in
         try
             s#connect;
             i_cons_connection_ ~s
         | error ->
             Try_sig_error error
             
-    class l_poll ~lr s =
+    class l_poll ~lr ctrlTx s =
         let lr = (lr :> listener_rx_t G.tx) in
         let s = (s :> S.listener) in
         object
-            inherit file_aux `R s
+            inherit file_aux ctrlTx `R s
                         
-            method private service_aux =
+            method private service_aux_ =
                 if limit_ <= 0 then
                     Aux_unload
                 else
             let poll = (poll :> l_poll) in
             match msg with
             | L_extend n ->
-                poll#extend ~c:rc n >>= loop
+                poll#extend n >>= loop
             | L_limit n ->
-                poll#limit ~c:rc n >>= loop
+                poll#limit n >>= loop
             | L_close ->
                 socket#close;
                 rc#put G.C_unload >>= fun () ->
             end;
             let address = s#getsockname in
             Unix.set_nonblock s#fd;
-            let e = new l_poll ~lr s in
+            G.simplex >>= fun (rcRx, rcTx) ->
+            let e = new l_poll ~lr rcTx s in
             lr#put (L_bind address) >>= fun () ->
-            G.simplex >>= fun (rcRx, rcTx) ->
             l_control_loop_ ~c ~rc:rcTx (s, e) >>= fun () ->
             G.poll ~c:rcRx ~e k
         with
         | error ->
             G.start (lr#put (L_error error)) ()
-        
-    type e_poll_t = PX_txr | PX_rxf | PX_err of exn | PX_close
+    
     
     type e_try_data_t =
         | Try_data_working of int
         | Try_data_error of exn
         | Try_data_pending
     
-    let e_try_recv_ ~s buf pos len =
-        let s = (s :> S.endpoint) in
+    let e_try_recv_ e buf pos len =
+        let e = (e :> S.endpoint) in
         try
-            let n = s#recv buf pos len in
+            let n = e#recv buf pos len in
             Try_data_working n
         with
         | Unix.Unix_error (code, _, _) when is_blocked_ code ->
             Try_data_pending
         | error ->
             Try_data_error error
-        
-    class e_rx_poll ~s ~er ~px =
+    
+    let rec e_msg_buf_pos_len_ = function
+        | _ :: _ :: _ as m ->
+            assert (not true);
+            e_msg_buf_pos_len_ (Cf_message.copy m)
+        | hd :: [] ->
+            hd
+        | [] ->
+            "", 0, 0
+    
+    let e_try_send_ ~s msg =
         let s = (s :> S.endpoint) in
-        let er = (er :> endpoint_rx_t G.tx) in
-        let px = (px :> e_poll_t G.tx) in
-        object(self)
-            inherit file_aux `R s
-            
+        let buf, pos, len = e_msg_buf_pos_len_ msg in
+        try
+            Try_data_working (s#send buf pos len)
+        with
+        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
+            Try_data_pending
+        | error ->
+            Try_data_error error
+    
+    type e_signal_t =
+        | Ev_error of exn
+        | Ev_rcvrel
+        | Ev_txsync
+    
+    class e_rx_poll ctrlTx sigTx upTx e =
+        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
+        let sigTx = (sigTx :> e_signal_t G.tx) in
+        let upTx = (upTx :> endpoint_rx_t G.tx) in
+        object(self:'self)
+            inherit file_aux ctrlTx `R e as super
+
             val mutable buffer_ = String.create 0
             
-            method private service_aux =
+            method private service_aux_ =
                 if limit_ <= 0 then
                     Aux_unload
                 else begin
                     if String.length buffer_ < limit_ then
                         buffer_ <- String.create limit_;
-                    match e_try_recv_ ~s buffer_ 0 limit_ with
+                    match e_try_recv_ e buffer_ 0 limit_ with
                     | Try_data_pending ->
                         Aux_load
                     | Try_data_error error ->
-                        Aux_final (px#put (PX_err error))
+                        Aux_final (sigTx#put (Ev_error error))
                     | Try_data_working n ->
                         if n = 0 then begin
                             buffer_ <- "";
                             limit_ <- 0;
-                            Aux_final (px#put PX_rxf)
+                            Aux_final (sigTx#put Ev_rcvrel)
                         end
                         else begin
                             let msg = [ String.sub buffer_ 0 n, 0, n ] in
-                            let v = er#put (IO_rx_data msg) in
+                            let v = upTx#put (IO_rx_data msg) in
                             if n < limit_ then begin
                                 limit_ <- limit_ - n;
                                 Aux_working v
                 end
         end
     
-    let rec e_msg_buf_pos_len_ = function
-        | _ :: _ :: _ as m ->
-            assert (not true);
-            e_msg_buf_pos_len_ (Cf_message.copy m)
-        | hd :: [] ->
-            hd
-        | [] ->
-            "", 0, 0
+    type e_tx_sync_t = Tx_open | Tx_releasing | Tx_unlinking | Tx_finalized
+    
+    class e_tx_poll ctrlTx sigTx upTx e =
+        let ctrlTx = (ctrlTx :> G.control_t G.tx) in
+        let sigTx = (sigTx :> e_signal_t G.tx) in
+        let upTx = (upTx :> endpoint_rx_t G.tx) in
+        object(self:'self)
+            inherit [(unit, unit) G.t] Cf_poll.file `W e#fd
+
+            val mutable queue_ = Cf_deque.nil
+            val mutable sync_ = Tx_open
             
-    let e_try_send_ ~s msg =
-        let s = (s :> S.endpoint) in
-        let buf, pos, len = e_msg_buf_pos_len_ msg in
-        try
-            Try_data_working (s#send buf pos len)
-        with
-        | Unix.Unix_error (code, _, _) when is_blocked_ code ->
-            Try_data_pending
-        | error ->
-            Try_data_error error
-            
-    class e_tx_poll ~s ~px =
-        let s = (s :> S.endpoint) in
-        let px = (px :> e_poll_t G.tx) in
-        object(self:'self)
-            inherit [(unit, unit) G.t] Cf_poll.file `W s#fd
-            
-            val mutable working_ = []
-
-            method private service_aux: 's. 's service_aux_t =
-                if working_ = [] then
-                    Aux_unload
-                else
-                    let working = Cf_message.flatten working_ in
-                    match e_try_send_ ~s working with
-                    | Try_data_pending ->
-                        Aux_load
-                    | Try_data_error error ->
-                        Aux_final (px#put (PX_err error))
-                    | Try_data_working n ->
-                        working_ <- Cf_message.shift ~pos:n working;
-                        let v = Cf_cmonad.return () in
-                        match working_ with
-                        | [] -> Aux_final v
-                        | _ -> Aux_working v
+            method private service_aux_: 's. 's service_aux_t =
+                let msg = Cf_deque.fold (fun a s -> s :: a) [] queue_ in
+                queue_ <- Cf_deque.nil;
+                let msg = Cf_message.flatten msg in
+                assert (msg <> []);
+                match e_try_send_ e msg with
+                | Try_data_pending ->
+                    queue_ <- Cf_deque.B.push (List.hd msg) queue_;
+                    Aux_load
+                | Try_data_error error ->
+                    Aux_final (sigTx#put (Ev_error error))
+                | Try_data_working n ->
+                    let msg = Cf_message.shift ~pos:n msg in
+                    let v = Cf_cmonad.return () in
+                    match msg with
+                    | [] ->
+                        Aux_final v
+                    | _ ->
+                        queue_ <- Cf_deque.B.push (List.hd msg) queue_;
+                        Aux_working v
 
             method private service_ p =
-                assert (working_ <> []);
-                let working = Cf_message.flatten working_ in
-                match e_try_send_ ~s working with
-                | Try_data_pending ->
+                match self#service_aux_ with
+                | Aux_working v ->
+                    `Working (p, v)
+                | Aux_final v ->
+                    begin
+                        match sync_ with
+                        | Tx_open ->
+                            `Final (upTx#put IO_rx_ready)
+                        | Tx_unlinking ->
+                            `Final (sigTx#put Ev_txsync)
+                        | _ ->
+                            assert (sync_ <> Tx_finalized);
+                            e#shutdown Unix.SHUTDOWN_SEND;
+                            `Final (sigTx#put Ev_txsync)
+                    end
+                | _ ->
                     assert (not true);
                     state_
-                | Try_data_error error ->
-                    `Final (px#put (PX_err error))
-                | Try_data_working n ->
-                    working_ <- Cf_message.shift ~pos:n working;
-                    match working_ with
-                    | [] -> `Final (px#put PX_txr)
-                    | _ -> `Working (p, Cf_cmonad.return ())
+                        
+            method push: 's. Cf_message.t -> ('s, bool) G.t = fun m ->
+                if m <> [] || sync_ <> Tx_open then begin
+                    queue_ <-
+                        List.fold_left (fun q s -> Cf_deque.A.push s q)
+                        queue_ m;
+                    if state_ <> `Unloaded then
+                        Cf_cmonad.return true
+                    else begin
+                        match self#service_aux_ with
+                        | Aux_load ->
+                            ctrlTx#put G.C_load >>= fun () ->
+                            Cf_cmonad.return false
+                        | Aux_unload ->
+                            assert (not true);
+                            Cf_cmonad.return true
+                        | Aux_working v ->
+                            (* v >>= fun () -> *)
+                            ctrlTx#put G.C_load >>= fun () ->
+                            Cf_cmonad.return false
+                        | Aux_final v ->
+                            (* v >>= fun () -> *)
+                            Cf_cmonad.return true
+                    end
+                end
+                else
+                    Cf_cmonad.return true
             
-            method push:
-                's 'c. c:(G.control_t #G.tx as 'c) -> Cf_message.t ->
-                    ('s, unit) G.t =
-                fun ~c m ->
-                    assert (m <> []);
-                    let c = (c :> G.control_t G.tx) in
-                    working_ <- working_ @ m;
-                    match self#service_aux with
-                    | Aux_load ->
-                        c#put G.C_load
-                    | Aux_unload ->
-                        assert (not true);
-                        Cf_cmonad.return ()
-                    | Aux_working v ->
-                        v >>= fun () ->
-                        c#put G.C_load
-                    | Aux_final v ->
-                        v
+            method finalize: 's. unit -> ('s, unit) G.t = fun () ->
+                assert (state_ = `Unloaded);
+                sync_ <- Tx_finalized;
+                ctrlTx#put G.C_unload >>= fun () ->
+                ctrlTx#put G.C_final
             
-            method release: 's. ('s, unit) G.t =
-                match working_ with
-                | [] ->  px#put PX_txr
-                | _ -> Cf_cmonad.return ()
+            method sync: 's. e_tx_sync_t -> ('s, bool) G.t = fun sync ->
+                if state_ <> `Unloaded then begin
+                    sync_ <- sync;
+                    Cf_cmonad.return false
+                end
+                else begin
+                    if sync = Tx_releasing then e#shutdown Unix.SHUTDOWN_SEND;
+                    self#finalize () >>= fun () ->
+                    Cf_cmonad.return true
+                end
+            
+            method sync_val = sync_
         end
     
-    type e_state_t =
-        | ES_duplex of S.endpoint * e_rx_poll * e_tx_poll
-        | ES_rsimplex of S.endpoint * e_rx_poll
-        | ES_wsimplex of S.endpoint * e_tx_poll
-        | ES_closing of S.endpoint
-        | ES_unlinking of S.endpoint
-        
-    let e_control_loop_ ~c ~rc ~wc ~px =
-        let ec, er = c in
-        let ec = (ec :> endpoint_tx_t G.rx) in
-        let er = (er :> endpoint_rx_t G.tx) in
-        let rc = (rc :> G.control_t G.tx) in
-        let wc = (wc :> G.control_t G.tx) in
-        let px = (px :> e_poll_t G.wire_t) in
-        let pxRx = new G.rx px and pxTx = new G.tx px in
+    let endpoint ~c s k =
+        let upRx, upTx = c in
+        let upRx = (upRx :> endpoint_tx_t G.rx) in
+        let upTx = (upTx :> endpoint_rx_t G.tx) in
+        let e = new S.endpoint s in
+        Unix.set_nonblock e#fd;
+        let fd: int = Obj.magic e#fd in
+        G.simplex >>= fun (sigRx, sigTx) ->
+        let sigRx = (sigRx :> e_signal_t G.rx) in
+        let sigTx = (sigTx :> e_signal_t G.tx) in
+        G.simplex >>= fun (ctrlRx, ctrlTx) ->
+        let ctrlRx = (ctrlRx :> G.control_t G.rx) in
+        let rxPoll = new e_rx_poll ctrlTx sigTx upTx e in
+        G.poll ~c:ctrlRx ~e:rxPoll k >>= fun () ->
+        G.simplex >>= fun (ctrlRx, ctrlTx) ->
+        let ctrlRx = (ctrlRx :> G.control_t G.rx) in
+        let txPoll = new e_tx_poll ctrlTx sigTx upTx e in
+        G.poll ~c:ctrlRx ~e:txPoll k >>= fun () ->
         let rec loop () =
             G.guard begin
-                ec#get cmd_rx >>= fun () ->
-                pxRx#get poll_rx
+                upRx#get do_upRx_ >>= fun () ->
+                sigRx#get do_sigRx_
             end
-        and cmd_release state txp =
-            let txp = (txp :> e_tx_poll) in
-            G.store state >>= fun () ->
-            txp#release >>= fun () -> 
-            loop ()
-        and cmd_finalize_poll xc =
-            let xc = (xc :> G.control_t G.tx) in
-            xc#put G.C_unload >>= fun () ->
-            xc#put G.C_final
-        and cmd_reset e =
-            G.store (ES_closing e) >>= fun () ->
-            e#shutdown Unix.SHUTDOWN_ALL;
-            e#close;
-            Cf_cmonad.return ()
-        and cmd_rx = function
+        and do_upRx_ = function
+            | IO_tx_data msg ->
+                txPoll#push msg >>= fun idle ->
+                if idle then
+                    loop ()
+                else
+                    upTx#put IO_rx_blocked >>= loop
             | IO_tx_release ->
-                G.load >>= begin function
-                | ES_duplex (e, rxp, txp) ->
-                    cmd_release (ES_rsimplex (e, rxp)) txp
-                | ES_wsimplex (e, txp) ->
-                    cmd_release (ES_closing e) txp
-                | _ ->
-                    loop ()
+                txPoll#sync Tx_releasing >>= fun idle ->
+                if idle && rxPoll#is_finished then begin
+                    e#close;
+                    upTx#put IO_rx_closed
                 end
-            | IO_tx_data msg ->
-                if msg <> [] then
-                    G.load >>= begin function
-                    | ES_duplex (e, _, txp)
-                    | ES_wsimplex (e, txp) ->
-                        txp#push ~c:wc msg >>= loop
-                    | _ ->
-                        loop ()
-                    end
                 else
                     loop ()
+            | IO_tx_extend n ->
+                rxPoll#extend n >>= loop
             | IO_tx_window n ->
-                G.load >>= begin function
-                | ES_duplex (e, rxp, _)
-                | ES_rsimplex (e, rxp) ->
-                    rxp#limit ~c:rc n >>= loop
-                | _ ->
+                rxPoll#limit n >>= loop
+            | IO_tx_reset ->
+                rxPoll#finalize () >>= fun () ->
+                txPoll#finalize () >>= fun () ->
+                e#shutdown Unix.SHUTDOWN_ALL;
+                e#close;
+                Cf_cmonad.return ()
+            | IO_tx_unlink ->
+                rxPoll#finalize () >>= fun () ->
+                txPoll#sync Tx_unlinking >>= fun idle ->
+                if idle then begin
+                    txPoll#finalize () >>= fun () ->
+                    upTx#put IO_rx_unlinked
+                end
+                else
                     loop ()
+        and do_sigRx_ = function
+            | Ev_error x ->
+                rxPoll#finalize () >>= fun () ->
+                txPoll#finalize () >>= fun () ->
+                e#shutdown Unix.SHUTDOWN_ALL;
+                e#close;
+                upTx#put (IO_rx_error x)
+            | Ev_rcvrel ->
+                rxPoll#finalize () >>= fun () ->
+                begin
+                    match txPoll#sync_val with
+                    | Tx_open
+                    | Tx_unlinking ->
+                        upTx#put IO_rx_release >>= loop
+                    | Tx_releasing ->
+                        loop ()
+                    | Tx_finalized ->
+                        e#close;
+                        upTx#put IO_rx_closed
                 end
-            | IO_tx_extend n ->
-                G.load >>= begin function
-                | ES_duplex (e, rxp, _)
-                | ES_rsimplex (e, rxp) ->
-                    rxp#extend ~c:rc n >>= loop
-                | _ ->
+            | Ev_txsync ->
+                match txPoll#sync_val with
+                | Tx_unlinking ->
+                    rxPoll#finalize () >>= fun () ->
+                    upTx#put IO_rx_unlinked
+                | sync ->
+                    assert (sync <> Tx_open);
                     loop ()
-                end
-            | IO_tx_unlink ->
-                G.load >>= begin function
-                | ES_duplex (e, rxp, txp) ->
-                    rxp#limit ~c:rc 0 >>= fun () ->
-                    txp#release >>= fun () ->
-                    G.store (ES_unlinking e) >>= loop
-                | ES_rsimplex (e, rxp) ->
-                    rxp#limit ~c:rc 0 >>= fun () ->
-                    er#put IO_rx_unlinked
-                | ES_wsimplex (e, txp) ->
-                    txp#release >>= fun () ->
-                    G.store (ES_unlinking e) >>= loop
-                | _ ->
-                    loop ()
-                end
-            | IO_tx_reset ->
-                G.load >>= begin function
-                | ES_duplex (e, _, _) ->
-                    cmd_finalize_poll rc >>= fun () ->
-                    cmd_finalize_poll wc >>= fun () ->
-                    cmd_reset e
-                | ES_rsimplex (e, _) ->
-                    cmd_finalize_poll rc >>= fun () ->
-                    cmd_reset e
-                | ES_wsimplex (e, _) ->
-                    cmd_finalize_poll wc >>= fun () ->
-                    cmd_reset e
-                | ES_unlinking _ ->
-                    loop ()
-                | ES_closing _ ->
-                    pxTx#put PX_close >>= loop
-                end
-        and poll_rx event =
-            match event with
-            | PX_txr ->
-                G.load >>= begin function
-                | ES_duplex _
-                | ES_wsimplex _ ->
-                    er#put IO_rx_blocked >>= loop
-                | ES_rsimplex (e, _) ->
-                    e#shutdown Unix.SHUTDOWN_SEND;
-                    loop ()
-                | ES_unlinking _ ->
-                    er#put IO_rx_unlinked
-                | ES_closing _ ->
-                    loop ()
-                end
-            | PX_rxf ->
-                G.load >>= begin function
-                | ES_duplex (e, _, txs) ->
-                    G.store (ES_wsimplex (e, txs)) >>= fun () ->
-                    er#put IO_rx_release >>= loop
-                | ES_rsimplex (e, _) ->
-                    G.store (ES_closing e) >>= fun () ->
-                    pxTx#put PX_close >>= loop
-                | _ ->
-                    assert (not true);
-                    loop ()
-                end
-            | PX_err error ->
-                G.load >>= begin function
-                | ES_closing e
-                | ES_unlinking e
-                | ES_duplex (e, _, _)
-                | ES_rsimplex (e, _)
-                | ES_wsimplex (e, _) ->
-                    e#close;
-                    er#put (IO_rx_error error)
-                end
-            | PX_close ->
-                G.load >>= begin function
-                | ES_unlinking e
-                | ES_duplex (e, _, _)
-                | ES_rsimplex (e, _)
-                | ES_wsimplex (e, _) ->
-                    assert (not true);
-                    Cf_cmonad.return ()
-                | ES_closing e ->
-                    e#close;
-                    er#put IO_rx_closed
-                end
         in
-        loop ()
-    
-    let endpoint ~c s k =
-        let ec, er = c in
-        let ec = (ec :> endpoint_tx_t G.rx) in
-        let er = (er :> endpoint_rx_t G.tx) in
-        let s = new S.endpoint s in
-        Unix.set_nonblock s#fd;
-        G.duplex >>= fun ((rcRx, wcTx), (wcRx, rcTx)) ->
-        G.wire >>= fun px ->
-        let pxTx = new G.tx px in
-        let rxp = new e_rx_poll ~s ~er ~px:pxTx in
-        let txp = new e_tx_poll ~s ~px:pxTx in
-        let state = ES_duplex (s, rxp, txp) in
-        G.start (e_control_loop_ ~c ~rc:rcTx ~wc:wcTx ~px) state >>= fun () ->
-        G.poll ~c:rcRx ~e:rxp k >>= fun () ->
-        G.poll ~c:wcRx ~e:txp k
+        G.start (loop ()) ()
 end
 
 (*--- End of File [ iom_sock_stream.ml ] ---*)