Commits

Anonymous committed 15e59ca

And Now... The 'Parvel.process_limit' You've All Been Waiting For!

Comments (0)

Files changed (4)

     me mq disp =
       IO.catch
         (fun () ->
-           (IO.printf "s.c.: waiting\n%!" >>= fun () ->
+           ( (* IO.printf "s.c.: waiting\n%!" >>= fun () -> *)
             IO.Mq.take mq >>= fun msg ->
-            IO.printf "s.c.: taken\n%!" >>= fun () ->
+             (* IO.printf "s.c.: taken\n%!" >>= fun () -> *)
             (disp msg : IO.m (process_result _)) >>= fun res ->
             IO.return res
            )
           ([], [])
           progs_queue
       in
-        IO.join (List.map (fun c -> c ()) codes)
+      let () = Queue.clear progs_queue in
+      IO.join (List.map (fun c -> c ()) codes)
     ;
 
     value run_programs () =
 *)
 
 
-(*
+    value cmd_shutdown = Cmd `Shutdown;
+
     value (process_limit :
       ?nmax:int ->
+      ?dbg:(string -> unit) ->
       (factory 'i 'o) ->
       factory 'i 'o
     )
-    ?(nmax=42) in_fact =
+    ?(nmax=42) ?dbg in_fact =
 
-      fun out_context ->
+      let dbg msg =
+        match dbg with
+        [ None -> ()
+        | Some f -> f msg
+        ]
+      in
 
-        create_process_group () >>= fun group ->
+      fun _out_context ->
 
-        let common_mq = IO.Mq.create_empty ()
-        and nchildren = ref 0 in
+        create_process_group () >>= fun in_group ->
+        create_process_group () >>= fun mon_group ->
 
-        let monitor_disp = fun
-          [ Msg `Spawn | Cmd (`Exited _) ->
-              create_process_inner
-                group in_fact ~mq:common_mq >>= fun (_ : process _) ->
-              process_loop monitor_disp
-          | Msg (`Kill_n n) ->
-              .
-          | Cmd `Shutdown ->
-              process_exit ()
-          ]
+        let common_mq = IO.Mq.create_empty () in
+
+        let msg_spawn = Msg `Spawn in
+
+        let monitor_factory mcontext =
+          let do_spawn () =
+            create_process_inner
+              in_group in_fact ~mq:common_mq >>= fun process ->
+              let () = monitor mcontext process in
+              let () = dbg "mon: spawned" in
+              IO.return ()
+          in
+          IO.return (monitor_disp ~count:0 ~fin:False)
+          where rec monitor_disp ~count ~fin = fun
+            [ Msg `Spawn ->
+                let () = dbg "mon: `Spawn" in
+                do_spawn () >>= fun () ->
+                process_continue (monitor_disp ~count:(count + 1) ~fin)
+
+            | Cmd (`Exited _) ->
+                if not fin
+                then
+                  let () = dbg "mon: `Exited (fin=False)" in
+                  do_spawn () >>= fun () ->
+                  process_continue (monitor_disp ~count ~fin)
+                else
+                  let new_count = count - 1 in
+                  let () = dbg (Printf.sprintf
+                    "mon: `Exited (fin=True, new_count=%i)" new_count) in
+                  if new_count <= 0
+                  then
+                    process_exit ()
+                  else
+                    process_continue (monitor_disp ~count:new_count ~fin)
+
+            | Cmd `Shutdown ->
+                let () = dbg "mon: `Shutdown" in
+                (loop count >>= fun () ->
+                 process_continue (monitor_disp ~count ~fin:True)
+                )
+                where rec loop left =
+                  if left = 0
+                  then IO.return ()
+                  else
+                    IO.Mq.put common_mq cmd_shutdown >>= fun () ->
+                    loop (left - 1)
+            ]
         in
 
-        create_process group (noinit monitor_disp) >>= fun monitor ->
+        create_process mon_group monitor_factory >>= fun monitor ->
   
-        IO.return & fun
+        IO.return (out_disp ~count:0)
+        where rec out_disp ~count = fun
         [ (Msg _) as msg ->
+            let () = dbg "lim: Msg" in
 
-            if nchildren.val < nmax
+            if count < nmax
             then
-          - узнать у общей очереди, сможет ли кто-нибудь читать из неё сейчас
-  
-          - отправить сообщение в общую очередь неблокирующим образом
-  
-          - если нет читающих, то послать монитору сообщение Рожай
-            неблокирующим образом, новое состояние: M := M + 1
-  
-            else  (* M >= N *)
-  
-              IO.Mq.put_blocking common_mq msg
-  
-          - поставить сообщение в очередь блокирующим образом --
-            подождать, пока это сообщение не подхватит кто-то из
-            пользовательских процессов
-        | Cmd (`Exited _) -> assert False
-        | Cmd `Shutdown -> 
+              let () = dbg "lim:   count < nmax" in
+              let any_reader = IO.Mq.idle_reader_exists common_mq in
+              IO.Mq.put common_mq msg >>= fun () ->
+              (if not any_reader
+               then
+                 let () = dbg "lim:     no readers" in
+                 send monitor msg_spawn >>= fun () ->
+                 IO.return (count + 1)
+               else
+                 let () = dbg "lim:     have reader" in
+                 IO.return count
+              ) >>= fun new_count ->
+              process_continue (out_disp ~count:new_count)
+            else
+              (* count >= nmax *)
+              let () = dbg "lim:   count >= nmax" in
+
+              IO.Mq.put_blocking common_mq msg >>= fun () ->
+              process_continue (out_disp ~count)
+
+        | Cmd (`Exited _) ->
+            (* мы никого не мониторим *)
+            let () = dbg "lim: `Exited" in
+            process_continue (out_disp ~count)
+
+        | Cmd `Shutdown ->
+            let () = dbg "lim: `Shutdown" in
+            send monitor cmd_shutdown >>= fun () ->
+            process_exit ()
         ]
     ;
-*)
 
 
     (* todo: художественно описать process_limit с порождением
       server_factory 'i 'o
     ;
 
-(*
     (* processes with parallel execution, with strict limit
        of concurrently executing processes.
      *)
 
     value process_limit :
       ?nmax:int ->
+      ?dbg:(string -> unit) ->
       process_factory 'i ->
-      IO.m (process 'i)
+      process_factory 'i
     ;
-*)
 
   end
 ;
       sig
         type t 'a;
         value create_empty : unit -> t 'a;
+
         value put : t 'a -> 'a -> m unit;
+        value put_blocking : t 'a -> 'a -> m unit;
+
         value take : t 'a -> m 'a;
+
+        value idle_reader_exists : t 'a -> bool;
       end
     ;
 
 
 value () = printf "go!\n%!";
 
-value main () =
+value do_run () =
   try
     match IO.runIO & P.run_programs () with
     [ `Ok () -> ()
-    | `Error e -> printf "main: %s\n%!" & Printexc.to_string e
+    | `Error e -> printf "do_run: %s\n%!" & Printexc.to_string e
     ]
   with
-  [ e -> printf "main: uncaught exception: %s\n%!" & Printexc.to_string e ]
+  [ e -> printf "do_run: uncaught exception: %s\n%!" & Printexc.to_string e ]
 ;
 
-value () = main ()
+value () = do_run ()
 ;
+
+
+
+value () = P.register_program `Main "tests2/io_main" & fun () ->
+
+(* todo: разобраться, почему при 
+  let () = Random.init 740_666 in
+  let prob p = (Random.float 1. < p) in
+  оказывается так, что последние сообщения обрабатывает какой-то один
+  процесс, и дело не доходит до качественного shutdown
+*)
+
+  let st = Random.State.make [| 740 ; 666 |] in
+  let prob p = (Random.State.float st 1. < p) in
+(* *)
+
+  let worker_number = ref (-1) in
+
+  let worker_sleep = 0.01 in
+
+  let worker_factory _context =
+    let () = incr worker_number in
+    let worker_number = worker_number.val in
+    let print msg =
+      Printf.printf "w%i:%s %!" worker_number
+        (match msg with
+         [ `Msg n -> Printf.sprintf "m%i" n
+         | `Exit -> "exit"
+         | `Killed -> "killed"
+         | `Exited -> "`Exited???"
+         ])
+    in
+    IO.return worker_disp
+    where rec worker_disp = fun
+      [ P.Msg n ->
+          if prob 0.1
+          then
+            ( print `Exit
+            ; P.process_exit ()
+            )
+          else
+            ( print (`Msg n)
+            ; Lwt_unix.sleep worker_sleep >>= fun () ->
+              P.process_continue worker_disp
+            )
+      | P.Cmd `Shutdown ->
+          ( print `Killed
+          ; P.process_exit ()
+          )
+      | P.Cmd (`Exited _) ->
+          ( print `Exited
+          ; P.process_continue worker_disp
+          )
+      ]
+  in
+
+  let messages_rev = ref [] in
+  let dbg msg = messages_rev.val := [msg :: messages_rev.val] in
+
+  let workers_factory_limited = P.process_limit ~dbg ~nmax:3 worker_factory in
+
+  P.create_process_group () >>= fun group ->
+  P.create_process group workers_factory_limited >>= fun workers ->
+
+  (
+   send_messages 0 >>= fun () ->
+   P.send workers (P.Cmd `Shutdown) >>= fun () ->
+   IO.printf "sent shutdown\n" >>= fun () ->
+   IO.flush Lwt_io.stdout >>= fun () ->
+   Lwt_unix.sleep 2.0 >>= fun () ->
+   let () = Printf.printf " .\n%!" in
+   let () =
+     ( Printf.printf "debug messages:\n"
+     ; List.iter (Printf.printf "> %s\n") (List.rev messages_rev.val)
+     ; Printf.printf "\n%!"
+     )
+   in
+   IO.return ()
+  )
+
+  where rec send_messages i =
+    if i >= 200
+    then
+      IO.return ()
+    else
+      P.send workers (P.Msg i) >>= fun () ->
+      send_messages (i + 1)
+;
+
+
+value () = printf "go 2!\n%!";
+
+value () = do_run ()
+;