Commits

Anonymous committed 27b630e

protocols (for local calls only, for now)

Comments (0)

Files changed (2)

     ;
 
 
+    value process_pid p = p.p_pid
+    ;
 
     value () = Printexc.register_printer
       (fun
     value process_continue k = IO.return (Continue k)
     ;
 
+    value process_exit_by_status = fun ps -> IO.return (Exit ps)
+      (* fun
+      [ PE_Normal -> process_exit ()
+      | PE_Error err -> process_exit_error err
+      ] *)
+    ;
+
     value (create_process_inner :
       process_factory 'a ->
       ~mq:(IO.Mq.t _) ->
     ;
 
 
-    value monitor my_context some_process =
+    value monitor
+     : context -> process 'a -> unit
+     = fun my_context some_process ->
       let notify_me =
         fun pid pe ->
           send_ctx_cmd my_context (`Exited (pid, pe))
          }
     ;
 
+
+    value dest_remote_counter = ref 0
+    ;
+
+    value dest_remote
+     : ! 'a . #Cdt.ti 'a -> dest 'a
+     =
+      fun ti_a ->
+        let ti_cr_a = ti_call_resp ti_a in
+        let r = ref None in
+        ( incr dest_remote_counter
+        ; { dest_ref = r
+          ; dest_kind =
+              DRemote
+                dest_remote_counter.val
+                ti_a#type_name
+                (fun s ->
+                   r.val := Some (Cd_Ser.ti_deser ti_cr_a s)
+                )
+                (dest_ubox_get ti_cr_a r)
+          }
+        )
+    ;
+
+
     value dest_get
      : dest 'o -> IO.m (call_resp 'o)
      = fun d ->
     ;
 
 
+    (*************************************************************)
 
+    module Proto
+     =
+      struct
 
+        open Cd_All; open Cdt; open Cd_Ser;
+
+        value rec get_dests
+         : ubox -> list (int * (string -> unit) * (unit -> string))
+                       (* id, deserialize, serialize *)
+         = fun u ->
+             let rec get_dests_td td u =
+               match td with
+               [ Simple _ -> []
+               | Dispatch_method disp -> get_dests_td (disp "ser") u
+               | Sum_type destr _constr ->
+                   let (_vname, disp) = destr u in
+                   let uarr = disp "ser" in
+                   get_dests_arr uarr
+               | Record_type destr _utis _fields _constr ->
+                   let uarrfn = destr u in
+                   let uarr = Array.map (fun (_field_name, u) -> u) uarrfn in
+                   get_dests_arr uarr
+               | Lambda _ _ _ -> []
+               | Tuple destr _utis _constr ->
+                   get_dests_arr (destr u)
+               ]
+
+             and get_dests_arr uarr =
+               List.concat & Array.map_to_list get_dests uarr
+
+             in
+               let uti = u.ub_uti in
+               if has_meth "is_dest" uti
+               then
+                 match uti#type_desc with
+                 [ Tuple _destr _utis _constr
+                     -> assert False
+                 | Simple _
+                 | Lambda _ _ _
+                 | Sum_type _ _
+                     -> assert False
+
+                 | Record_type destr _utis _fields _constr ->
+                     match destr u with
+                     [ [| ("dest_ref", _ur) ; ("dest_kind", uk) |] ->
+                         match uk.ub_uti#type_desc with
+                         [ Sum_type destr _constr ->
+                             match destr uk with
+                             [ ("DRemote", disp) ->
+                                 let uarr = disp "get_dests" in
+                                 match uarr with
+                                 [ [| ui ; _u_tn ; uput_dest ; uget_dest |] ->
+                                      [ ( uget_exn ti_int ui
+                                        , fun s ->
+                                            uget_exn ti_unit &
+                                              u_app uput_dest (ubox ti_string s)
+                                        , fun () ->
+                                            uget_exn ti_string &
+                                              u_app uget_dest (ubox ti_unit ())
+                                        )
+                                      ]
+                                 | _ -> assert False
+                                 ]
+                             | _ -> assert False
+                             ]
+                         | Tuple _ _ _ | Simple _ | Lambda _ _ _
+                         | Dispatch_method _ | Record_type _ _ _ _
+                             -> assert False
+                         ]
+                     | _ -> assert False
+                     ]
+
+                 | Dispatch_method _
+                     -> assert False
+                 ]
+               else
+                 get_dests_td u.ub_uti#type_desc u
+        ;
+
+
+        type proto_call_resp = list (int * string)
+        ;
+
+        value ti_resp_elem =
+          new Tuple2.ti
+                ti_int
+                ti_string
+                ()
+        ;
+        value () = ti_add_ser_deser (ti_resp_elem :> ti _)
+        ;
+
+        value ti_proto_call_resp : #ti proto_call_resp =
+          new List.ti ti_resp_elem
+            ()
+        ;
+        value ti_proto_call_resp = (ti_proto_call_resp :> ti _)
+        ;
+        value () = ti_add_ser_deser ti_proto_call_resp
+        ;
+
+
+
+        value sort_dests =
+          List.sort (fun (a, _, _) (b, _, _) -> compare a b)
+        ;
+
+
+        value proto_call
+         : ti 'p -> process 'p -> 'p -> unit
+         = fun ti_proto server p ->
+           (*
+             let dests : list (int * (string -> unit) * (unit -> string)) =
+               sort_dests &
+               get_dests &
+               ubox ti_proto p
+             in
+             let () =
+             ( List.iter
+                 (fun (i, _d, _s) ->
+                    dbg "dest found: id = %i\n%!" i
+                 )
+                 dests
+             ; printf "dests total: %i\n%!" dests_count
+             )
+             in
+             let proto_call_resp = server & ti_ser ti_proto p in
+             let filled_dests = ti_deser ti_proto_call_resp proto_call_resp in
+             let () = List.iter2
+               (fun (sd_id, sd_put, _sd_get) (rc_id, rc_data) ->
+                  if sd_id <> rc_id
+                  then failwith "call_proto: dest id's mismatch (sd=%i rc=%i)"
+                         sd_id rc_id
+                  else
+                    sd_put rc_data
+               )
+               dests
+               filled_dests
+             in
+             ()
+           *)
+           (* тут -- только локальный случай. *)
+           let () = ignore ti_proto in
+           IO.run_and_ignore_result (send server (Msg p))
+        ;
+
+
+        value proto_server
+         : ti 'p -> process_factory 'p -> process_factory 'p
+         = fun ti_proto inner_factory ->
+             (*
+             let p_deser = ti_deser ti_proto in
+             fun i_str ->
+               let i = p_deser i_str in
+               let dests : list (int * (string -> unit) * (unit -> string)) =
+                 sort_dests &
+                 get_dests &
+                 ubox ti_proto i
+               in
+               let () = server_func i in
+               let filled : proto_call_resp = List.map
+                 (fun (i, _sd_put, sd_get) -> (i, sd_get ())
+                 )
+                 dests
+               in
+               ti_ser ti_proto_call_resp filled
+             *)
+             (* тут -- только локальный случай. *)
+             let () = ignore ti_proto in
+
+             fun outer_ctx ->
+               create_process inner_factory >>= fun inner_process ->
+               let () = monitor outer_ctx inner_process in
+               let inner_pid = process_pid inner_process in
+               let rec outer_disp = fun
+                 [ Msg _ as msg ->
+                     (* пред/пост обработка -- тут. *)
+                     pass msg
+                 | Cmd `Shutdown as msg -> pass msg
+                 | Cmd (`Exited (pid, ps)) as msg ->
+                     if pid = inner_pid
+                     then process_exit_by_status ps
+                     else pass msg
+                 ]
+               and pass msg =
+                 send inner_process msg >>= fun () ->
+                 process_continue outer_disp
+               in
+                 IO.return outer_disp
+        ;
+
+
+      end
+    ;
+
+
+    value proto_server = Proto.proto_server
+    ;
 
 
     (*************************************************************)
 value server_respawn
  : ?max_count:int -> server_factory 'i 'o -> server_factory 'i 'o
 ;
+
+
+(* wraps process_factory into new process_factory that allows to
+   use "protocols".  Protocol is the type ['p] that has values
+   of type [dest 'o] in it.  Protocol typeinfo [Cdt.ti 'p] should
+   allow deconstruction of every value of type ['p] to access
+   all [dest 'o] in runtime.  This is required in local protocol
+   communication to enforce the protocol (to make sure that all
+   [dest 'o] are filled (todo), this is required in remote protocol
+   communication to pass [dest]s from caller to server and to pass
+   [dest]s' values back from server to caller over network, using
+   some serialization (todo: processes should declare, based on
+   [Cdt.ti 'p], which serialization they accept/require/deny).
+   Note: it's bad idea to wrap [process_factory] with [proto_server]
+   more than once, but it's hard/clumsy to enforce this constraint
+   with types. (or.. any ideas?)
+ *)
+value proto_server : Cdt.ti 'p -> process_factory 'p -> process_factory 'p
+;