Commits

Anonymous committed fc8026b

untied the recursion introduced by naive ctx#continue, to allow catching errors in constant stack size

  • Participants
  • Parent commits ea65b06

Comments (0)

Files changed (3)

+(*
 module type COUNTER
  =
   sig
 value p_counter = Counter.create ()
   and g_counter = Counter.create ()
 ;
+*)
 
 
 (* Всё, что зависит от конкретной реализации ввода-вывода,
     value _notimpl msg = failwith
       (Printf.sprintf "function %S: not implemented" msg);
 
+
     (* "процесс" -- это функция, позволяющая послать процессу сообщение.
        в случае локальной передачи сообщений этого хватит.
      *)
-    type process 'i = ('i -> IO.m unit);
+    type process 'i = ('i -> IO.m unit)
+    ;
 
     type group_gen 'p =
       { (* gid : Counter.t
       io_iter (fun proc -> send proc msg) group.children
     ;
 
+    type proc_res 'i = [ Exit | Continue of 'i -> IO.m (proc_res 'i) ];
+
     type process_context 'i =
       < me : process 'i
-      ; continue : ('i -> IO.m unit) -> IO.m unit
       ; my_group : process_group 'i
+
+      ; continue : ('i -> IO.m (proc_res 'i)) -> IO.m (proc_res 'i)
+      ; exit : IO.m (proc_res 'i)
       >
     ;
 
        указать/накодить, что будет частичное применение сначала
        контекста.
      *)
-    type process_dispatcher 'i = process_context 'i -> 'i -> IO.m unit;
+    type process_dispatcher 'i =
+      process_context 'i -> 'i -> IO.m (proc_res 'i)
+    ;
+
     type server_dispatcher 'i 'o = server_context 'i 'o -> 'i -> IO.m 'o;
 
 
       group.children := [process :: group.children ]
     ;
 
-    value create_process group disp =
-      (* let pid = Counter.next p_counter in *)
-      let mq = IO.Mq.create_empty () in
-      let process_ref = ref None in
-      let get_my_process = lazy (
-        match process_ref.contents with
-        [ None -> assert False
-        | Some p -> p
-        ])
-      in
-      let schedule_continuation cont =
-        IO.run_and_ignore_result
-          (IO.printf "s.c.: waiting\n%!" >>= fun () ->
-           IO.Mq.take mq >>= fun msg ->
-           IO.printf "s.c.: taken\n%!" >>= fun () ->
-           ((cont msg) : IO.m unit)
-          )
-      in
+
+    value rec process_loop mq disp_ctx =
+      IO.catch
+        (fun () ->
+           try
+             (IO.printf "s.c.: waiting\n%!" >>= fun () ->
+              IO.Mq.take mq >>= fun msg ->
+              IO.printf "s.c.: taken\n%!" >>= fun () ->
+              (disp_ctx msg : IO.m (proc_res _)) >>= fun res ->
+              IO.return (`Ok res)
+             )
+           with
+           [ e -> IO.error e ]
+        )
+        (fun e ->
+           IO.printf "worker exitted with exception: %s\n%!"
+             (Printexc.to_string e)
+           >>= fun () ->
+           IO.return (`Error e)
+        )
+      >>= fun
+      [ `Ok Exit -> IO.return ()
+      | `Ok (Continue k) -> process_loop mq k
+      | `Error _ -> IO.return ()  (* already reported, exitting. *)
+      ]
+    ;
+
+
+    value create_process group disp : IO. m (process _) =
+      let self_ref = ref None in
+      let lazy_me = lazy
+        (match self_ref.val with [None -> assert False | Some me -> me ]) in
       let rec context =
         object
-          method me = Lazy.force get_my_process;
           method my_group = group;
-          method continue c =
-            let () = schedule_continuation c in
-            IO.return ();
-            (* todo: продумать, что делать с теми процессами, которые
-               добавили сообщение в очередь, но не дождались continue
-               (т.е. процесс вышел).
-             *)
+          method me = Lazy.force lazy_me;
+          method exit = IO.return Exit;
+          method continue k = IO.return (Continue k);
         end
-      and process msg =
-        IO.Mq.put mq msg
       in
-      ( add_to_group group process
-      ; process_ref.contents := Some process
-      ; let disp_ctx = disp context in
-        schedule_continuation disp_ctx
+      let mq = IO.Mq.create_empty () in
+      let process : process _ = fun msg -> IO.Mq.put mq msg in
+      let disp_ctx = disp context in
+      ( self_ref.val := Some process
+      ; add_to_group group process
+      ; IO.run_and_ignore_result (process_loop mq disp_ctx)
       ; IO.return process
-      )
+      ) 
     ;
 
 
     value dest_put : dest 'o -> 'o -> IO.m unit
     ;
 
+    type proc_res 'i
+    ;
+
     type process_context 'i =
       < me : process 'i
-      ; continue : ('i -> IO.m unit) -> IO.m unit
       ; my_group : process_group 'i
+
+      ; exit : IO.m (proc_res 'i)
+      ; continue : ('i -> IO.m (proc_res 'i)) -> IO.m (proc_res 'i)
       >
     ;
 
     ;
 
 
-    (* за'loop 1:
-
-       value proc ctx =
-         loop
-         where rec loop msg =
-           ...
-           >>= fun () ->
-           ctx#continue loop
-
-       за'loop 2:
-
-       value rec proc ctx msg =
-         ...
-         >>= fun () ->
-         ctx#continue (proc ctx)
-
-    *)
-
-
     (* диспетчер сообщений -- то, что содержит пользовательский код *)
 
-    type process_dispatcher 'i = process_context 'i -> ('i -> IO.m unit);
+    type process_dispatcher 'i =
+      process_context 'i -> ('i -> IO.m (proc_res 'i))
+    ;
 
     type server_dispatcher 'i 'o = server_context 'i 'o -> ('i -> IO.m 'o);
 
 ;
 
 
+(*
 value (loop : P.process_dispatcher 'i ->
               P.process_dispatcher 'i
       )
     IO.printf "loop: going to continue\n%!" >>= fun () ->
     ctx#continue part_app
 ;
+*)
 
 
 value func_something _ctx n =