1. james woodyatt
  2. oni

Commits

jhwoodyatt  committed dca474e

Initial submit of OCaml NAE BEEP Core.

  • Participants
  • Parent commits 5af157f
  • Branches default

Comments (0)

Files changed (2)

File beep/beep_entity_exchange.ml

View file
+(*---------------------------------------------------------------------------*
+  IMPLEMENTATION  beep_entity_exchange.ml
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+(**)
+module type X_tag = sig
+    val tag: string
+end
+
+module X_create(T: X_tag) = struct
+    let tag = Printf.sprintf "[%s] " T.tag
+
+    let printf fmt =
+        print_string tag;
+        Printf.printf fmt
+    
+    let sprintf (fmt : ('a, unit, string) format) =
+        Printf.sprintf (Obj.magic (tag ^ (Obj.magic fmt)))
+end
+
+module X = X_create(struct let tag = "Beep_entity_exchange" end)
+(**)
+
+open Cf_cmonad.Op
+open Iom_reactor
+
+module Err = Beep_error
+module F = Beep_frame
+module E = Beep_exchange
+module FE = Beep_frame_exchange
+
+module N_map = Cf_rbtree.Map(Cf_ordered.Int_order)
+
+type entity_t = Beep_exchange.header_t * Cf_message.t
+type cancel_t = Cancel
+type rx_event_t = [ `Entity of entity_t | `Error of exn ]
+type 'a response_t = [ `RPY of 'a | `ERR of 'a | `ANS of 'a | `NUL ]
+
+type ('up, 'dn) c_state_t =
+    | C_initial of 'dn E.response_t rx * 'up response_t tx
+    | C_single of 'dn rx * ('up -> 'up response_t) * E.header_t * Cf_message.t
+    | C_multi of
+        ('dn rx * E.header_t * Cf_message.t) N_map.t *
+        (int * 'dn E.answer_t rx) option
+    constraint 'up = [> rx_event_t ]
+    constraint 'dn = [< FE.entity_rx_t ]
+
+let client:
+    ?initial:unit -> ?lim:int -> ?header:Beep_exchange.header_t ->
+    Cf_message.t -> [< Beep_frame_exchange.header_processing_t ] ->
+    (cancel_t #rx * [> [> rx_event_t ] response_t ] #tx) ->
+    ([< [< Beep_frame_exchange.entity_rx_t ] Beep_exchange.response_t ] #rx *
+        [> Beep_frame_exchange.client_tx_t ] #tx) ->
+    ('s, unit) t
+= fun ?initial ?(lim = 4096) ?header msg hp (cancelRx, rpyTx) (dnRx, dnTx) ->
+    let cancelRx = (cancelRx :> cancel_t rx) in
+    let rpyTx = (rpyTx :> 'up response_t tx) in
+    let dnTx = (dnTx :> FE.client_tx_t tx) in
+    
+    (*--- Put the MSG (unless this is the initial exchange of a session) ---*)
+    begin
+        match initial with
+        | Some () ->
+            if msg != [] then invalid_arg (X.sprintf "client: initial MSG");
+            Cf_cmonad.return ()
+        | None ->
+            begin
+                match header with
+                | Some h -> dnTx#put (`Header h)
+                | None -> Cf_cmonad.return ()
+            end >>= fun () ->
+            dnTx#put (`Payload (F.Last, msg))
+    end >>= fun () ->
+    
+    (*--- Main loop ---*)
+    let rec loop () =
+        load >>= fun s ->
+        guard begin
+            cancelRx#get (fun Cancel -> dnTx#put `Abort) >>= fun () ->
+            match s with
+            | C_initial (dnRx, rpyTx) ->
+                get_dnRx_ dnRx rpyTx
+            | C_single (msgRx, consF, header, payload) ->
+                get_msgRx_ msgRx consF header payload
+            | C_multi (ansMap, ansOpt) ->
+                begin
+                    match ansOpt with
+                    | None ->
+                        Cf_cmonad.return ()
+                    | Some (ansN, ansRx) ->
+                        get_ansRx0_ ansMap ansN ansRx
+                end >>= fun () ->
+                let mapF (n, (dnRx, header, payload)) =
+                    get_ansRxN_ n dnRx header payload ansMap ansOpt
+                in
+                let z = N_map.to_seq_incr ansMap in
+                let z = Cf_seq.map mapF z in
+                Cf_seq.C.sequence z
+        end
+    
+    (*--- Get an incoming response event ---*)
+    and get_dnRx_ dnRx rpyTx =
+        let dnRx = (dnRx :> 'dn E.response_t rx) in
+        let rpyTx = (rpyTx :> 'up response_t tx) in
+        dnRx#get begin function
+            | `RPY msgRx ->
+                do_dnRx_single_ (fun v -> `RPY v) msgRx
+            | `ERR msgRx ->
+                do_dnRx_single_ (fun v -> `ERR v) msgRx
+            | `ANS ansRx ->
+                store (C_multi (N_map.nil, Some (0, ansRx))) >>= loop
+            | `NUL ->
+                rpyTx#put `NUL
+        end
+    
+    (*--- Do the RPY/ERR response ---*)
+    and do_dnRx_single_ consF msgRx =
+        store (C_single (msgRx, consF, E.default_header, [])) >>= loop
+        
+    (*--- Get an incoming RPY/ERR frame event ---*)
+    and get_msgRx_ msgRx consF header payload =
+        let msgRx = (msgRx :> 'dn rx) in
+        msgRx#get begin function
+            | `Header h ->
+                store (C_single (msgRx, consF, h, payload)) >>= loop
+            | `Error x ->
+                rpyTx#put (consF (`Error x))
+            | `Payload (more, msg) ->
+                let payload = payload @ msg in
+                match more with
+                | F.More when Cf_message.length payload > lim ->
+                    rpyTx#put (consF (`Error (Err.X Err.X_entity_too_large)))
+                | F.More ->
+                    store (C_single (msgRx, consF, header, payload)) >>= loop
+                | F.Last ->
+                    rpyTx#put (consF (`Entity (header, payload)))
+        end
+    
+    (*--- Get a new incoming ANS entity or NUL frame ---*)
+    and get_ansRx0_ ansMap ansN ansRx =
+        let ansRx = (ansRx :> 'dn E.answer_t rx) in
+        ansRx#get begin function
+            | E.Ans_final sync ->
+                assert (N_map.empty ansMap);
+                start sync () >>= fun () ->
+                rpyTx#put `NUL
+            | E.Ans_entity dnRx ->
+                let ansTup = dnRx, E.default_header, [] in
+                let ansMap = N_map.replace (ansN, ansTup) ansMap in
+                let ansN = succ ansN in
+                store (C_multi (ansMap, Some (ansN, ansRx))) >>= loop
+        end
+    
+    (*--- Get an ANS frame ---*)
+    and get_ansRxN_ ansN dnRx header payload ansMap ansOpt =
+        let dnRx = (dnRx :> 'dn rx) in
+        dnRx#get begin function
+            | `Header h ->
+                do_ansRxN_push_ ansN dnRx h payload ansMap ansOpt
+            | `Error x ->
+                rpyTx#put (`ANS (`Error x)) >>= fun () ->
+                let ansMap = N_map.delete ansN ansMap in
+                store (C_multi (ansMap, ansOpt)) >>= loop
+            | `Payload (more, msg) ->
+                let payload = payload @ msg in
+                match more with
+                | F.More ->
+                    do_ansRxN_push_ ansN dnRx header payload ansMap ansOpt
+                | F.Last ->
+                    rpyTx#put (`ANS (`Entity (header, payload))) >>= fun () ->
+                    let ansMap = N_map.delete ansN ansMap in
+                    store (C_multi (ansMap, ansOpt)) >>= loop                
+        end
+    
+    (*--- Store an ANS frame with the '*' continuation indicator ---*)
+    and do_ansRxN_push_ ansN dnRx header payload ansMap ansOpt =
+        let ansTup = dnRx, header, payload in
+        let ansMap = N_map.replace (ansN, ansTup) ansMap in
+        store (C_multi (ansMap, ansOpt)) >>= loop
+    in
+    
+    (*--- Start the main loop ---*)
+    let dnRx = (dnRx :> 'dn E.response_t rx) in
+    start (loop ()) (C_initial (dnRx, rpyTx))
+
+type 'dn s_state_t =
+    | S_receiving of
+        entity_t response_t F.handle_t rx * FE.entity_rx_t rx * E.header_t *
+        Cf_message.t
+    | S_ready of entity_t response_t F.handle_t rx
+    | S_answering of entity_t response_t F.handle_t rx * 'dn E.answer_t tx
+    constraint 'dn = [> FE.service_tx_t ]
+
+let service:
+    ?initial:unit -> ?lim:int ->
+    [< Beep_frame_exchange.header_processing_t ] ->
+    ([< entity_t response_t ] Beep_frame.handle_t #rx * [> rx_event_t ] #tx) ->
+    ([< Beep_frame_exchange.entity_rx_t ] #rx *
+        [> [> Beep_frame_exchange.service_tx_t ]
+        Beep_exchange.response_t ] #tx) ->
+    ('s, unit) t
+= fun ?initial ?(lim = 4096) hp (ctrlRx, ctrlTx) (dnRx, dnTx) ->
+    let hp = (hp :> FE.header_processing_t) in
+    let ctrlRx = (ctrlRx :> entity_t response_t F.handle_t rx) in
+    let ctrlTx = (ctrlTx :> rx_event_t tx) in
+    let dnTx = (dnTx :> 'dn E.response_t tx) in
+    
+    (*--- The main loop ---*)
+    let rec loop () =
+        load >>= fun s ->
+        guard begin
+            match s with
+            | S_receiving (ctrlRx, dnRx, h, p) ->
+                get_dnRx_ ctrlRx dnRx h p
+            | S_ready ctrlRx ->
+                get_ctrlRx_ ctrlRx
+            | S_answering (ctrlRx, ansTx) ->
+                get_ctrlRx_ans_ ctrlRx ansTx
+        end
+        
+    (*--- Get a MSG entity event ---*)
+    and get_dnRx_ ctrlRx dnRx h p =
+        let dnRx = (dnRx :> FE.entity_rx_t rx) in
+        dnRx#get begin function
+            | `Header h ->
+                store (S_receiving (ctrlRx, dnRx, h, p)) >>= loop
+            | `Error x ->
+                ctrlTx#put (`Error x)
+            | `Payload (more, msg) ->
+                match more with
+                | F.Last ->
+                    store (S_ready ctrlRx) >>= fun () ->
+                    ctrlTx#put (`Entity (h, p @ msg)) >>= loop
+                | F.More ->
+                    let p = p @ msg in
+                    if Cf_message.length p > lim then
+                        ctrlTx#put (`Error (Err.X Err.X_entity_too_large))
+                    else
+                        store (S_receiving (ctrlRx, dnRx, h, p @ msg)) >>= loop
+        end
+    
+    (*--- Get a control event ---*)
+    and get_ctrlRx_ ctrlRx =
+        let ctrlRx = (ctrlRx :> entity_t response_t F.handle_t rx) in
+        ctrlRx#get begin fun rh ->
+            match rh.F.h_data with
+            | `RPY (h, p) ->
+                let consF msgRx = `RPY msgRx in
+                do_ctrlRx_single_ consF rh.F.h_ack h p
+            | `ERR (h, p) ->
+                let consF msgRx = `ERR msgRx in
+                do_ctrlRx_single_ consF rh.F.h_ack h p
+            | `ANS (h, p) ->
+                simplex >>= fun (ansRx, ansTx) ->
+                dnTx#put (`ANS ansRx) >>= fun () ->
+                store (S_answering (ctrlRx, ansTx)) >>= fun () ->
+                do_ctrlRx_ans_ ansTx rh.F.h_ack h p >>= loop
+            | `NUL ->
+                dnTx#put `NUL
+        end
+    
+    (*--- Get a RPY/ERR entity ---*)
+    and do_ctrlRx_single_ consF sync h p =
+        simplex >>= fun (msgRx, msgTx) ->
+        let msgTx = (msgTx :> 'dn tx) in
+        begin
+            match hp with
+            | `H_leave -> Cf_cmonad.return ()
+            | _ -> msgTx#put (`Header h)
+        end >>= fun () ->
+        let ph = { F.h_data = F.Last, p; F.h_ack = sync } in
+        msgTx#put (`Payload ph) >>= fun () ->
+        dnTx#put (consF msgRx)
+    
+    (*--- Get a control event (answering) ---*)
+    and get_ctrlRx_ans_ ctrlRx ansTx =
+        let ctrlRx = (ctrlRx :> entity_t response_t F.handle_t rx) in
+        let ansTx = (ansTx :> 'dn E.answer_t tx) in
+        ctrlRx#get begin fun rh ->
+            match rh.F.h_data with
+            | `RPY _
+            | `ERR _ ->
+                loop ()
+            | `NUL ->
+                let sync =
+                    match rh.F.h_ack with
+                    | None -> Cf_cmonad.return ()
+                    | Some r -> r
+                in
+                ansTx#put (E.Ans_final sync)
+            | `ANS (h, p) ->
+                do_ctrlRx_ans_ ansTx rh.F.h_ack h p >>= loop
+        end
+    
+    (*--- Do ANS entity ---*)
+    and do_ctrlRx_ans_ ansTx sync h p =
+        let ansTx = (ansTx :> 'dn E.answer_t tx) in
+        simplex >>= fun (msgRx, msgTx) ->
+        let msgTx = (msgTx :> 'dn tx) in
+        begin
+            match hp with
+            | `H_leave -> Cf_cmonad.return ()
+            | _ -> msgTx#put (`Header h)
+        end >>= fun () ->
+        let ph = { F.h_data = F.Last, p; F.h_ack = sync } in
+        msgTx#put (`Payload ph) >>= fun () ->
+        ansTx#put (E.Ans_entity msgRx)
+    in
+    
+    (*--- Start the main loop ---*)
+    let ctrlRx = (ctrlRx :> entity_t response_t F.handle_t rx) in
+    let state0 =
+        match initial with
+        | Some () ->
+            S_ready ctrlRx
+        | None ->
+            let dnRx = (dnRx :> FE.entity_rx_t rx) in
+            S_receiving (ctrlRx, dnRx, E.default_header, [])
+    in
+    start (loop ()) state0
+
+class virtual client ?initial ?lim ?header m hp =
+    object(self)
+        inherit FE.client ?initial hp
+        
+        method private virtual entity:
+            's. cancel_t tx -> rx_event_t response_t rx -> ('s, unit) t
+
+        method private frame dnTx dnRx =
+            duplex >>= fun ((ctrlRx, ctrlTx), ctrlIO) ->
+            let ctrlRx = (ctrlRx :> rx_event_t response_t rx) in
+            self#entity ctrlTx ctrlRx >>= fun () ->
+            client ?initial ?lim ?header m hp ctrlIO (dnRx, dnTx)
+    end
+
+class virtual service ?initial ?lim (state0 : 's) hp =
+    object(self)
+        inherit FE.service ?initial hp
+
+        method private virtual entity:
+            rx_event_t -> entity_t response_t F.handle_t tx -> ('s, unit) t
+        
+        method private frame dnRx dnTx =
+            duplex >>= fun ((ctrlRx, ctrlTx), ctrlIO) ->
+            let ctrlRx = (ctrlRx :> rx_event_t rx) in
+            service ?initial ?lim hp ctrlIO (dnRx, dnTx) >>= fun () ->
+            let r =
+                match initial with
+                | Some () ->
+                    self#entity (`Entity (E.default_header, [])) ctrlTx
+                | None ->
+                    guard (ctrlRx#get (fun v -> self#entity v ctrlTx))
+            in
+            start r state0
+    end
+
+let crlf_string_ = "\r\n"
+
+let emit_aux ~f msg =
+    let buffer = Buffer.create 80 in
+    let pp = Format.formatter_of_buffer buffer in
+    let out, flush, _, spaces =
+        Format.pp_get_all_formatter_output_functions pp ()
+    in
+    let newline () = out crlf_string_ 0 2 in
+    let pp = Format.make_formatter out flush in
+    Format.pp_set_all_formatter_output_functions
+        pp ~out ~flush ~newline ~spaces;
+    let header = f pp msg in
+    Format.pp_print_newline pp ();
+    let body = Cf_message.create (Buffer.contents buffer) in
+    header, body
+
+(*--- End of File [ beep_entity_exchange.ml ] ---*)

File beep/beep_entity_exchange.mli

View file
+(*---------------------------------------------------------------------------*
+  INTERFACE  beep_entity_exchange.mli
+
+  Copyright (c) 2003-2004, James H. Woodyatt
+  All rights reserved.
+
+  Redistribution and use in source and binary forms, with or without
+  modification, are permitted provided that the following conditions
+  are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright
+    notice, this list of conditions and the following disclaimer in
+    the documentation and/or other materials provided with the
+    distribution
+
+  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+  OF THE POSSIBILITY OF SUCH DAMAGE. 
+ *---------------------------------------------------------------------------*)
+
+open Iom_reactor
+
+type entity_t = Beep_exchange.header_t * Cf_message.t
+type cancel_t = Cancel
+type rx_event_t = [ `Entity of entity_t | `Error of exn ]
+type 'a response_t = [ `RPY of 'a | `ERR of 'a | `ANS of 'a | `NUL ]
+
+val client:
+    ?initial:unit -> ?lim:int -> ?header:Beep_exchange.header_t ->
+    Cf_message.t -> [< Beep_frame_exchange.header_processing_t ] ->
+    (cancel_t #rx * [> [> rx_event_t ] response_t ] #tx) ->
+    ([< [< Beep_frame_exchange.entity_rx_t ] Beep_exchange.response_t ] #rx *
+        [> Beep_frame_exchange.client_tx_t ] #tx) ->
+    ('s, unit) t
+
+val service:
+    ?initial:unit -> ?lim:int ->
+    [< Beep_frame_exchange.header_processing_t ] ->
+    ([< entity_t response_t ] Beep_frame.handle_t #rx * [> rx_event_t ] #tx) ->
+    ([< Beep_frame_exchange.entity_rx_t ] #rx *
+        [> [> Beep_frame_exchange.service_tx_t ]
+        Beep_exchange.response_t ] #tx) ->
+    ('s, unit) t
+
+class virtual client:
+    ?initial:unit -> ?lim:int -> ?header:Beep_exchange.header_t ->
+    Cf_message.t -> [< Beep_frame_exchange.header_processing_t ] ->
+    object            
+        inherit Beep_exchange.client
+        
+        method private virtual entity:
+            's. cancel_t tx -> rx_event_t response_t rx -> ('s, unit) t
+    end
+
+class virtual service:
+    ?initial:unit -> ?lim:int -> 's ->
+    [< Beep_frame_exchange.header_processing_t ] ->
+    object
+        inherit Beep_exchange.service
+        
+        method private virtual entity:
+            rx_event_t -> entity_t response_t Beep_frame.handle_t tx ->
+            ('s, unit) t
+    end
+
+val emit_aux:
+    f:(Format.formatter -> 'a -> Beep_exchange.header_t) -> 'a ->
+    Beep_exchange.header_t * Cf_message.t
+
+(*--- End of File [ beep_entity_exchange.mli ] ---*)