Commits

Dmitry Grebeniuk  committed a4d142a

- groups

  • Participants
  • Parent commits bb931d1

Comments (0)

Files changed (4)

       }
     ;
 
-    type group_gen 'p =
-      { (* gid : Counter.t
-      ;*) children : mutable list 'p
-      }
-    ;
-
     type server 'i 'o = process (call 'i 'o);
 
-    type process_group 'i = group_gen (process 'i);
-    type server_group 'i 'o = group_gen (server 'i 'o);
-
     type context =
       { c_send_cmd : process_command_req -> IO.m unit
       ; c_mq_set_block_limit : int -> unit
       ]
     ;
 
-    value send_group group msg =
-      io_iter (fun proc -> send proc msg) group.children
-    ;
-
     (* диспетчер сообщений -- то, что содержит пользовательский код
      *)
 
 
 
 
-    (* обеспечить мономорфность получающегося типа, чтобы в случае top-level
-       значений были конкретные типы. *)
-
-    value create_process_group () =
-      IO.return
-      {(* gid = Counter.next g_counter
-      ;*) children = []
-      }
-    ;
-
-    value create_server_group = create_process_group
-    ;
-
-    value add_to_group group process =
-      group.children := [process :: group.children ]
-    ;
-
     value notify_monitor ~my_pid ~process_exit_status f =
       IO.run_and_ignore_result (f my_pid process_exit_status)
     ;
     ;
 
     value (create_process_inner :
-      process_group 'a ->
       process_factory 'a ->
       ~mq:(IO.Mq.t _) ->
       IO.m (process 'a)
     )
-    group factory ~mq =
+    factory ~mq =
 (*
       let self_ref = ref None in
       let lazy_me = lazy
         }
       in
       ( (* self_ref.val := Some process *) ()
-      ; add_to_group group process
       ; IO.run_and_ignore_result
           (factory context >>= fun disp -> process_loop process mq disp)
       ; IO.return process
 
 
     value (create_process :
-      process_group 'a ->
       process_factory 'a ->
       IO.m (process 'a)
     )
-    group disp =
+    disp =
       create_process_inner
-        group disp
+        disp
         ~mq:(IO.Mq.create ())
     ;
 
 
 
     value (create_server_inner :
-      server_group 'i 'o ->
       server_factory 'i 'o ->
       ~mq:(IO.Mq.t _) ->
       IO.m (server 'i 'o)
       )
-    server_group server_factory ~mq =
+    server_factory ~mq =
       let (process_factory : process_factory _) ctx =
         server_factory ctx >>= fun server_disp ->
         IO.return server_loop
                 process_continue server_loop
             ]
       in
-        create_process_inner ~mq server_group process_factory
+        create_process_inner ~mq process_factory
     ;
 
 
-    value create_server server_group server_factory =
+    value create_server server_factory =
       create_server_inner
-        server_group server_factory
+        server_factory
         ~mq:(IO.Mq.create ())
     ;
 
       ]
     ;
 
-    (* нужно конкретно для частной задачи, поэтому не уверен в ценности:
-       функция шлёт сообщения группе и дожидается ответа каждого из серверов
-       этой группы, затем возвращает массив ответов *)
-
-    value (call_group
-      : server_group 'i 'o -> 'i -> IO.m (array 'o)
-      ) _ _ = _notimpl "call_group"
-    ;
-
     (* ответы на сообщения будут производиться через передаваемый
       в функцию контекст, где будут функции send для простых процессов
       и reply для call-подобных. *)
       IO.m (switch 'k 'i 'o))
       ?(key_cmp = Pervasives.compare)
       () =
-        create_server_group () >>= fun group ->
         let workers = ref (Map.empty key_cmp) in
-        create_process group
+        create_process
           (fun ctx ->
              IO.return loop
              where rec (loop : process_dispatcher _) = fun
 
       fun _out_context ->
 
-        create_process_group () >>= fun in_group ->
-        create_process_group () >>= fun mon_group ->
-
         let common_mq = IO.Mq.create () in
 
         let msg_spawn = Msg `Spawn in
         let monitor_factory mcontext =
           let do_spawn () =
             create_process_inner
-              in_group in_fact ~mq:common_mq >>= fun process ->
+              in_fact ~mq:common_mq >>= fun process ->
               let () = monitor mcontext process in
               let () = dbg "mon: spawned" in
               IO.return ()
             ]
         in
 
-        create_process mon_group monitor_factory >>= fun monitor ->
+        create_process monitor_factory >>= fun monitor ->
   
         IO.return (out_disp ~count:0)
         where rec out_disp ~count = fun
      :
       IO.m (process 'i)
      =
-      create_process_group () >>= fun group ->
-
-      create_process group (
+      create_process (
       fun context ->
         fact context >>= fun disp ->
         let rec st_loop ~state msg =
   ]
 ;
 
-type process 'i;
-
-(* группа процессов *)
-type process_group 'i;
-
-type server_group 'i 'o;
+type process 'i
+;
 
 type dest 'o
 ;
 value dest_put : dest 'o -> call_resp 'o -> IO.m unit
 ;
 
-value create_process_group : unit -> IO.m (process_group 'i);
-
 value create_process :
-  process_group 'i ->
   process_factory 'i ->
   IO.m (process 'i);
 
-value create_server_group : unit -> IO.m (server_group 'i 'o);
-
 value create_server :
-  server_group 'i 'o ->
   server_factory 'i 'o ->
   IO.m (server 'i 'o);
 
 
 value sendt : addrt 'o -> process_message_req 'o -> IO.m unit;
 
-value send_group : process_group 'i -> process_message_req 'i -> IO.m unit;
-
 value call : server 'i 'o -> 'i -> IO.m (call_resp 'o);
 
 exception ECall of call_resp_error;
 value call_io : server 'i 'o -> 'i -> IO.m 'o;
 
 
-value call_group : server_group 'i 'o -> 'i -> IO.m (array 'o);
-
-
 (* switch-servers *)
 
 type switch 'k 'i 'o;

File parvel_lwt.ml

 
     value run command addrt =
 
-      create_process_group () >>= fun rgroup ->
-      create_process_group () >>= fun sgroup ->
-
       let lwt_command =
         match command with
         [ Shell str -> Lwt_process.shell str
          ]
       in
 
-      create_process sgroup sender_factory >>= fun sender_process ->
+      create_process sender_factory >>= fun sender_process ->
       send sender_process (Msg `Go) >>= fun () ->
 
       let receiver_factory _ctx =
           IO.return receiver_loop
       in
 
-      create_process rgroup receiver_factory >>= fun rp ->
+      create_process receiver_factory >>= fun rp ->
 
       IO.return rp
     ;
 
 
 value () = P.register_program `Main "tests/io_main" & fun () ->
-  P.create_server_group () >>= fun servg ->
 (*
-  P.create_process servg (loop do_something) >>= fun serv ->
+  P.create_process (loop do_something) >>= fun serv ->
 *)
   let servf = P.add_state_to_server (P.noinit func_something_state) [] in
-  P.create_server servg servf >>= fun serv ->
+  P.create_server servf >>= fun serv ->
   P.call_io serv 0.5 >>= fun () ->
   P.call_io serv 1.5 >>= fun () ->
   P.call_io serv 2.5 >>= fun () ->
 
 
 value _io_main_stateless () =
-  P.create_server_group () >>= fun servg ->
 (*
-  P.create_process servg (loop do_something) >>= fun serv ->
+  P.create_process (loop do_something) >>= fun serv ->
 *)
-  P.create_server servg (P.noinit func_something) >>= fun serv ->
+  P.create_server (P.noinit func_something) >>= fun serv ->
   P.call_io serv 0.5 >>= fun () ->
   P.call_io serv 1.5 >>= fun () ->
   P.call_io serv 2.5 >>= fun () ->
   let workers_factory_limited =
     P.process_limit ~dbg ~nmax:3 worker_factory in
 
-  P.create_process_group () >>= fun group ->
-  P.create_process group workers_factory_limited >>= fun workers ->
+  P.create_process workers_factory_limited >>= fun workers ->
 
   (
    send_messages 0 >>= fun () ->
 ;
 
 
-value run_n_processes n group f_process_factory f_init_msg =
+value run_n_processes n f_process_factory f_init_msg =
    loop 0
   where rec loop i =
     if i >= n
     then IO.return ()
     else
-      P.create_process group (f_process_factory i) >>= fun p ->
+      P.create_process (f_process_factory i) >>= fun p ->
       P.send p (f_init_msg p) >>= fun () ->
       loop (i + 1)
 ;
 
 value () = P.register_program `Main "tests3/io_main" & fun () ->
 
-  P.create_server_group () >>= fun worker_group ->
-  P.create_process_group () >>= fun client_group ->
-
   let worker_sleep = 0.02 in
   let clients_count = 10 in
   let client_messages = 20 in
 
   in
 
-  P.create_server worker_group worker_factory >>= fun worker ->
+  P.create_server worker_factory >>= fun worker ->
 
   let f_client_factory n _context =
     let () = Printf.printf "(c%i)%!" n in
      ]
   in
 
-    run_n_processes clients_count client_group
+    run_n_processes clients_count
       f_client_factory (fun _ -> P.Msg ())
     >>= fun () ->
     Lwt_unix.sleep 10. >>= fun () ->
 
 value () = P.register_program `Main "tests4/io_main" & fun () ->
 
-  P.create_process_group () >>= fun group_recv ->
-
   let receiver_factory = P.noinit &
     receive_msg
     where rec receive_msg msg =
       ]
   in
 
-  P.create_process group_recv receiver_factory >>= fun receiver_proc ->
+  P.create_process receiver_factory >>= fun receiver_proc ->
 
   let raddrt = P.addrt_of_proc receiver_proc in