Commits

Anonymous committed 1c43609

.

Comments (0)

Files changed (2)

       }
     ;
 
+    type addrt 'o =
+      { aty : typeinfot 'o
+      ; asend : process_message_req 'o -> IO.m unit  (* cheating! *)
+      }
+    ;
+
     type group_gen 'p =
       { (* gid : Counter.t
       ;*) children : mutable list 'p
 
     value send proc msg = proc.p_send_msg msg;
 
+    value (sendt : addrt 'o -> process_message_req 'o -> IO.m unit)
+     addrt msg =
+      addrt.asend msg
+    ;
+
+    value addrt_of_proc ty proc =
+      { aty = ty
+      ; asend = proc
+      }
+    ;
+
     value rec io_iter f lst =
       match lst with
       [ [] -> IO.return ()
 
     (*************************************************************)
 
-    (* . *)
-
-
-
-    (*************************************************************)
-
     (* registering and running programs *)
 
     value progs_queue = Queue.create ()
+(*************************************************************)
+
+(* "ports" -- OS processes with message passing interface. *)
+
+module Port
+ :
+  sig
+
+    type port_command =
+      [ Shell of string
+   (* | Process of
+             string
+           * argv : array string
+           * env : array string (?)
+           * redirections
+    *)
+      ]
+    ;
+
+    type port_in =
+      [ Stdin of string
+      | Kill of int
+      ]
+    ;
+
+    type port_out =
+      [ Stdout of string
+      | Stderr of string
+      | Exited of Unix.process_status
+      ]
+    ;
+
+    value run : port_command -> addrt port_out -> IO.m (process port_in)
+    ;
+
+  end
+ =
+  struct
+
+    type port_command =
+      [ Shell of string
+      ]
+    ;
+
+    type port_in =
+      [ Stdin of string
+      | Kill of int
+      ]
+    ;
+
+    type port_out =
+      [ Stdout of string
+      | Stderr of string
+      | Exited of Unix.process_status
+      ]
+    ;
+
+    value bufsz = 4096;
+
+    value run command addrt =
+
+      create_process_group () >>= fun group ->
+
+      let lwt_command =
+        match command with
+        [ Shell str -> Lwt_process.shell str
+        ]
+      in
+
+      let process_full = Lwt_process.open_process_full lwt_command in
+
+      let sender_factory _ctx =
+        IO.return sender
+        where rec sender () =
+          let send_ch mapf ch =
+            let buf = String.make bufsz '\x00' in
+            let rec loop () =
+              IO.read_into ch buf 0 bufsz >>= fun has_read ->
+              let () = assert (0 <= has_read && has_read <= bufsz) in
+              sendt addrt (mapf (String.sub buf 0 has_read)) >>= fun () ->
+              if has_read = 0
+              then return_unit
+              else loop ()
+            in
+              loop ()
+          in
+            IO.catch
+              (fun () ->
+                 Lwt.join
+                   [ (send_ch (fun x -> Msg (Stdout x))
+                       process_full#stdout)
+                   ; (send_ch (fun x -> Msg (Stderr x))
+                       process_full#stderr)
+                   ; (process_full#status >>= fun st ->
+                      sendt addrt (Exited st)
+                     )
+                   ]
+                 >>= fun () ->
+                 process_exit ()
+              )
+              (fun e -> process_exit_error e)
+      in
+
+      let receiver_factory _ctx =
+        let process'_stdin = process_full#stdin in
+        IO.return receiver_loop
+        where rec receiver_loop msg =
+          (match msg with
+           [ Stdin txt ->
+               IO.write process'_stdin txt
+           | Kill signum ->
+               let () = process_full#kill signum in
+               return_unit
+           ]
+          ) >>= fun () ->
+          process_continue receiver_loop
+      in
+
+      create_process group receiver_factory >>= fun rp ->
+      create_process group sender_factory >>= fun _sp ->
+
+      IO.return rp
+    ;
+
+  end
+;