Commits

Dmitry Grebeniuk  committed e77a7f5

Parvel.stream_transform for 'many-to-many' messages transformation with state

  • Participants
  • Parent commits 73bee9d

Comments (0)

Files changed (2)

  ~ -- в работе
 )
 
-| [!] Обработка потока сообщений в режиме "many-to-many", когда
-      0..N входящих сообщений вызывают отправку 0..M исходящих сообщений.
-      Из-за использования акторов это выливается в функцию, пригодную
-      для чего-то типа create_server, но имеющую на выходе не одно
-      сообщение, а список сообщений (в том числе пустой), и неявным образом
-      отправляющую эти сообщения нужному адресату.  То есть,
-        value create_mtm :
-          addressee 'o ->
-          ('i -> IO.m (list 'o)) ->
-          process_factory 'i;
-
-| [!] Оформление процессов в типа-пайпы:
+| [] Оформление процессов в типа-пайпы:
       addrt 'o + process 'i = pipe 'i 'o
       и подмножество из пайпов, гарантирующих "1 входное = 1 выходное"
 
 
     (*************************************************************)
 
+
+    value (io_res : (unit -> IO.m 'a)
+                 -> IO.m [= `Ok of 'a | `Error of exn ]
+          ) func =
+      IO.catch
+        (fun () -> func () >>= fun r -> IO.return (`Ok r))
+        (fun e -> IO.return (`Error e))
+    ;
+
+
+    (* stream transform: many to many *)
+
+    value stream_transform
+      (fact : factory ('i * 's) ((list 'o) * 's))
+      (state : 's)
+      (addr : addrt 'o)
+     :
+      IO.m (process 'i)
+     =
+      create_process_group () >>= fun group ->
+
+      create_process group (
+      fun context ->
+        fact context >>= fun disp ->
+        let rec st_loop ~state msg =
+          let cont ~state = process_continue (st_loop ~state) in
+          match msg with
+          [ Msg m ->
+              io_res (fun () -> disp (m, state)) >>= fun
+              [ `Ok (outlist, state) ->
+                  io_iter (fun m -> sendt addr (Msg m)) outlist >>= fun () ->
+                  cont ~state
+              | `Error e -> process_exit_error (`Exn e)
+              ]
+          | Cmd `Shutdown -> process_exit ()
+          | Cmd (`Exited _) -> cont ~state
+          ]
+        in
+          IO.return (st_loop ~state)
+      )
+    ;
+
+
+    (*************************************************************)
+
     (* registering and running programs *)
 
     value progs_queue = Queue.create ()