Commits

jhwoodyatt  committed 3eb6e2d

Initial submit of OCaml NAE BEEP Core.

  • Participants
  • Parent commits d19c347

Comments (0)

Files changed (1)

File beep/t/t_beep.ml

+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  t_beep.ml
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+Random.self_init ();;
+
+(* let gc = Gc.get () in gc.Gc.verbose <- 0x91; Gc.set gc;; *)
+
+module type X_tag = sig
+    val tag: string
+end
+
+module X_create(T: X_tag) = struct
+    let tag = Printf.sprintf "[%s] " T.tag
+
+    let printf fmt =
+        print_string tag;
+        Printf.printf fmt
+    
+    let sprintf (fmt : ('a, unit, string) format) =
+        Printf.sprintf (Obj.magic (tag ^ (Obj.magic fmt)))
+end
+
+module T1 = struct
+    module X = X_create(struct let tag = "T1" end)
+    
+    let header channel msgno seqno = {
+        Beep_frame.channel = Int32.of_int channel;
+        Beep_frame.msgno = Int32.of_int msgno;
+        Beep_frame.seqno = Int32.of_int seqno;
+    }
+    
+    let payload ?(more = Beep_frame.Last) data = more, (Cf_message.create data)
+    
+    let frame: Beep_tcp_frame.t =
+        `ERR ((header 123 456 7890), (payload "FOO!\r\n"))
+    
+    let check ~chan ~size =
+        if chan <> Int32.of_int 123 then
+            raise (Beep_error.X Beep_error.X_channel_closed);
+        if size > 6 then
+            raise (Beep_error.X Beep_error.X_window_exceeded)
+    
+    (*
+    class msg_cursor m =
+        object
+            inherit Beep_frame.Parser.msg_cursor m as super
+            
+            method advance ch =
+                X.printf "msg_cursor#advance '%c'\n" ch;
+                flush stdout;
+                super#advance ch
+            
+            method shift ~pos =
+                X.printf "msg_cursor#shift ~pos:%d\n" pos;
+                flush stdout;
+                super#shift ~pos
+            
+            initializer
+                let s = Cf_message.contents m in
+                X.printf "msg_cursor#initializer:\n%s\n" s;
+                flush stdout
+        end
+    
+    class parser =
+        object
+            inherit [msg_cursor, Beep_tcp_frame.t] Beep_tcp_frame.Parser.core
+            
+            method private cursor = new msg_cursor
+        end
+    *)
+    
+    let test () =
+        (* let parser = new parser in *)
+        let p = Beep_tcp_frame.parse in
+        let m = Beep_tcp_frame.emit frame in
+        let m = Cf_message.flatten m in
+        match p#parse ~check m with
+        | Some (_, []) ->
+            ()
+        | Some _ ->
+            failwith "Frame.parse: trailing data not parsed"
+        | None ->
+            failwith "Frame.parse: no frame parsed"
+end
+
+module T2 = struct
+    module X = X_create(struct let tag = "T2" end)
+    
+    open Iom_reactor
+    open Cf_cmonad.Op
+    
+    module Sock_tcp = Cf_tcp6_socket
+    module Iom_tcp = Iom_tcp6_socket
+    module INADDR = Cf_ip6_addr
+    let any_address = INADDR.unspecified
+    
+    module E = Beep_exchange
+    module U = Beep_transport
+    module U_tcp = Beep_tcp6_endpoint
+    module C = Beep_channel
+
+    type top_event_t =
+        | Top_error of exn
+        | Top_listen of Sock_tcp.address_t
+        | Top_connect of Sock_tcp.t
+        | Top_final
+        
+    let initiator_ topTx addr k =
+        let topTx = (topTx :> top_event_t tx) in
+        simplex >>= fun (ctrlRx, ctrlTx) ->
+        Iom_tcp.initiator ~c:ctrlTx addr k >>= fun () ->
+        let ctrlRx = (ctrlRx :> Iom_tcp.initiator_rx_t rx) in
+        let guard () = guard begin
+            ctrlRx#get begin fun i ->
+                topTx#put begin
+                    match i with
+                    | Iom_tcp.I_connect (sock, _, _) -> Top_connect sock
+                    | Iom_tcp.I_error error -> Top_error error
+                end
+            end
+        end in
+        start (guard ()) ()
+    
+    let listener_ topTx k =
+        let topTx = (topTx :> top_event_t tx) in
+        let address = (any_address, 0 :> Sock_tcp.address_t) in
+        duplex >>= fun (c, (lRx, lTx)) ->
+        Iom_tcp.listener ~c address k >>= fun () ->
+        let lRx = (lRx :> Iom_tcp.listener_rx_t rx) in
+        let lTx = (lTx :> Iom_tcp.listener_tx_t tx) in
+        let rec loop () = guard begin
+            lRx#get begin function
+            | Iom_tcp.L_error error ->
+                topTx#put (Top_error error)
+            | Iom_tcp.L_connect (socket, _, _) ->
+                lTx#put Iom_tcp.L_close >>= fun () ->
+                topTx#put (Top_connect socket)
+            | Iom_tcp.L_bind address ->
+                lTx#put (Iom_tcp.L_limit 1) >>= fun () ->
+                topTx#put (Top_listen address) >>= loop
+            end
+        end in
+        start (loop ()) ()
+    
+    module E_foo = struct
+        open Cf_parser.Op
+        open Cf_lexer.Op
+                
+        type t = unit
+        
+        let weave_ =
+            object
+                inherit [unit] Beep_parsed_exchange.weaver
+                
+                val lexer_ = Cf_lexer.create !@[ !$"foo" $= () ]
+                
+                method private cursor _ = new Cf_lexer.cursor 0
+                
+                method parse _ m = lexer_ m
+            end
+        
+        let parse = weave_#weave
+        let emit pp () = Format.pp_print_string pp "foo"; E.default_header
+    end
+    
+    module P_foo = struct
+        let header_processing = `H_parse
+        module MSG = E_foo
+        module RPY = E_foo
+        module ERR = E_foo
+        module ANS = E_foo
+    end
+    
+    module M_foo = Beep_parsed_exchange.Create(P_foo)
+
+    class foo_client =
+        object
+            inherit M_foo.client () ()
+
+            method private rpy _ = Cf_cmonad.return ()
+            method private err _ = Cf_cmonad.return ()
+            method private ans _ = Cf_cmonad.return ()
+            method private nul = Cf_cmonad.return ()
+        end
+    
+    class foo_service =
+        object(self)
+            inherit M_foo.service ()
+            
+            method private msg rspTx event =
+                let rspTx = (rspTx :> 'rsp tx) in
+                match event with
+                | Cf_exnopt.U () -> rspTx#put (self#rpy ())
+                | Cf_exnopt.X _ -> rspTx#put (self#err ())
+        end
+
+    let e_profile_ = {
+        C.M_profile.uri = "uri";
+        C.M_profile.encoding = C.M_profile.E_none;
+        C.M_profile.content = "content";
+    }
+    
+    let e_start_ = {
+        C.M_start.serverName = None;
+        C.M_start.profiles = e_profile_, [];
+    }
+    
+    class client textPair doneTx =
+        let msgText, rpyText = textPair in
+        let doneTx = (doneTx :> unit tx) in
+        object(self:'self)
+            inherit C.profile
+            constraint 'self = #C.client
+            
+            method private control_ ctrlRx ctrlTx =
+                let ctrlRx = (ctrlRx :> C.control_rx_t rx) in
+                let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
+                let rec loop () = guard (get_ctrlRx_ ())
+                and get_ctrlRx_ () =
+                    ctrlRx#get begin function
+                        | C.C_rx_exchange exch ->
+                            let c = new foo_service in
+                            c#link exch >>= fun () ->
+                            let cMsg = C.M_close.cons 200 in
+                            ctrlTx#put (C.C_tx_close cMsg) >>= loop
+                        | C.C_rx_close closeMsg ->
+                            failwith
+                              (X.sprintf
+                                "client#reactor: incomplete! [C_rx_close]");
+                            Cf_cmonad.return ()
+                        | C.C_rx_error errorMsg ->
+                            failwith
+                              (X.sprintf
+                                "client#reactor: incomplete! [C_rx_error]");
+                            Cf_cmonad.return ()
+                        | C.C_rx_release ->
+                            doneTx#put ()
+                        | C.C_rx_abort ->
+                            failwith
+                              (X.sprintf
+                                "client#reactor: incomplete! [C_rx_abort]");
+                            Cf_cmonad.return ()
+                    end
+                in
+                start (loop ()) ()
+            
+            method connect startTx connectRx =
+                let startTx = (startTx :> C.M_start.t tx) in
+                let connectRx = (connectRx :> C.client_start_t rx) in
+                startTx#put e_start_ >>= fun () ->
+                let rec loop () =
+                    guard begin
+                        connectRx#get begin function
+                            | C.CS_profile (_, profileTx) ->
+                                profileTx#put (self :> C.profile)
+                            | C.CS_error errorMsg ->
+                                failwith (X.sprintf "client#connect: error");
+                                Cf_cmonad.return ()
+                        end
+                    end
+                in
+                start (loop ()) ()
+        end
+    
+    class service textPair =
+        let msgText, rpyText = textPair in
+        object(self:'self)
+            inherit C.profile
+            constraint 'self = #C.service
+            
+            method private control_ ctrlRx ctrlTx =
+                let ctrlRx = (ctrlRx :> C.control_rx_t rx) in
+                let ctrlTx = (ctrlTx :> C.control_tx_t tx) in
+                let rec loop () = guard (get_ctrlRx_ ())
+                and get_ctrlRx_ () =
+                    ctrlRx#get begin function
+                        | C.C_rx_close closeMsg ->
+                            ctrlTx#put C.C_tx_release
+                        | _ ->
+                            failwith
+                              (X.sprintf "service#reactor: incomplete! [1]");
+                            Cf_cmonad.return ()
+                    end
+                in
+                let c = new foo_client in
+                c#link >>= fun exch ->
+                ctrlTx#put (C.C_tx_exchange exch) >>= fun () ->
+                start (loop ()) ()
+            
+            method accept startMsg =
+                simplex >>= fun (accRx, accTx) ->
+                let accRx = (accRx :> C.service_start_t rx) in
+                let accTx = (accTx :> C.service_start_t tx) in
+                let profile = (self :> C.profile) in
+                accTx#put (C.SS_profile (e_profile_, profile)) >>= fun () ->
+                Cf_cmonad.return (Some accRx)
+        end
+    
+    let client_ topTx ctrlRx ctrlTx textPair =
+        let topTx = (topTx :> top_event_t tx) in
+        let ctrlRx = (ctrlRx :> U.control_rx_t rx) in
+        let ctrlTx = (ctrlTx :> U.control_tx_t tx) in
+        simplex >>= fun (doneRx, doneTx) ->
+        let doneRx = (doneRx :> unit rx) in
+        let doneTx = (doneTx :> unit tx) in
+        let rec loop () =
+            guard begin
+                ctrlRx#get get_ctrlRx_ >>= fun () ->
+                doneRx#get get_doneRx_
+            end
+        and get_ctrlRx_ = function
+            | U.C_rx_failure x ->
+                topTx#put (Top_error x)
+            | U.C_rx_greeting _
+            | U.C_rx_error _ ->
+                assert (not true);
+                loop ()
+            | U.C_rx_close closeMsg ->
+                failwith
+                (X.sprintf "client_/get_ctrlRx_: incomplete! [C_rx_close]")
+            | U.C_rx_reset ->
+                failwith
+                (X.sprintf "client_/get_ctrlRx_: incomplete! [C_rx_reset]")
+            | U.C_rx_final ->
+                topTx#put Top_final
+        and get_doneRx_ () =
+            ctrlTx#put (U.C_tx_close (C.M_close.cons 200)) >>= loop
+        in
+        let clnt = new client textPair doneTx in
+        ctrlTx#put (U.C_tx_start (clnt :> C.client)) >>= fun () ->
+        start (loop ()) `Initial
+    
+    let service_ topTx ctrlRx ctrlTx textPair =
+        let topTx = (topTx :> top_event_t tx) in
+        let ctrlRx = (ctrlRx :> U.control_rx_t rx) in
+        let ctrlTx = (ctrlTx :> U.control_tx_t tx) in
+        let rec loop () = guard (ctrlRx#get get_ctrlRx_)
+        and get_ctrlRx_ = function
+            | U.C_rx_failure x ->
+                topTx#put (Top_error x)
+            | U.C_rx_greeting _
+            | U.C_rx_error _ ->
+                assert (not true);
+                loop ()
+            | U.C_rx_close _ ->
+                ctrlTx#put U.C_tx_release >>= loop                
+            | U.C_rx_reset ->
+                failwith
+                (X.sprintf "service_/get_ctrlRx_: incomplete! [C_rx_reset]")
+            | U.C_rx_final ->
+                topTx#put Top_final
+        in
+        start (loop ()) `Initial
+
+    let greet_ topTx ctrlRx ctrlTx textPair role =
+        let topTx = (topTx :> top_event_t tx) in
+        let ctrlRx = (ctrlRx :> U.control_rx_t rx) in
+        let ctrlTx = (ctrlTx :> U.control_tx_t tx) in
+        let rec loop () = guard (ctrlRx#get get_ctrlRx_)
+        and get_ctrlRx_ = function
+            | U.C_rx_failure x ->
+                topTx#put (Top_error x)
+            | U.C_rx_greeting greetingMsg ->
+                if greetingMsg <> U.M_greeting.default then
+                    failwith
+                        (X.sprintf "greet_/get_ctrlRx_: unexpected content");
+                if role == U.R_initiator then
+                    client_ topTx ctrlRx ctrlTx textPair
+                else
+                    service_ topTx ctrlRx ctrlTx textPair
+            | U.C_rx_error errorMsg ->
+                failwith
+                (X.sprintf "greet_/get_ctrlRx_: incomplete! [C_rx_error]")
+            | U.C_rx_close _
+            | U.C_rx_reset
+            | U.C_rx_final ->
+                assert (not true);
+                Cf_cmonad.return ()
+        in
+        let greetCmd =
+            let srv = new service textPair in
+            U.C_tx_greeting (U.M_greeting.default, (srv :> C.service) :: [])
+        in
+        ctrlTx#put greetCmd >>= fun () ->
+        start (loop ()) `Initial
+
+    let exchange_ topTx role textPair socket k =
+        let topTx = (topTx :> top_event_t tx) in
+        Cf_socket.setsockopt socket Cf_ip_common.tcp_nodelay true;
+        U_tcp.create socket role k >>= fun (ctrlRx, ctrlTx) ->
+        let ctrlTx = (ctrlTx :> U.control_tx_t tx) in
+        greet_ topTx ctrlRx ctrlTx textPair role
+        
+    let okay_ = ref false
+
+    let reactor_ k =
+        simplex >>= fun (topRx, topTx) ->
+        let topRx = (topRx :> top_event_t rx) in
+        listener_ topTx k >>= fun () ->
+        let rec loop () = guard begin
+            topRx#get begin function
+            | Top_error error -> begin
+                try raise error with
+                | Beep_error.X berror ->
+                    let explain = Beep_error.to_string berror in
+                    failwith (X.sprintf "transport error: \"%s\"" explain)
+            end
+            | Top_listen addr ->
+                initiator_ topTx addr k >>= loop
+            | Top_connect socket ->
+                load >>= begin function
+                | `Init ->
+                    store `Connecting >>= fun () ->
+                    let pair = "foo\r\n", "bar\r\n" in
+                    exchange_ topTx U.R_initiator pair socket k >>= loop
+                | `Connecting ->
+                    store (`Receiving 2) >>= fun () ->
+                    let pair = "bar\r\n", "foo\r\n" in
+                    exchange_ topTx U.R_listener pair socket k >>= loop
+                | _ ->
+                    failwith "toploop_: `Connect state error"
+                end
+            | Top_final ->
+                load >>= begin function
+                | `Receiving n when n > 0 ->
+                    let n = pred n in
+                    let s =
+                        if n > 0 then
+                            `Receiving n
+                        else begin
+                            okay_ := true;
+                            `Done
+                        end
+                    in
+                    store s >>= loop
+                | _ ->
+                    failwith "toploop_: Top_final state error"
+                end
+            end
+        end in
+        start (loop ()) `Init
+
+    let test_aux i =
+        (*
+        X.printf ">>> loop %u\n" i;
+        Gc.print_stat stdout;
+        flush stdout;
+        *)
+        (*
+        print_char '.';
+        flush stdout;
+        *)
+        try
+            run reactor_ ();
+            if not !okay_ then
+                failwith "reactor: processing completed early.";
+            (*
+            X.printf "<<< loop %u\n\n" i;
+            flush stdout
+            *)
+        with
+        | Unix.Unix_error (error, fname, arg) ->
+            let msg =
+                let error = Unix.error_message error in
+                Printf.sprintf
+                    "Unix error \"%s\" in %s(%s).\n" error fname arg
+            in
+            failwith msg
+        | x ->
+            raise x
+    
+    let test () =
+        for i = 1 to 3 do test_aux i done
+end
+
+let main () =
+    let tests = [
+        T1.test; T2.test;
+    ] in
+    Printf.printf "1..%d\n" (List.length tests);
+    flush stdout;
+        
+    let test i f =
+        begin
+            try
+                f ();
+                Printf.printf "ok %d\n" i
+            with
+            | Failure s ->
+                Printf.printf "not ok %d (Failure \"%s\")\n" i s
+            | Beep_error.X error ->
+                let s = Beep_error.to_string error in
+                Printf.printf "not ok %d (Beep_error.X \"%s\")\n" i s
+            | x ->
+                Printf.printf "not ok %d\n" i;
+                flush stdout;
+                raise x
+        end;
+        flush stdout;
+        succ i
+    in
+    let _ = List.fold_left test 1 tests in
+    exit 0
+;;
+
+main ();;
+
+(*--- End of File [ t_beep.ml ] ---*)