Commits

Dmitry Grebeniuk  committed 0388004

.

  • Participants
  • Parent commits 90c1ae2

Comments (0)

Files changed (6)

-<*.ml> | <*.mli> : camlp4r, pkg_lwt
+<*.ml> | <*.mli> : camlp4r, pkg_lwt, pkg_lwt.unix
 <*.byte> | <*.native> : pkg_lwt, pkg_lwt.unix
 <*> : warn_A
 <parvel_ops.*> : -camlp4r, camlp4o
 
     type server_context 'i 'o =
       < me : server 'i 'o
-(*
-      ; continue : ('i -> IO.m unit) -> unit -> IO.m unit
-*)
       ; my_group : server_group 'i 'o
       >
     ;
        контекста.
      *)
     type process_dispatcher 'i = process_context 'i -> 'i -> IO.m unit;
+    type server_dispatcher 'i 'o = server_context 'i 'o -> 'i -> IO.m 'o;
+
 
     (* по идее, процесс за'loop'ляется так:
        value proc ctx =
       }
     ;
 
+    value create_server_group = create_process_group
+    ;
+
+    value add_to_group group process =
+      group.children := [process :: group.children ]
+    ;
+
     value create_process group disp =
       (* let pid = Counter.next p_counter in *)
       let mq = IO.Mq.create_empty () in
       and process msg =
         IO.Mq.put mq msg
       in
-      ( group.children := [process :: group.children ]
+      ( add_to_group group process
       ; process_ref.contents := Some process
       ; let disp_ctx = disp context in
         schedule_continuation disp_ctx
     ;
 
 
+    value (create_server :
+      server_group 'i 'o ->
+      server_dispatcher 'i 'o ->
+      IO.m (server 'i 'o)
+      )
+    server_group server_disp =
+      let proc_disp ctx_for_proc =
+        let ctx_for_serv = (ctx_for_proc :> server_context _ _) in
+        let disp_ctx = server_disp ctx_for_serv in
+        server_loop
+        where rec server_loop (arg, dest) =
+          disp_ctx arg >>= fun res ->
+          dest_put dest res >>= fun () ->
+          ctx_for_proc#continue server_loop
+      in
+        create_process server_group proc_disp
+    ;
+
+
     (* синхронный вызов *)
-    value (call : server 'i 'o -> 'i -> IO.m 'o) _ _ = notimpl "call";
+    value (call : server 'i 'o -> 'i -> IO.m 'o) server arg =
+      let (source, sink) = IO.pipe () in
+      let dest = `Local (fun o -> IO.return (IO.put_sink sink o)) in
+      send server (arg, dest) >>= fun () ->
+      IO.wait_source source
+    ;
 
 
     (* нужно конкретно для частной задачи, поэтому не уверен в ценности:
     (* группа процессов *)
     type process_group 'i;
 
-    type dest 'o =
-      [= `Local of ('o -> IO.m unit)
-      ]
+    type server_group 'i 'o;
+
+    type dest 'o
+    ;
+
+    (* тип не абстрактный, так как пока это самое простое для
+       обеспечения возможности передавать синхронные вызовы другим
+       процессам/серверам. *)
+    type server 'i 'o = process ('i * dest 'o)
     ;
 
     value dest_put : dest 'o -> 'o -> IO.m unit
       >
     ;
 
+    type server_context 'i 'o =
+      < me : server 'i 'o
+      ; my_group : server_group 'i 'o
+      >
+    ;
+
 
     (* за'loop 1:
 
        но обёрнуто в абстрактный тип. *)
 
     type process_dispatcher 'i = process_context 'i -> ('i -> IO.m unit);
+    type server_dispatcher 'i 'o = server_context 'i 'o -> 'i -> IO.m 'o;
+
 
     (*
     для синхронных серверов предусмотреть что-то похожее на
     value create_process : process_group 'i -> process_dispatcher 'i
       -> IO.m (process 'i);
 
+    value create_server_group : unit -> IO.m (server_group 'i 'o);
+    value create_server :
+      server_group 'i 'o ->
+      server_dispatcher 'i 'o ->
+      IO.m (server 'i 'o);
+
     value send : process 'i -> 'i -> IO.m unit;
     value send_group : process_group 'i -> 'i -> IO.m unit;
 
+    value call : server 'i 'o -> 'i -> IO.m 'o;
+
+
     (* нужно конкретно для частной задачи, поэтому не уверен в ценности:
        функция шлёт сообщения группе и дожидается ответа каждого из процессов
        этой группы, затем возвращает массив ответов:

File parvel_IO.ml

 *)
 
 value printf fmt = Printf.ksprintf (fun s -> write stdout s) fmt;
+
+
+(* временно, будет меняться: *)
+type sink 'a = Lwt.u 'a  (* по идее эта срань должна быть контрвариантной *)
+ and source +'a = Lwt.t 'a
+;
+
+value pipe = Lwt.wait
+  and put_sink = Lwt.wakeup
+  and wait_source s = s
+;
 ;
 
 
-value do_something _ctx =
-  fun n ->
-    IO.printf "do_something: %.2f\n%!" n >>= fun () ->
-    Lwt_unix.sleep n >>= fun () ->
-    IO.printf "do_something: %.2f done\n%!" n >>= fun () ->
-    IO.return ()
+value func_something _ctx n =
+  IO.printf "do_something: %.2f\n%!" n >>= fun () ->
+  Lwt_unix.sleep n >>= fun () ->
+  IO.printf "do_something: %.2f done\n%!" n >>= fun () ->
+  IO.return ()
 ;
 
 
-value io_main =
-  P.create_process_group () >>= fun servg ->
+value io_main () =
+  P.create_server_group () >>= fun servg ->
+(*
   P.create_process servg (loop do_something) >>= fun serv ->
-  P.send serv 0.5 >>= fun () ->
-  P.send serv 1.5 >>= fun () ->
-  P.send serv 2.5 >>= fun () ->
+*)
+  P.create_server servg func_something >>= fun serv ->
+  P.call serv 0.5 >>= fun () ->
+  P.call serv 1.5 >>= fun () ->
+  P.call serv 2.5 >>= fun () ->
+(*
+  если не закомментировано -- Lwt.Cancelled,
+  если закомментировано -- "done", за которым почему-то "s.c.:waiting"
+  разобраться с обоими.
+
+  Lwt_unix.sleep 8.0 >>= fun () ->
+*)
+  IO.printf "io_main: awake\n" >>= fun () ->
   IO.printf "done\n%!"
 ;
 
 
+value _io_main () =
+  IO.printf "1\n%!" >>= fun () ->
+  Lwt_unix.sleep 4.0 >>= fun () ->
+  IO.printf "2\n%!" >>= fun () ->
+  IO.return ()
+;
+
+
+value _io_main =
+  Lwt_unix.sleep 4.0
+;
+
+
 value () = printf "go!\n%!";
 
 value main () =
   try
-    match IO.runIO io_main with
+    match IO.runIO & io_main () with
     [ `Ok () -> ()
     | `Error e -> printf "main: %s\n%!" & Printexc.to_string e
     ]
   with
   [ e -> printf "main: uncaught exception: %s\n%!" & Printexc.to_string e ]
 ;
+
+value () = main ()
+;
 
     value printf : format4 'a unit string (m unit) -> 'a;
 
+
+    (* временно, будет меняться: *)
+    type sink 'a  (* посмотреть на новые Lwt.u, должно быть -'a. *)
+     and source +'a;
+    value pipe : unit -> (source 'a * sink 'a);
+    value put_sink : sink 'a -> 'a -> unit;
+    value wait_source : source 'a -> m 'a;
+    (* . *)
+
   end
 ;