Commits

Dmitry Grebeniuk  committed e864085

local protocols (proto_server) will check for unfilled dests

  • Participants
  • Parent commits 42de179

Comments (0)

Files changed (1)

       IO.run_and_ignore_result (f my_pid process_exit_status)
     ;
 
+    value process_pass_msg_sync
+     = fun disp msg ->
+         IO.catch
+           (fun () ->
+              (disp msg : IO.m (process_result _)) >>= fun res ->
+              IO.return res
+           )
+           (fun e -> IO.return (Exit (PE_Error (`Exn e)))
+           )
+    ;
+
+    value process_do_exit
+     = fun ~context ~me ~pe ->
+         let () =
+           let my_pid = me.p_pid in
+           match me.p_status with
+           [ PSAlive monitors ->
+               ( List.iter
+                   (fun fin -> try fin () with [ _ -> () ]
+                   )
+                   context.c_finalizers
+               ; List.iter
+                   (notify_monitor ~my_pid ~process_exit_status:pe)
+                   monitors
+               )
+           | PSExited _ ->
+               assert False
+           ]
+         in
+           IO.return ()
+    ;
+
+
     value rec (process_loop
       : context ->
         (process 'a) ->
         _
     )
     context me mq disp =
-      IO.catch
-        (fun () ->
-           ( (* IO.printf "s.c.: waiting\n%!" >>= fun () -> *)
-            IO.Mq.take mq >>= fun msg ->
-             (* IO.printf "s.c.: taken\n%!" >>= fun () -> *)
-            (disp msg : IO.m (process_result _)) >>= fun res ->
-            IO.return res
-           )
-        )
-        (fun e -> IO.return (Exit (PE_Error (`Exn e)))
-        )
+      ( (* IO.printf "s.c.: waiting\n%!" >>= fun () -> *)
+        IO.Mq.take mq >>= fun msg ->
+        (* IO.printf "s.c.: taken\n%!" >>= fun () -> *)
+        process_pass_msg_sync disp msg
+      )
       >>= fun
-      [ Exit pe ->
-          let () =
-            let my_pid = me.p_pid in
-            match me.p_status with
-            [ PSAlive monitors ->
-                ( List.iter
-                    (fun fin -> try fin () with [ _ -> () ]
-                    )
-                    context.c_finalizers
-                ; List.iter
-                    (notify_monitor ~my_pid ~process_exit_status:pe)
-                    monitors
-                )
-            | PSExited _ ->
-                assert False
-            ]
-          in
-            IO.return ()
+      [ Exit pe -> process_do_exit ~context ~me ~pe
       | Continue k -> process_loop context me mq k
       ]
     ;
         open Cd_All; open Cdt; open Cd_Ser;
 
         value rec get_dests
-         : ubox -> list (int * (string -> unit) * (unit -> string))
-                       (* id, deserialize, serialize *)
+         : ubox -> list
+             (int * (string -> unit) * (unit -> string) * (unit -> bool))
+                       (* id, deserialize, serialize, get_is_filled *)
          = fun u ->
              let rec get_dests_td td u =
                match td with
              and get_dests_arr uarr =
                List.concat & Array.map_to_list get_dests uarr
 
+             and get_from_dest u =
+               match u with
+               [ [| ("dest_ref", ur) ; ("dest_kind", uk) |] ->
+                   match uk.ub_uti#type_desc with
+                   [ Sum_type destr _constr ->
+                       match destr uk with
+                       [ ("DRemote", disp) ->
+                           let uarr = disp "get_dests" in
+                           match uarr with
+                           [ [| ui ; _u_tn ; uput_dest ; uget_dest |] ->
+                                [ ( uget_exn ti_int ui
+                                  , fun s ->
+                                      uget_exn ti_unit &
+                                        u_app
+                                          uput_dest
+                                          (ubox ti_string s)
+                                  , fun () ->
+                                      uget_exn ti_string &
+                                        u_app
+                                          uget_dest
+                                          (ubox ti_unit ())
+                                  , get_is_filled ur
+                                  )
+                                ]
+                           | _ -> assert False
+                           ]
+                       | _ -> assert False
+                       ]
+                   | Tuple _ _ _ | Simple _ | Lambda _ _ _
+                   | Dispatch_method _ | Record_type _ _ _ _
+                       -> assert False
+                   ]
+               | _ -> assert False
+               ]
+             and get_is_filled uref =
+               fun () ->
+               let (destr, _utis, _fields, _constr) =
+                 ti_expect_record_exn uref.ub_uti#type_desc
+               in
+                 match destr uref with
+                 [ [| ("val", uopt) |] ->
+                     let (destr, _constr) =
+                       ti_expect_sum_exn uopt.ub_uti#type_desc in
+                     match destr uopt with
+                     [ ("None", _) -> False
+                     | ("Some", _) -> True
+                     | _ -> assert False
+                     ]
+                 | _ -> assert False
+                 ]
              in
                let uti = u.ub_uti in
                if has_meth "is_dest" uti
                then
-                 match uti#type_desc with
-                 [ Tuple _destr _utis _constr
-                     -> assert False
-                 | Simple _
-                 | Lambda _ _ _
-                 | Sum_type _ _
-                     -> assert False
-
-                 | Record_type destr _utis _fields _constr ->
-                     match destr u with
-                     [ [| ("dest_ref", _ur) ; ("dest_kind", uk) |] ->
-                         match uk.ub_uti#type_desc with
-                         [ Sum_type destr _constr ->
-                             match destr uk with
-                             [ ("DRemote", disp) ->
-                                 let uarr = disp "get_dests" in
-                                 match uarr with
-                                 [ [| ui ; _u_tn ; uput_dest ; uget_dest |] ->
-                                      [ ( uget_exn ti_int ui
-                                        , fun s ->
-                                            uget_exn ti_unit &
-                                              u_app uput_dest (ubox ti_string s)
-                                        , fun () ->
-                                            uget_exn ti_string &
-                                              u_app uget_dest (ubox ti_unit ())
-                                        )
-                                      ]
-                                 | _ -> assert False
-                                 ]
-                             | _ -> assert False
-                             ]
-                         | Tuple _ _ _ | Simple _ | Lambda _ _ _
-                         | Dispatch_method _ | Record_type _ _ _ _
-                             -> assert False
-                         ]
-                     | _ -> assert False
-                     ]
-
-                 | Dispatch_method _
-                     -> assert False
-                 ]
+                 let (destr, _utis, _fields, _constr) =
+                   ti_expect_record_exn uti#type_desc
+                 in
+                   get_from_dest & destr u
                else
                  get_dests_td u.ub_uti#type_desc u
         ;
         value proto_call
          : ti 'p -> process 'p -> 'p -> unit
          = fun ti_proto server p ->
-           (*
+             (*
              let dests : list (int * (string -> unit) * (unit -> string)) =
-               sort_dests &
+               (* sort_dests & *)
                get_dests &
                ubox ti_proto p
              in
-             let () =
+             let __ () =
              ( List.iter
                  (fun (i, _d, _s) ->
                     dbg "dest found: id = %i\n%!" i
                dests
                filled_dests
              in
-             ()
-           *)
+             *)
            (* тут -- только локальный случай. *)
            let () = ignore ti_proto in
            IO.run_and_ignore_result (send server (Msg p))
              let p_deser = ti_deser ti_proto in
              fun i_str ->
                let i = p_deser i_str in
-               let dests : list (int * (string -> unit) * (unit -> string)) =
-                 sort_dests &
-                 get_dests &
-                 ubox ti_proto i
-               in
                let () = server_func i in
                let filled : proto_call_resp = List.map
                  (fun (i, _sd_put, sd_get) -> (i, sd_get ())
              let () = ignore ti_proto in
 
              fun outer_ctx ->
-               create_process inner_factory >>= fun inner_process ->
-               let () = monitor outer_ctx inner_process in
-               let inner_pid = process_pid inner_process in
-               let rec outer_disp = fun
-                 [ Msg _ as msg ->
+               inner_factory outer_ctx >>= fun inner_disp ->
+               let rec outer_disp ~inner_disp = fun
+                 [ Msg p as msg ->
+                     let dests : list
+                       (int * (string -> unit) * (unit -> string)
+                         * (unit -> bool))
+                      =
+                       (* sort_dests & *)
+                       get_dests &
+                       ubox ti_proto p
+                     in
                      (* пред/пост обработка -- тут. *)
-                     pass msg
-                 | Cmd `Shutdown as msg -> pass msg
-                 | Cmd (`Exited (pid, ps)) as msg ->
-                     if pid = inner_pid
-                     then process_exit_by_status ps
-                     else pass msg
+                     process_pass_msg_sync inner_disp msg >>= fun res ->
+                       if List.for_all
+                            (fun (_, _, _, get_is_filled) -> get_is_filled ())
+                            dests
+                       then
+                         cont res
+                       else
+                         process_exit_exn (Failure
+                           "proto_server function hadn't filled all 'dest's")
+                 | Cmd _ as msg ->
+                     process_pass_msg_sync inner_disp msg >>= cont
                  ]
-               and pass msg =
-                 send inner_process msg >>= fun () ->
-                 process_continue outer_disp
+               and cont = fun
+                 [ Exit pe -> process_exit_by_status pe
+                 | Continue k -> process_continue k
+                 ]
                in
-                 IO.return outer_disp
+                 IO.return & outer_disp ~inner_disp
         ;