Anonymous avatar Anonymous committed 08df12a

Checkpoint. An attempt at fixing the SASL cancel semantics. Some progress
on generalizing the BEEP core loopback test for reuse in multiple test
functions. (Not functional yet.)

Comments (0)

Files changed (3)

beep/beep_sasl.ml

 open Iom_reactor
 open Cf_cmonad.Op
 
-type 'id event_t =
-    | V_success of 'id * Sasl_security.codec_t option
-    | V_failed of Beep_channel.M_error.t
-    | V_error of exn
+type 'id control_rx_t =
+    | C_rx_success of 'id * Sasl_security.codec_t option
+    | C_rx_failed of Beep_channel.M_error.t
+    | C_rx_error of exn
+
+type client_tx_t =
+    | C_tx_close of Beep_channel.M_close.t
+    | C_tx_restart
 
 let x_bad_frame_type_ = Err.X Err.X_bad_frame_type
 let x_authentication_error_ = Err.X Err.X_authentication_error
 
-let rec client_exchange_ ?loc eventTx mechTx ctrlTx msg =
-    let eventTx = (eventTx :> 'id event_t tx) in
+let rec client_exchange_ ?loc ctrlTx mechTx chTx msg =
+    let ctrlTx = (ctrlTx :> 'id control_rx_t tx) in
     let mechTx = (mechTx :> SaslM.client_tx_t tx) in
-    let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
+    let chTx = (chTx :> C.control_tx_t tx) in
     object(self)
         inherit Exchange.client msg ()
         
         method private abort_ code event =
             mechTx#put `Abort >>= fun () ->
             let ctrl = (C.M_close.cons ?loc code) in
-            ctrlTx#put (C.C_tx_close ctrl) >>= fun () ->
-            eventTx#put event
+            chTx#put (C.C_tx_close ctrl) >>= fun () ->
+            ctrlTx#put event
         
         method private rpy = function
             | Cf_exnopt.X x ->
-                self#abort_ Err.Std_code.general_syntax_error (V_error x)
+                self#abort_ Err.Std_code.general_syntax_error (C_rx_error x)
             | Cf_exnopt.U (text, status) ->
-                let msg = Cf_message.create text in
+                let rpy = Cf_message.create text in
                 match status with
                 | `Continue ->
-                    mechTx#put (`Continue msg)
+                    mechTx#put (`Continue rpy)
                 | `Complete ->
-                    let msgOpt = match msg with [] -> None | _ -> Some msg in
-                    mechTx#put (`Final msgOpt)
+                    let _, cmd = msg in
+                    if cmd <> `Abort then begin
+                        let msgOpt =
+                            match rpy with [] -> None | _ -> Some rpy
+                        in
+                        mechTx#put (`Final msgOpt)
+                    end
+                    else
+                        Cf_cmonad.return ()
                 | `Abort ->
                     let code = Err.Std_code.parameter_syntax_error in
                     let error = Err.X Err.X_authentication_error in
-                    self#abort_ code (V_error error)
+                    self#abort_ code (C_rx_error error)
         
         method private err: U.M_error.t Cf_exnopt.t -> ('s, unit) t = function
             | Cf_exnopt.X x ->
-                self#abort_ Err.Std_code.general_syntax_error (V_error x)
+                self#abort_ Err.Std_code.general_syntax_error (C_rx_error x)
             | Cf_exnopt.U error ->
-                self#abort_ Err.Std_code.success (V_failed error)
+                self#abort_ Err.Std_code.success (C_rx_failed error)
         
         method private frame_type_error_ () =
             let code = Err.Std_code.general_syntax_error in
-            let error = V_error (Err.X Err.X_bad_frame_type) in
+            let error = C_rx_error (Err.X Err.X_bad_frame_type) in
             self#abort_ code error
         
         method private ans _ = self#frame_type_error_ ()
         method private nul = self#frame_type_error_ ()
     end
 
-let service_exchange_ ?loc eventTx mechRx mechTx ctrlTx =
-    let eventTx = (eventTx :> 'id event_t tx) in
+let service_exchange_ ?loc ctrlTx mechRx mechTx chTx =
+    let ctrlTx = (ctrlTx :> 'id control_rx_t tx) in
     let mechRx = (mechRx :> 'id SaslM.service_rx_t rx) in
     let mechTx = (mechTx :> SaslM.service_tx_t tx) in
-    let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
+    let chTx = (chTx :> C.control_tx_t tx) in
     object(self)
         inherit Exchange.service ()
         
         method private error_ rspTx code event =
             let rspTx = (rspTx :> 'rsp tx) in
             mechTx#put `Abort >>= fun () ->
-            eventTx#put event >>= fun () ->
+            ctrlTx#put event >>= fun () ->
             rspTx#put (self#err (C.M_error.cons ?loc code))
         
         method private continue_ rspTx mechEvt =
                     | None -> ""
                 in
                 rspTx#put (self#rpy (msg, `Complete)) >>= fun () ->
-                ctrlTx#put C.C_tx_reset >>= fun () ->
-                eventTx#put (V_success (id, secOpt))
+                chTx#put C.C_tx_reset >>= fun () ->
+                ctrlTx#put (C_rx_success (id, secOpt))
             | `Error x ->
                 let code = Err.Std_code.authentication_failed in
-                self#error_ rspTx code (V_error x)
+                self#error_ rspTx code (C_rx_error x)
 
         method private msg rspTx blobEvt =
             let rspTx = (rspTx :> 'rsp tx) in
             match blobEvt with
             | Cf_exnopt.X x ->
                 let code = Err.Std_code.authentication_failed in
-                self#error_ rspTx code (V_error x)
+                self#error_ rspTx code (C_rx_error x)
             | Cf_exnopt.U (msg, cmd) ->
                 let msg = Cf_message.create msg in
                 match cmd with
                 | `Complete ->
                     let code = Err.Std_code.parameter_syntax_error in
                     let x = Err.X Err.X_authentication_error in
-                    self#error_ rspTx code (V_error x)
+                    self#error_ rspTx code (C_rx_error x)
                 | `Abort ->
                     mechTx#put `Cancel >>= fun () ->
                     let code = Err.Std_code.success in
                     let errorMsg = C.M_error.cons ?loc code in
-                    eventTx#put (V_failed errorMsg) >>= fun () ->
+                    ctrlTx#put (C_rx_failed errorMsg) >>= fun () ->
                     rspTx#put (self#err errorMsg)
     end
 
     | Cf_seq.P (hd, tl) ->
         try NM.search hd cmap with Not_found -> select_mechanism_ cmap tl
 
-let client_ ?loc ?closeRx eventTx mechRx mechTx ctrlRx ctrlTx =
-    let closeRx = match closeRx with None -> new rx null | Some rx -> rx in
-    let closeRx = (closeRx :> C.M_close.t rx) in
-    let eventTx = (eventTx :> 'id event_t tx) in
-    let ctrlRx = (ctrlRx :> C.control_rx_t rx) in
-    let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
-    let mechRx = (mechRx :> 'id SaslM.client_rx_t rx) in
-    let mechTx = (mechTx :> SaslM.client_tx_t tx) in
+let client_ ~c ?loc ctrlRx ctrlTx mechRx mechTx chRx chTx =
+    let c = (c :> 'id SaslM.client) in
+    let ctrlRx = (ctrlRx :> client_tx_t rx) in
+    let ctrlTx = (ctrlTx :> 'id control_rx_t tx) in
+    let chRx = (chRx :> C.control_rx_t rx) in
+    let chTx = (chTx :> C.control_tx_t tx) in
     let rec loop () =
+        load >>= fun state ->
         guard begin
-            ctrlRx#get get_ctrlRx_ >>= fun () ->
-            mechRx#get get_mechRx_ >>= fun () ->
-            closeRx#get get_closeRx_
+            chRx#get (do_chRx_ mechTx) >>= fun () ->
+            ctrlRx#get (do_ctrlRx_ mechTx) >>= fun () ->
+            match state with
+            | `Pending (mechRx, _) ->
+                let mechRx = (mechRx :> 'id SaslM.client_rx_t rx) in
+                mechRx#get (do_mechRx_ mechTx)
+            | `Canceled ->
+                Cf_cmonad.return ()
         end
-    and get_ctrlRx_ = function
+    and do_chRx_ mechTx = function
         | C.C_rx_exchange exch ->
             let obj = error_service_ ?loc Err.Std_code.service_unavailable in
             obj#link exch >>= loop
         | C.C_rx_close closeMsg ->
-            let finished = ctrlTx#put C.C_tx_release in
+            let finished = chTx#put C.C_tx_release in
             if closeMsg.C.M_close.code = Err.Std_code.success then
                 finished
             else
                     C.M_error.lang = closeMsg.C.M_close.lang;
                     C.M_error.content = closeMsg.C.M_close.content;
                 } in
-                eventTx#put (V_failed errorMsg) >>= fun () ->
+                ctrlTx#put (C_rx_failed errorMsg) >>= fun () ->
                 finished
         | C.C_rx_error errorMsg ->
-            eventTx#put (V_failed errorMsg)
+            ctrlTx#put (C_rx_failed errorMsg)
         | C.C_rx_release ->
             Cf_cmonad.return ()
         | C.C_rx_abort ->
+            let mechTx = (mechTx :> SaslM.client_tx_t tx) in
             mechTx#put `Abort
-    and get_mechRx_ = function
+    and do_mechRx_ mechTx = function
         | `Continue msg ->
-            do_continue_ (Cf_message.contents msg)
+            do_continue_ (Cf_message.contents msg) mechTx
         | `Cancel ->
-            let msg = "", `Abort in
-            let obj = client_exchange_ ?loc eventTx mechTx ctrlTx msg in
-            obj#link >>= fun exch ->
-            ctrlTx#put (C.C_tx_exchange exch) >>= loop
+            do_restart_ mechTx
         | `Prompt ->
-            do_continue_ ""
+            do_continue_ "" mechTx
         | `Done (id, codecOpt) ->
-            eventTx#put (V_success (id, codecOpt)) >>= fun () ->
-            ctrlTx#put C.C_tx_reset
+            ctrlTx#put (C_rx_success (id, codecOpt)) >>= fun () ->
+            chTx#put C.C_tx_reset
         | `Error x ->
             let closeMsg =
                 C.M_close.cons ?loc Err.Std_code.transaction_error
             in
-            ctrlTx#put (C.C_tx_close closeMsg) >>= fun () ->
-            eventTx#put (V_error x) >>= loop
-    and get_closeRx_ closeMsg =
-        ctrlTx#put (C.C_tx_close closeMsg) >>= loop
-    and do_continue_ msg =
+            chTx#put (C.C_tx_close closeMsg) >>= fun () ->
+            ctrlTx#put (C_rx_error x) >>= loop
+    and do_ctrlRx_ mechTx = function
+        | C_tx_close closeMsg ->
+            chTx#put (C.C_tx_close closeMsg) >>= loop
+        | C_tx_restart ->
+            do_restart_ mechTx
+    and do_restart_ mechTx =
+        let msg = "", `Abort in
+        let obj = client_exchange_ ?loc ctrlTx mechTx chTx msg in
+        obj#link >>= fun exch ->
+        chTx#put (C.C_tx_exchange exch) >>= fun () ->
+        store `Canceled >>= loop
+    and do_continue_ msg mechTx =
         let msg = msg, `Continue in
-        let obj = client_exchange_ ?loc eventTx mechTx ctrlTx msg in
+        let obj = client_exchange_ ?loc ctrlTx mechTx chTx msg in
         obj#link >>= fun exch ->
-        ctrlTx#put (C.C_tx_exchange exch) >>= loop
+        chTx#put (C.C_tx_exchange exch) >>= loop
     in
-    start (loop ()) ()
+    let mechRx = (mechRx :> 'id SaslM.client_rx_t rx) in
+    let mechTx = (mechTx :> SaslM.client_tx_t tx) in
+    start (loop ()) (`Pending (mechRx, mechTx))
 
 class ['id] client ~m ~g =
     let m = (m :> 'id SaslM.client list) in
         try List.find (fun c -> NS.member c#name gmap) m with
         | Not_found -> invalid_arg "Beep_sasl.client: empty mechanism list"
     in
-    fun ?srv ?loc ?close eventTx ->
+    fun ?srv ?loc ctrlRx ctrlTx ->
     let loc = (loc :> Beep_locale.t option) in
-    let close = (close :> C.M_close.t rx option) in
-    let eventTx = (eventTx :> 'id event_t tx) in
+    let ctrlRx = (ctrlRx :> client_tx_t rx) in
+    let ctrlTx = (ctrlTx :> 'id control_rx_t tx) in
     let uri = uri_prefix_ ^ c#name in
     object(self:'self)
         constraint 'self = #C.client
             let mechTx = (mechTx :> SaslM.client_tx_t tx) in
             let profileTx = (profileTx :> C.profile tx) in
             mechTx#put `Abort >>= fun () ->
-            eventTx#put (V_error x) >>= fun () ->
+            ctrlTx#put (C_rx_error x) >>= fun () ->
             profileTx#put (new C.profile)
     
         method private cons_ profileTx =
             let profile = object
                 inherit C.profile
                 method private control_ =
-                    let closeRx = close in
-                    client_ ?loc ?closeRx eventTx mechRx mechTx
+                    client_ ~c ?loc ctrlRx ctrlTx mechRx mechTx
             end in
             profileTx#put profile
                 
                     self#profile_ profileMsg mechRx mechTx profileTx
                 | C.CS_error errorMsg ->
                     mechTx#put `Abort >>= fun () ->
-                    eventTx#put (V_failed errorMsg)
+                    ctrlTx#put (C_rx_failed errorMsg)
             and do_mechRx_ = function
                 | `Continue msg ->
                     do_startMsg_ (Some msg)
                 | `Cancel
                 | `Prompt ->
                     do_startMsg_ None
-                | `Error _
+                | `Error x ->
+                    ctrlTx#put (C_rx_error x)
                 | `Done _ ->
-                    J.stdout#fail
-                        "Beep_sasl.client#connect/do_mechRx_: incomplete!"
+                    ctrlTx#put (C_rx_error (Err.X Err.X_authentication_error))
             and do_startMsg_ blobOpt =
                 let blobMsg =
                     match blobOpt with
             start (loop ()) ()
     end
 
-let service_ ?loc eventTx mechRx mechTx ctrlRx ctrlTx =
+let service_ ?loc ctrlTx mechRx mechTx chRx chTx =
     let loc = (loc :> Beep_locale.t option) in
-    let eventTx = (eventTx :> 'id event_t tx) in
+    let ctrlTx = (ctrlTx :> 'id control_rx_t tx) in
     let mechRx = (mechRx :> 'id SaslM.service_rx_t rx) in
     let mechTx = (mechTx :> SaslM.service_tx_t tx) in
-    let ctrlRx = (ctrlRx :> C.control_rx_t rx) in
-    let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
-    let rec loop () = guard (ctrlRx#get do_ctrlRx_)
-    and do_ctrlRx_ = function
+    let chRx = (chRx :> C.control_rx_t rx) in
+    let chTx = (chTx :> C.control_tx_t tx) in
+    let rec loop () = guard (chRx#get do_chRx_)
+    and do_chRx_ = function
         | C.C_rx_exchange exch ->
-            let obj = service_exchange_ ?loc eventTx mechRx mechTx ctrlTx in
+            let obj = service_exchange_ ?loc ctrlTx mechRx mechTx chTx in
             obj#link exch >>= loop
         | C.C_rx_close closeMsg ->
-            ctrlTx#put C.C_tx_release
+            chTx#put C.C_tx_release
         | C.C_rx_release
         | C.C_rx_error _ ->
             assert (not true);
 let compare_name_ name s =
     not (String.compare s#name name <> 0)
 
-class ['id] service ~m ?loc eventTx =
+class ['id] service ~m ?loc ctrlTx =
     let loc = (loc :> Beep_locale.t option) in
-    let eventTx = (eventTx :> 'id event_t tx) in
+    let ctrlTx = (ctrlTx :> 'id control_rx_t tx) in
     let doSrvRspError srvRspRx srvRspTx code =
         let srvRspRx = (srvRspRx :> C.service_start_t rx) in
         let srvRspTx = (srvRspTx :> C.service_start_t tx) in
         let cons profileMsg =
             let obj = object
                 inherit C.profile
-                method private control_ = service_ ?loc eventTx mechRx mechTx
+                method private control_ = service_ ?loc ctrlTx mechRx mechTx
             end in
             srvRspTx#put (C.SS_profile (profileMsg, obj))
         in
                 let obj =
                     object
                         inherit C.profile
-                        method private control_ _ ctrlTx =
-                            let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
-                            ctrlTx#put C.C_tx_reset
+                        method private control_ _ chTx =
+                            let chTx = (chTx :> C.control_tx_t tx) in
+                            chTx#put C.C_tx_reset
                     end
                 in
                 let msg =
                 in
                 let blob = M_blob.to_string (msg, `Complete) in
                 let profileMsg = C.M_profile.cons ~uri blob in
-                eventTx#put (V_success (id, secOpt)) >>= fun () ->
+                ctrlTx#put (C_rx_success (id, secOpt)) >>= fun () ->
                 srvRspTx#put (C.SS_profile (profileMsg, obj))
             | `Error x ->
                 let code = Err.Std_code.authentication_failed in
                 let errorMsg = C.M_error.cons ?loc code in
                 srvRspTx#put (C.SS_error errorMsg) >>= fun () ->
-                eventTx#put (V_failed errorMsg)
+                ctrlTx#put (C_rx_failed errorMsg)
         in
         start (loop ()) () >>= fun () ->
         Cf_cmonad.return (Some srvRspRx)
                     let uri = profileMsg.C.M_profile.uri in
                     match M_blob.of_string profileMsg.C.M_profile.content with
                     | Cf_exnopt.X x ->
-                        eventTx#put (V_error x) >>= fun () ->
+                        ctrlTx#put (C_rx_error x) >>= fun () ->
                         let code = Err.Std_code.general_syntax_error in
                         doSrvRspError srvRspRx srvRspTx code
                     | Cf_exnopt.U (blob, cmd : M_blob.t) ->

beep/beep_sasl.mli

 
 open Iom_reactor
 
-type 'id event_t =
-    | V_success of 'id * Sasl_security.codec_t option
-    | V_failed of Beep_channel.M_error.t
-    | V_error of exn
+type 'id control_rx_t =
+    | C_rx_success of 'id * Sasl_security.codec_t option
+    | C_rx_failed of Beep_channel.M_error.t
+    | C_rx_error of exn
+
+type client_tx_t =
+    | C_tx_close of Beep_channel.M_close.t
+    | C_tx_restart
 
 class ['id] client:
     m:'id #Sasl_mechanism.client list -> g:Beep_channel.M_profile.t list ->
-    ?srv:string -> ?loc:#Beep_locale.t -> ?close:Beep_channel.M_close.t #rx ->
-    'id event_t #tx -> Beep_channel.client
+    ?srv:string -> ?loc:#Beep_locale.t -> client_tx_t #rx ->
+    'id control_rx_t #tx -> Beep_channel.client
 
 class ['id] service:
     m:'id #Sasl_mechanism.service list -> ?loc:#Beep_locale.t ->
-    'id event_t #tx -> Beep_channel.service
+    'id control_rx_t #tx -> Beep_channel.service
 
 (*--- End of File [ beep_sasl.mli ] ---*)
 
 (* let gc = Gc.get () in gc.Gc.verbose <- 0x91; Gc.set gc;; *)
 
+(**)
+module J = Cf_journal
+(**)
+
+(*
 module type X_tag = sig
     val tag: string
 end
     let sprintf (fmt : ('a, unit, string) format) =
         Printf.sprintf (Obj.magic (tag ^ (Obj.magic fmt)))
 end
+*)
 
 module T1 = struct
-    module X = X_create(struct let tag = "T1" end)
+    (* module X = X_create(struct let tag = "T1" end) *)
     
     let header channel msgno seqno = {
         Beep_frame.channel = Int32.of_int channel;
 end
 
 module T2 = struct
-    module X = X_create(struct let tag = "T2" end)
+    (* module X = X_create(struct let tag = "T2" end) *)
     
     open Iom_reactor
     open Cf_cmonad.Op
                             let cMsg = C.M_close.cons 200 in
                             ctrlTx#put (C.C_tx_close cMsg) >>= loop
                         | C.C_rx_close closeMsg ->
-                            failwith
-                              (X.sprintf
-                                "client#reactor: incomplete! [C_rx_close]");
-                            Cf_cmonad.return ()
+                            J.stdout#fail
+                                "client#reactor: incomplete! [C_rx_close]";
                         | C.C_rx_error errorMsg ->
-                            failwith
-                              (X.sprintf
-                                "client#reactor: incomplete! [C_rx_error]");
-                            Cf_cmonad.return ()
+                            J.stdout#fail
+                                "client#reactor: incomplete! [C_rx_error]";
                         | C.C_rx_release ->
                             doneTx#put ()
                         | C.C_rx_abort ->
-                            failwith
-                              (X.sprintf
-                                "client#reactor: incomplete! [C_rx_abort]");
-                            Cf_cmonad.return ()
+                            J.stdout#fail
+                                "client#reactor: incomplete! [C_rx_abort]";
                     end
                 in
                 start (loop ()) ()
                             | C.CS_profile (_, profileTx) ->
                                 profileTx#put (self :> C.profile)
                             | C.CS_error errorMsg ->
-                                failwith (X.sprintf "client#connect: error");
-                                Cf_cmonad.return ()
+                                J.stdout#fail "client#connect: incomplete!"
                         end
                     end
                 in
                         | C.C_rx_close closeMsg ->
                             ctrlTx#put C.C_tx_release
                         | _ ->
-                            failwith
-                              (X.sprintf "service#reactor: incomplete! [1]");
-                            Cf_cmonad.return ()
+                            J.stdout#fail "service#reactor: incomplete!"
                     end
                 in
                 let c = new foo_client in
                 assert (not true);
                 loop ()
             | U.C_rx_close closeMsg ->
-                failwith
-                (X.sprintf "client_/get_ctrlRx_: incomplete! [C_rx_close]")
+                J.stdout#fail "client_/get_ctrlRx_: incomplete! [C_rx_close]"
             | U.C_rx_reset ->
-                failwith
-                (X.sprintf "client_/get_ctrlRx_: incomplete! [C_rx_reset]")
+                J.stdout#fail "client_/get_ctrlRx_: incomplete! [C_rx_reset]"
             | U.C_rx_final ->
                 topTx#put Top_final
         and get_doneRx_ () =
             | U.C_rx_close _ ->
                 ctrlTx#put U.C_tx_release >>= loop                
             | U.C_rx_reset ->
-                failwith
-                (X.sprintf "service_/get_ctrlRx_: incomplete! [C_rx_reset]")
+                J.stdout#fail "service_/get_ctrlRx_: incomplete! [C_rx_reset]"
             | U.C_rx_final ->
                 topTx#put Top_final
         in
                 topTx#put (Top_error x)
             | U.C_rx_greeting greetingMsg ->
                 if greetingMsg <> U.M_greeting.default then
-                    failwith
-                        (X.sprintf "greet_/get_ctrlRx_: unexpected content");
+                    J.stdout#fail "greet_/get_ctrlRx_: unexpected content";
                 if role == U.R_initiator then
                     client_ topTx ctrlRx ctrlTx textPair
                 else
                     service_ topTx ctrlRx ctrlTx textPair
             | U.C_rx_error errorMsg ->
-                failwith
-                (X.sprintf "greet_/get_ctrlRx_: incomplete! [C_rx_error]")
+                J.stdout#fail "greet_/get_ctrlRx_: incomplete! [C_rx_error]"
             | U.C_rx_close _
             | U.C_rx_reset
             | U.C_rx_final ->
                 try raise error with
                 | Beep_error.X berror ->
                     let explain = Beep_error.to_string berror in
-                    failwith (X.sprintf "transport error: \"%s\"" explain)
+                    J.stdout#fail "transport error: \"%s\"" explain
             end
             | Top_listen addr ->
                 initiator_ topTx addr k >>= loop
         for i = 1 to 3 do test_aux i done
 end
 
+module type Loopback_Profile_T = sig
+    module Sock: Cf_sock_stream.T    
+
+    val any_address: Sock.address_t
+    val tcp_nodelay:
+        (bool, Sock.P.AF.tag_t, Sock.P.ST.tag_t) Cf_socket.sockopt_t
+end
+
+module type Loopback_T = sig
+    module Sock: Cf_sock_stream.T
+
+    type top_event_t =
+        | Top_error of exn
+        | Top_listen of Sock.address_t
+        | Top_connect of Sock.t
+        | Top_final
+
+    val test: unit -> unit
+end
+
+module Loopback(P: Loopback_Profile_T): Loopback_T = struct
+    open Iom_reactor
+    open Cf_cmonad.Op
+    
+    module Sock = P.Sock
+    module Iom = Iom_sock_stream.Create(P.Sock)
+
+    module U = Beep_transport
+    module UM = Beep_tcp_mapping.Create(P.Sock)
+    
+    type top_event_t =
+        | Top_error of exn
+        | Top_listen of Sock.address_t
+        | Top_connect of Sock.t
+        | Top_final
+
+    let client_ topTx ctrlRx ctrlTx =
+        J.stdout#fail "Loopback.client_: unimplemented!"
+
+    let server_ topTx ctrlRx ctrlTx =
+        J.stdout#fail "Loopback.server_: unimplemented!"
+
+    let initiator_ topTx addr k =
+        let topTx = (topTx :> top_event_t tx) in
+        simplex >>= fun (ctrlRx, ctrlTx) ->
+        Iom.initiator ~c:ctrlTx addr k >>= fun () ->
+        let ctrlRx = (ctrlRx :> Iom.initiator_rx_t rx) in
+        let guard () = guard begin
+            ctrlRx#get begin fun i ->
+                topTx#put begin
+                    match i with
+                    | Iom.I_connect (sock, _, _) -> Top_connect sock
+                    | Iom.I_error error -> Top_error error
+                end
+            end
+        end in
+        start (guard ()) ()
+    
+    let listener_ topTx k =
+        let topTx = (topTx :> top_event_t tx) in
+        duplex >>= fun (c, (lRx, lTx)) ->
+        Iom.listener ~c P.any_address k >>= fun () ->
+        let lRx = (lRx :> Iom.listener_rx_t rx) in
+        let lTx = (lTx :> Iom.listener_tx_t tx) in
+        let rec loop () = guard begin
+            lRx#get begin function
+            | Iom.L_error error ->
+                topTx#put (Top_error error)
+            | Iom.L_connect (socket, _, _) ->
+                lTx#put Iom.L_close >>= fun () ->
+                topTx#put (Top_connect socket)
+            | Iom.L_bind address ->
+                lTx#put (Iom.L_limit 1) >>= fun () ->
+                topTx#put (Top_listen address) >>= loop
+            end
+        end in
+        start (loop ()) ()
+
+    let reactor_ okayRef k =
+        simplex >>= fun (topRx, topTx) ->
+        let topRx = (topRx :> top_event_t rx) in
+        listener_ topTx k >>= fun () ->
+        let rec loop () =
+            guard (topRx#get do_topRx_)
+        and do_topRx_ = function
+            | Top_error error ->
+                raise error
+            | Top_listen addr ->
+                initiator_ topTx addr k >>= loop
+            | Top_connect socket ->
+                do_topRx_connect socket
+            | Top_final ->
+                do_topRx_final ()
+        and do_topRx_connect socket =
+            Cf_socket.setsockopt socket P.tcp_nodelay true;
+            load >>= function
+            | `Init ->
+                store `Connecting >>= fun () ->
+                UM.create socket U.R_listener k >>= fun (ctrlRx, ctrlTx) ->
+                client_ topTx ctrlRx ctrlTx >>= loop
+            | `Connecting ->
+                store (`Receiving 2) >>= fun () ->
+                UM.create socket U.R_initiator k >>= fun (ctrlRx, ctrlTx) ->
+                server_ topTx ctrlRx ctrlTx >>= loop
+            | _ ->
+                assert (not true);
+                Cf_cmonad.return ()
+        and do_topRx_final () =
+            load >>= function
+            | `Receiving n when n > 0 ->
+                let n = pred n in
+                let s =
+                    if n > 0 then
+                        `Receiving n
+                    else begin
+                        okayRef := true;
+                        `Done
+                    end
+                in
+                store s >>= loop
+            | _ ->
+                assert (not true);
+                Cf_cmonad.return ()
+        in
+        start (loop ()) `Init
+
+    let test () =
+        let okay = ref false in
+        try
+            run (reactor_ okay) ();
+            if not !okay then
+                J.stdout#fail "reactor: processing completed early.";
+        with
+        | Unix.Unix_error (error, fname, arg) ->
+            let error = Unix.error_message error in
+            J.stdout#fail
+                "Unix error \"%s\" in %s(%s).\n" error fname arg
+        | Sasl_error.X error ->
+            J.stdout#fail "SASL error \"%s\"" (Sasl_error.to_string error)
+        | Beep_error.X error ->
+            J.stdout#fail "BEEP error \"%s\"" (Beep_error.to_string error)
+        | x ->
+            raise x
+end
+
+module T3_profile: Loopback_Profile_T = struct
+    module Sock = Cf_tcp6_socket
+    let any_address = (Cf_ip6_addr.unspecified, 0 :> Sock.address_t)
+    let tcp_nodelay = Cf_ip_common.tcp_nodelay
+end
+
+module T3 = Loopback(T3_profile)
+
 let main () =
     let tests = [
-        T1.test; T2.test;
+        T1.test; T2.test; T3.test
     ] in
     Printf.printf "1..%d\n" (List.length tests);
     flush stdout;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.