1. Dmitry Grebeniuk
  2. parvel

Commits

Dmitry Grebeniuk  committed a8b9e03

processes now receive messages with type like `Msg of 'msg | `Cmd of cmd

  • Participants
  • Parent commits 8e0c990
  • Branches default

Comments (0)

Files changed (3)

File Makefile

View file
 
 tests : all
 	_build/$(TESTBIN)
+
+infer :
+	ocamlbuild parvel.inferred.mli && cat _build/parvel.inferred.mli

File parvel.ml

View file
     ;
 
     type process_result 'i =
-      [ Exit | Continue of 'i -> IO.m (process_result 'i) ]
+      [ Exit
+      | Continue of process_message 'i -> IO.m (process_result 'i)
+      ]
     ;
 
     type process_context 'i =
       < me : process 'i
       ; my_group : process_group 'i
 
-      ; continue : ('i -> IO.m (process_result 'i)) -> IO.m (process_result 'i)
+      ; continue :
+          (process_message 'i ->
+          IO.m (process_result 'i)) ->
+          IO.m (process_result 'i)
       ; exit : IO.m (process_result 'i)
       >
     ;
     ;
 
 
-    value rec process_loop mq disp_ctx =
+    value rec (process_loop
+      : (IO.Mq.t (process_message 'a)) -> (_ -> IO.m (process_result 'a)) -> _
+    )
+    mq disp_ctx =
       IO.catch
         (fun () ->
            try
       IO.m (server 'i 'o)
       )
     server_group server_disp =
-      let proc_disp ctx_for_proc =
+      let proc_disp ctx_for_proc :
+(process_message (call 'i 'o) -> IO.m (process_result (call 'i 'o)))
+ =
         let ctx_for_serv = (ctx_for_proc :> server_context _ _) in
         let disp_ctx = server_disp ctx_for_serv in
         server_loop
     value (call : server 'i 'o -> 'i -> IO.m 'o) server arg =
       let (source, sink) = IO.pipe1 () in
       let dest = `Local (fun o -> IO.return (IO.put_sink sink o)) in
-      send server (arg, dest) >>= fun () ->
+      send server (`Msg (arg, dest)) >>= fun () ->
       IO.wait_source source
     ;
 
              let workers = ref (Map.empty key_cmp) in
              loop
              where rec loop = fun
-             [ (`Add_worker (k, w), dest) ->
+             [ `Msg (`Add_worker (k, w), dest) ->
                  ( workers.val := Map.add k w workers.val
                  ; dest_put dest `Worker_added >>= fun () ->
                    ctx#continue loop
                  )
-             | (`Call (k, i), dest) ->
+             | `Msg (`Call (k, i), dest) ->
                  match Map.find_opt workers.val k with
                  [ None ->
                      dest_put dest `No_worker_for_key >>= fun () ->
                      ctx#continue loop
                  | Some w -> send w
-                     ( i
+                     ( `Msg ( i
                      , dest_pre_map (fun r -> `Ok r) dest
-                     ) >>= fun () ->
+                     )) >>= fun () ->
                      ctx#continue loop
                  ]
-             | (`Get_keys, dest) ->
+             | `Msg (`Get_keys, dest) ->
                  dest_put dest (`Keys (Map.keys workers.val)) >>= fun () ->
                  ctx#continue loop
+             | `Cmd `Shutdown ->
+                 ctx#exit
              ]
           )
     ;
         ]
     ;
 
+
   end
 ;

File parvel.mli

View file
  :
   sig
 
+    type process_command =
+      [= `Shutdown ]
+    ;
+
+    type process_message 'i =
+      [= `Msg of 'i
+      |  `Cmd of process_command
+      ]
+    ;
+
     type process 'i;
 
     (* группа процессов *)
       ; my_group : process_group 'i
 
       ; exit : IO.m (process_result 'i)
-      ; continue : ('i -> IO.m (process_result 'i)) -> IO.m (process_result 'i)
+      ; continue :
+          (process_message 'i -> IO.m (process_result 'i)) ->
+          IO.m (process_result 'i)
       >
     ;
 
       >
     ;
 
-    type process_command =
-      [= `Shutdown ]
-    ;
-
-    type process_message 'i =
-      [= `Msg of 'i
-      |  `Cmd of process_command
-      ]
-    ;
-
     (* диспетчер сообщений -- то, что содержит пользовательский код *)
 
     type process_dispatcher 'i =
       server_dispatcher 'i 'o ->
       IO.m (server 'i 'o);
 
-    value send : process 'i -> 'i -> IO.m unit;
+    value send : process 'i -> process_message 'i -> IO.m unit;
 
-    value send_group : process_group 'i -> 'i -> IO.m unit;
+    value send_group : process_group 'i -> process_message 'i -> IO.m unit;
 
     value call : server 'i 'o -> 'i -> IO.m 'o;