Commits

Anonymous committed 08bae7e

added: monitors (messages sent to your process when one of monitored process exits), PIDs, process_exit_{status,error}, "`Exited (pid, status)" incoming message

Comments (0)

Files changed (2)

   [= `Main | `Per_process ]
 ;
 
-(*
+
 module type COUNTER
  =
   sig
 ;
 
 
+(*
 value p_counter = Counter.create ()
   and g_counter = Counter.create ()
 ;
 *)
 
 
+value pid_local_counter = Counter.create ()
+;
+
+
 (* Всё, что зависит от конкретной реализации ввода-вывода,
    закатываем в функтор: *)
 
  =
   struct
 
+    (* локальный pid в пределах процесса *)
+    type lpid = Counter.t
+    ;
+
+    (* а это -- пока за пределы процесса не вышли *)
+    type pid = lpid
+    ;
+
     value ( >>= ) m f = IO.bind f m;
 
     type call_resp_error =
     value _notimpl msg = failwith
       (Printf.sprintf "function %S: not implemented" msg);
 
+    type process_exit_status =
+      [ PE_Normal
+      | PE_Error of process_exit_error
+      ]
+    and process_exit_error =
+      [= `Exn of exn
+      |  `Exn_string of string
+      ]
+    ;
 
     type process_command_req =
-      [= `Shutdown ]
+      [= `Shutdown
+      |  `Exited of (pid * process_exit_status)
+      ]
     ;
 
     type process_message_req 'i =
       ]
     ;
 
-    (* "процесс" -- это функция, позволяющая послать процессу сообщение.
-       в случае локальной передачи сообщений этого хватит.
+    (* тип, описывающий "процесс" -- нечто, что видят другие и знают
+       его тип, но не имеют доступа ко внутренностями.  (следовательно,
+       тип абстрактный, так как читать тоже не нужно давать в идеале.)
      *)
-    type process 'i = (process_message_req 'i -> IO.m unit)
+    type process 'i =
+      { p_pid : pid
+      ; p_send_msg : process_message_req 'i -> IO.m unit
+      ; p_notify_on_exit : mutable list
+          (pid -> process_exit_status -> IO.m unit)
+      }
     ;
 
     type group_gen 'p =
     type process_group 'i = group_gen (process 'i);
     type server_group 'i 'o = group_gen (server 'i 'o);
 
-    value send proc msg = proc msg;
+    type context =
+      { c_send_cmd : process_command_req -> IO.m unit
+      }
+    ;
+
+    value send_ctx_cmd ctx cmd = ctx.c_send_cmd cmd
+    ;
+
+    value send proc msg = proc.p_send_msg msg;
 
     value rec io_iter f lst =
       match lst with
     and process_dispatcher 'i =
       dispatcher (process_message_req 'i) (process_result 'i)
     and process_result 'i =
-      [ Exit
+      [ Exit of process_exit_status
       | Continue of process_dispatcher 'i
       ]
     ;
 
     (***************************)
 
-    type context = unit
-    ;
-
     type factory 'i 'o = context -> IO.m (dispatcher 'i 'o)
     ;
 
 
 
     value rec (process_loop
-      : (IO.Mq.t (process_message_req 'a)) ->
+      : (process 'a) ->
+        (IO.Mq.t (process_message_req 'a)) ->
         (_ -> IO.m (process_result 'a)) ->
         _
     )
-    mq disp =
+    me mq disp =
       IO.catch
         (fun () ->
-           try
-             (IO.printf "s.c.: waiting\n%!" >>= fun () ->
-              IO.Mq.take mq >>= fun msg ->
-              IO.printf "s.c.: taken\n%!" >>= fun () ->
-              (disp msg : IO.m (process_result _)) >>= fun res ->
-              IO.return (`Ok res)
-             )
-           with
-           [ e -> IO.error e ]
+           (IO.printf "s.c.: waiting\n%!" >>= fun () ->
+            IO.Mq.take mq >>= fun msg ->
+            IO.printf "s.c.: taken\n%!" >>= fun () ->
+            (disp msg : IO.m (process_result _)) >>= fun res ->
+            IO.return res
+           )
         )
-        (fun e ->
-           IO.printf "worker exitted with exception: %s\n%!"
-             (Printexc.to_string e)
-           >>= fun () ->
-           IO.return (`Error e)
+        (fun e -> IO.return (Exit (PE_Error (`Exn e)))
         )
       >>= fun
-      [ `Ok Exit -> IO.return ()
-      | `Ok (Continue k) -> process_loop mq k
-      | `Error _ -> IO.return ()  (* already reported, exitting. *)
+      [ Exit pe ->
+          let () =
+            let my_pid = me.p_pid in
+            List.iter
+              (fun f -> IO.run_and_ignore_result (f my_pid pe))
+              me.p_notify_on_exit
+          in
+            IO.return ()
+      | Continue k -> process_loop me mq k
       ]
     ;
 
+    value exit_normal_ = Exit PE_Normal
+    ;
 
-    value process_exit () = IO.return Exit
+    value process_exit () = IO.return exit_normal_
+    ;
+
+    value process_exit_error e = IO.return (Exit (PE_Error e))
     ;
 
     value process_continue k = IO.return (Continue k)
       IO.m (process 'a)
     )
     group factory ~mq =
+(*
       let self_ref = ref None in
-(*
       let lazy_me = lazy
         (match self_ref.val with [None -> assert False | Some me -> me ])
       in
 *)
-      let process : process _ = fun msg -> IO.Mq.put mq msg
-      and context = ()
+      let process : process _ =
+        { p_send_msg = fun msg -> IO.Mq.put mq msg
+        ; p_notify_on_exit = []
+        ; p_pid = Counter.next pid_local_counter
+        }
       in
-      ( self_ref.val := Some process
+      let context =
+        { c_send_cmd = fun cmd -> IO.Mq.put mq (Cmd cmd)
+        }
+      in
+      ( (* self_ref.val := Some process *) ()
       ; add_to_group group process
       ; IO.run_and_ignore_result
-          (factory context >>= fun disp -> process_loop mq disp)
+          (factory context >>= fun disp -> process_loop process mq disp)
       ; IO.return process
       ) 
     ;
     ;
 
 
+    value monitor my_context some_process =
+      let old = some_process.p_notify_on_exit in
+      let notify_me =
+        fun pid pe ->
+          send_ctx_cmd my_context (`Exited (pid, pe))
+      in
+      (* if find dupes / remove monitor will be required,
+         add [notify_me] to [my_context] *)
+      some_process.p_notify_on_exit := [ notify_me :: old ]
+    ;
+
+
     value string_of_exn = Printexc.to_string
     ;
 
                 process_continue server_loop
             | Cmd `Shutdown ->
                 process_exit ()
+            | Cmd (`Exited _) ->
+                process_continue server_loop
             ]
       in
         create_process_inner ~mq server_group process_factory
         value add : 'k -> 'v -> map 'k 'v -> map 'k 'v;
         value find_opt : map 'k 'v -> 'k -> option 'v;
         value keys : map 'k 'v -> list 'k;
+        value fold : ('k -> 'v -> 'a -> 'a) -> map 'k 'v -> 'a -> 'a;
+        value remove : 'k -> map 'k 'v -> map 'k 'v;
       end
      =
       struct
             ]
         ;
         value keys (_cmp, m) = List.map fst m;
+        value fold func (_cmp, m) init =
+          List.fold_left
+            (fun acc (k, v) ->
+               func k v acc
+            )
+            init
+            m
+        ;
+        value remove the_k (cmp, m) =
+          (cmp, List.filter (fun (k, _v) -> (cmp the_k k) <> 0) m)
+        ;
       end
     ;
-      
+
+    value filter_keys_over_mapfold mapfold =
+      fun pred m ->
+        mapfold
+          (fun k v a ->
+             if pred k v
+             then [k :: a]
+             else a
+          )
+          m
+          []
+    ;
+
+    value map_filter_keys pred m =
+      filter_keys_over_mapfold Map.fold pred m
+    ;
 
     (* switch-диспетчер *)
 
         create_server_group () >>= fun group ->
         let workers = ref (Map.empty key_cmp) in
         create_process group
-          (fun _ctx ->
+          (fun ctx ->
              IO.return loop
              where rec (loop : process_dispatcher _) = fun
              [ Msg (`Add_worker (k, w), dest) ->
                  ( workers.val := Map.add k w workers.val
+                 ; monitor ctx w
                  ; dest_put dest (CR_Ok SR_Worker_added) >>= fun () ->
                    process_continue loop
                  )
                  >>= fun () ->
                  process_continue loop
              | Cmd `Shutdown ->
+                 (* где-то тут надо как бы разослать привет детишкам.
+                    однако подумать: если дети сами регистрируются,
+                    может сами и к ктулху отправятся?
+                    с третьей стороны, подумать, что как-то да надо
+                    намекнуть детям, что свитч вышел.
+                  *)
                  process_exit ()
+             | Cmd (`Exited (the_pid, status)) ->
+                 (match status with
+                  [ PE_Normal ->
+                      IO.printf "Parvel.switch: normal worker's exit.  wtf?\n"
+                  | PE_Error _ ->
+                      IO.return ()
+                  ]
+                 ) >>=
+                 fun () ->
+                 let exited_keys = map_filter_keys
+                   (fun _key process -> process.p_pid = the_pid)
+                   workers.val
+                 in
+                 let () =
+                   workers.val :=
+                     List.fold_left
+                       (fun workers exited_key ->
+                          Map.remove exited_key workers
+                       )
+                       workers.val
+                       exited_keys
+                 in
+                 process_continue loop
              ]
           )
     ;
 
     (*********************************************************)
 
+    type pid
+    ;
+
+    (* статус завершения и ошибки процесса: *)
+
+    type process_exit_status =
+      [ PE_Normal
+      | PE_Error of process_exit_error
+      ]
+    and process_exit_error =
+      [= `Exn of exn
+      |  `Exn_string of string
+      ]
+    ;
+
+    (*********************************************************)
+
     type process_command_req =
-      [= `Shutdown ]
+      [= `Shutdown
+      |  `Exited of (pid * process_exit_status)
+      ]
     ;
 
     type process_message_req 'i =
       process_dispatcher 'i -> IO.m (process_result 'i)
     ;
 
+    (* процесс завершается с ошибкой: *)
+
+    value process_exit_error : process_exit_error -> IO.m (process_result 'i)
+    ;
+
     (* сервер возвращает результат: *)
 
     value server_return : 'o -> IO.m (call_resp 'o)