Anonymous avatar Anonymous committed 8abc8bd

local sync is needed anyway

Comments (0)

Files changed (1)

       ]
     ;
 
+    type optsync 'm =
+      [ Async of 'm
+      | Sync of 'm and IO.sink unit
+      ]
+    ;
+
     type process_status =
       [ PSAlive of list (pid -> process_exit_status -> IO.m unit)
       | PSExited of process_exit_status
      *)
     type process 'i =
       { p_pid : lpid
-      ; p_send_msg : process_message_req 'i -> IO.m unit
+      ; p_send_smsg : optsync (process_message_req 'i) -> IO.m unit
       ; p_status : mutable process_status
       }
     ;
     value send_ctx_cmd ctx cmd = ctx.c_send_cmd cmd
     ;
 
+    value send_optsync
+     : process 'i -> optsync (process_message_req 'i) -> IO.m unit
+     = fun proc smsg ->
+         proc.p_send_smsg smsg
+    ;
+
     value send
      : process 'i -> process_message_req 'i -> IO.m unit
      = fun proc msg ->
-         proc.p_send_msg msg
+         send_optsync proc (Async msg)
+    ;
+
+    value send_sync
+     : process 'i -> process_message_req 'i -> IO.sink unit -> IO.m unit
+     = fun proc msg sink ->
+         send_optsync proc (Sync msg sink)
     ;
 
     value addrt_of_proc proc =
-      { asend = proc.p_send_msg
+      { asend = fun msg -> proc.p_send_smsg (Async msg)
       }
     ;
 
     value rec (process_loop
       : context ->
         (process 'a) ->
-        (IO.Mq.t (process_message_req 'a)) ->
+        (IO.Mq.t (optsync (process_message_req 'a))) ->
         (_ -> IO.m (process_result 'a)) ->
         _
     )
     context me mq disp =
       ( (* IO.printf "s.c.: waiting\n%!" >>= fun () -> *)
-        IO.Mq.take mq >>= fun msg ->
+        IO.Mq.take mq >>= fun smsg ->
         (* IO.printf "s.c.: taken\n%!" >>= fun () -> *)
-        process_pass_msg_sync disp msg
+        match smsg with
+        [ Async msg -> process_pass_msg_sync disp msg
+        | Sync msg sink ->
+            process_pass_msg_sync disp msg >>= fun res ->
+            ( IO.put_sink sink ()
+            ; IO.return res
+            )
+        ]
       )
       >>= fun
       [ Exit pe -> process_do_exit ~context ~me ~pe
       in
 *)
       let process : process _ =
-        { p_send_msg = fun msg -> IO.Mq.put mq msg
+        { p_send_smsg = fun smsg -> IO.Mq.put mq smsg
         ; p_status = PSAlive []
         ; p_pid = Counter.next pid_local_counter
         }
       in
       let context =
-        { c_send_cmd = fun cmd -> IO.Mq.put mq (Cmd cmd)
+        { c_send_cmd = fun cmd -> IO.Mq.put mq (Async (Cmd cmd))
         ; c_mq_set_block_limit = fun n -> IO.Mq.set_block_limit mq n
         ; c_mq_set_fail_limit = fun n -> IO.Mq.set_fail_limit mq n
         ; c_finalizers = []
     *)
 
 
-    value cmd_shutdown = Cmd `Shutdown;
+    value cmd_shutdown_msg = Cmd `Shutdown;
+    value cmd_shutdown = Async cmd_shutdown_msg;
 
     value (process_limit :
       ?nmax:int ->
             then
               let () = dbg "lim:   count < nmax" in
               let any_reader = IO.Mq.idle_reader_exists common_mq in
-              IO.Mq.put common_mq msg >>= fun () ->
+              IO.Mq.put common_mq (Async msg) >>= fun () ->
               (if not any_reader
                then
                  let () = dbg "lim:     no readers" in
               (* count >= nmax *)
               let () = dbg "lim:   count >= nmax" in
 
-              IO.Mq.put_blocking common_mq msg >>= fun () ->
+              IO.Mq.put_blocking common_mq (Async msg) >>= fun () ->
               process_continue (out_disp ~count)
 
         | Cmd (`Exited _) ->
 
         | Cmd `Shutdown ->
             let () = dbg "lim: `Shutdown" in
-            send monitor cmd_shutdown >>= fun () ->
+            send monitor cmd_shutdown_msg >>= fun () ->
             process_exit ()
         ]
     ;
              *)
            (* тут -- только локальный случай. *)
            let () = ignore ti_proto in
-           IO.run_and_ignore_result (send server (Msg p))
+           let (source, sink) = IO.pipe1 () in
+           let () = IO.run_and_ignore_result
+             (send_optsync server (Sync (Msg p) sink)) in
+           let () = IO.run_and_ignore_result source in
+           ()
         ;
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.