1. Dmitry Grebeniuk
  2. parvel

Source

parvel / parvel_lwt.ml

The branch 'net-base' does not exist.
(*************************************************************)

(* "ports" -- OS processes with message passing interface. *)

(* Short alias for the I/O monad module; the code below reaches its bind,
   return, catch and channel operations through this name. *)
module IO = Parvel_IO
;


(*
   Debug tracing helper.  The commented-out definition below prints every
   message to stdout with a DBG: prefix and flushes; swap it in for the
   active definition to enable tracing.

value dbg fmt = Printf.ksprintf (fun s -> Printf.printf "DBG: %s\n%!" s) fmt;
*)
(* Active definition: Printf.ifprintf consumes the format and its arguments
   but prints nothing, so dbg calls stay type-checked while being silent.
   The fmt parameter is kept explicit so that dbg remains polymorphic in
   the format type. *)
value dbg fmt = Printf.ifprintf stdout fmt;


module Port  (* (Parvel_F : Parvel_types.F) *)
 :
  sig

    (* How the child OS process is started.
       Shell str hands str to Lwt_process.shell. *)
    type port_command =
      [ Shell of string
   (* | Process of
             string
           * argv : array string
           * env : array string (?)
           * redirections
    *)
      ]
    ;

    (* empty string in Stdin means "close stdin",
       in Stdout/Stderr -- "stdout/stderr closed".
     *)

    (* Messages accepted by the port:
       Stdin txt   -- write txt to the stdin of the child and flush
                      (empty txt closes stdin instead);
       Kill signum -- pass signum to the kill method of the child. *)
    type port_in =
      [ Stdin of string
      | Kill of int
      ]
    ;

    (* Messages emitted by the port:
       Stdout / Stderr -- a chunk read from the corresponding stream
                          (an empty chunk is sent after a zero-length read);
       Exited st       -- the status reported for the child. *)
    type port_out =
      [ Stdout of string
      | Stderr of string
      | Exited of Unix.process_status
      ]
    ;

    (* run command addr starts command, forwards its output and exit status
       to addr as port_out messages, and returns the process that accepts
       port_in messages for the child. *)
    value run
     : port_command ->
       Parvel.addrt port_out ->
       IO.m (Parvel.process port_in)
    ;

  end
 =
  struct

(*
    module IO = Parvel_IO;
    module Parvel = Parvel_F (IO);
*)

    open Parvel;

    (* Infix bind; IO.bind takes the continuation first, hence the swap. *)
    value ( >>= ) m f = IO.bind f m
    ;

    value return_unit = IO.return ()
    ;


    type port_command =
      [ Shell of string
      ]
    ;

    type port_in =
      [ Stdin of string
      | Kill of int
      ]
    ;

    type port_out =
      [ Stdout of string
      | Stderr of string
      | Exited of Unix.process_status
      ]
    ;

    (* Size of the buffer used for each read from the child output. *)
    value bufsz = 4096;

    (* Starts the child, then creates two processes:
       - a sender, kicked off with Msg `Go, which forwards stdout, stderr
         and the exit status of the child to addrt and then exits;
       - a receiver, returned to the caller, which handles port_in messages
         and framework commands. *)
    value run command addrt =

      let lwt_command =
        match command with
        [ Shell str -> Lwt_process.shell str
        ]
      in

      let process_full = Lwt_process.open_process_full lwt_command in

      let sender_factory _ctx =
        IO.return sender
        where rec sender msg =
         match msg with
         [ Msg `Go ->
          (* Forward everything readable from ch to addrt, wrapping every
             chunk with mapf.  A zero-length read is forwarded as an empty
             string and ends the loop. *)
          let send_ch mapf ch =
            let buf = String.make bufsz '\x00' in
            let rec loop () =
              IO.read_into ch buf 0 bufsz >>= fun has_read ->
              let () = dbg "port: sender: has_read=%i" has_read in
              let () = assert (0 <= has_read && has_read <= bufsz) in
              sendt addrt (mapf (String.sub buf 0 has_read)) >>= fun () ->
              if has_read = 0
              then return_unit
              else loop ()
            in
              loop ()
          in
            (* Any exception raised while forwarding terminates the sender
               with an `Exn error instead of escaping. *)
            IO.catch
              (fun () ->
                 let () = dbg "port: sender: join" in
                 (* Wait for both streams to end and for the exit status
                    to be delivered before exiting the sender. *)
                 Lwt.join
                   [ (send_ch (fun x -> Msg (Stdout x))
                       process_full#stdout)
                   ; (send_ch (fun x -> Msg (Stderr x))
                       process_full#stderr)
                   ; (process_full#status >>= fun st ->
                       sendt addrt (Msg (Exited st))
                     )
                   ]
                 >>= fun () ->
                 let () = dbg "port: sender: exitting" in
                 process_exit ()
              )
              (fun e -> process_exit_error (`Exn e))
         | Cmd (_ : [> ]) ->
             process_exit ()
         ]
      in

      create_process sender_factory >>= fun sender_process ->
      send sender_process (Msg `Go) >>= fun () ->

      let receiver_factory _ctx =
        let process'_stdin = process_full#stdin
        (* Set once stdin of the child has been closed; later Stdin
           messages are then ignored instead of touching the channel. *)
        and stdin_closed = ref False in
        let rec receiver_loop msg =
          (match msg with
           [ Msg (Stdin txt) ->
               let () = dbg "port: receiver: closed=%b, txt=%S"
                 stdin_closed.val
                 txt in
               match (txt, stdin_closed.val) with
               [ ("", True) -> return_unit
               | ("", False) ->
                   (* Empty text is the request to close stdin. *)
                   IO.close_out process'_stdin >>= fun () ->
                   ( stdin_closed.val := True
                   ; dbg "port: receiver: closed."
                   ; return_unit
                   )
               | (_, True) -> return_unit
               | (txt, False) ->
                   IO.write process'_stdin txt >>= fun () ->
                   IO.flush process'_stdin
               ]
           | Msg (Kill signum) ->
               let () = process_full#kill signum in
               return_unit
           | Cmd `Shutdown ->
               (* The sender is blocked in Lwt.join reading out+err of the
                  child, so forwarding the command to it (send sp (Cmd
                  `Shutdown)) would not be handled.  Kill the child
                  instead, so that the sender can finish once the streams
                  end and the exit status arrives.
                  NOTE(review): descendants of the shell that inherited
                  the pipes may keep them open -- confirm whether a process
                  group kill is wanted. *)
               let () = process_full#kill Sys.sigkill in
               return_unit
           | Cmd (_ : [> ]) ->
               return_unit
           ]
          ) >>= fun () ->
          process_continue receiver_loop
        in
          IO.return receiver_loop
      in

      create_process receiver_factory >>= fun rp ->

      IO.return rp
    ;

  end
;