(* js-elisp / process.ml *)

(* CRv201108 mshinwell: Move this module to code-reviewed Async. *)

open Core.Std
open Async.Std

module Exit_or_signal_or_stop = Unix.Exit_or_signal_or_stop
module Fd = Unix.Fd
module Async_unix = Unix
module Unix = Core.Std.Unix

(** A pair of values of the same type, one per output channel of a child process.
    In this file it is instantiated with readers (by [open_in]) and with the captured
    text of each channel (by [backtick_status]). *)
module Output = struct
  type 'a t = {
    (* Value associated with the child's standard output. *)
    stdout: 'a;
    (* Value associated with the child's standard error. *)
    stderr: 'a
  }
end

(** [term_or_kill pid] sends [Signal.term] to [pid].  If that signal was delivered, a
    [Signal.kill] is sent to the same pid one second later; the outcome of that second
    signal is deliberately disregarded.  Nothing more happens when the first send reports
    that there is no such process. *)
let term_or_kill pid =
  let target = `Pid pid in
  (* Second-stage signal; both possible outcomes are acceptable, so neither is acted on. *)
  let escalate () =
    match Signal.send Signal.kill target with
    | `Ok | `No_such_process -> ()
  in
  match Signal.send Signal.term target with
  | `No_such_process -> ()
  | `Ok -> upon (after (sec 1.)) escalate
;;

(** Registry of child pids that must be signalled (via [term_or_kill]) when the program
    shuts down.  [register] adds a pid to the registry and [unregister] removes it. *)
module Kill_at_shutdown : sig
  val register : Pid.t -> unit
  val unregister : Pid.t -> unit
end = struct
  (* The set of pids currently registered. *)
  let registered = ref Set.Poly.empty

  (* Replace the registry with the result of applying [f] to it. *)
  let update f = registered := f !registered

  let register pid = update (fun pids -> Set.add pids pid)
  let unregister pid = update (fun pids -> Set.remove pids pid)

  (* Installed once, when this module is initialised: at shutdown every pid that is
     still registered is handed to [term_or_kill]. *)
  let () =
    Shutdown.at_shutdown (fun () ->
      Set.iter !registered ~f:(fun pid -> term_or_kill pid);
      Deferred.unit)
  ;;
end

(*
   As it turns out, forking is tricky... here are some of the special cases that
   this function attempts to take out of the hands of the user.  Read this list carefully
   before messing around with this function.  In particular, there are non-obvious cases
   where the order of calls is important:
   - if [f ()] throws an exception, make sure the file descriptors are closed and the
     forked program is killed
   - if the forked program exits before [f ()], make sure it is reaped quickly, and that
     the pipes are closed, regardless of the state of f
   - after you fork a child, it will have duped the endpoints of your pipes, and the
     parent should close the local endpoints
   - if the program calling [f ()] exits while f is running, make sure that the forked
     child is killed
*)
(** [create ?kill ~prog ~args ~env ?working_dir ?stdin ~f ()] starts [prog] with [args]
    and [env] through [Unix.create_process_env], writes [stdin] (default: the empty
    string) to the child's standard input, waits for that write to be flushed, and then
    calls [f pid ~stdin ~stdout ~stderr], where [stdin] is a [Writer.t] and [stdout] /
    [stderr] are [Reader.t]s wrapping the child's descriptors.

    - [kill]: when this deferred becomes determined, the child is passed to
      [term_or_kill], unless its exit has already been observed.
    - The pid is registered with [Kill_at_shutdown] for the lifetime of the call.
    - Once [f] has finished or raised, the writer and both readers are closed.  One
      second after that, the child is passed to [term_or_kill] if its exit has not been
      observed, and the pid is unregistered.

    The result is [(v, status)], where [v] is the value produced by [f] and [status] is
    what [Async_unix.wait] reported for the child.  If [f] raised, the exception is
    re-raised (after [Monitor.extract_exn]) once the closes have completed. *)
let create ?kill:kill_def ~prog ~args ~env ?working_dir ?(stdin = "") ~f () =
  (* Keep the text to feed to the child; [stdin] is rebound to the fd/writer below. *)
  let stdin_text = stdin in
  (* Set to [true] once [Async_unix.wait] has returned for the child. *)
  let process_exited = ref false in
  (* Only signal the child while its exit has not yet been observed. *)
  let kill ~pid = if not !process_exited then term_or_kill pid in
  In_thread.syscall_exn (fun () ->
    Unix.create_process_env ~prog ~args ~env ?working_dir ())
  >>= fun proc_info ->
    let pid    = proc_info.Unix.Process_info.pid in
    (* CR till: We are most likely the process leader; this seems like
       unwelcome extra complexity. What exactly are we guarding against? *)
    Kill_at_shutdown.register pid;
    let stdin =
      Fd.create Fd.Kind.Fifo proc_info.Unix.Process_info.stdin ~name:"<stdin>"
    in
    let stdout =
      Fd.create Fd.Kind.Fifo proc_info.Unix.Process_info.stdout ~name:"<stdout>"
    in
    let stderr =
      Fd.create Fd.Kind.Fifo proc_info.Unix.Process_info.stderr ~name:"<stderr>"
    in
    let stdin  = Writer.create stdin  in
    let stdout = Reader.create stdout in
    let stderr = Reader.create stderr in
    (* Closes the three channels in sequence.  The returned deferred is determined as
       soon as the closes are done; the delayed kill/unregister is only scheduled here,
       not waited for. *)
    let cleanup () =
      Writer.close stdin                 >>= fun () ->
      Reader.close stdout                >>= fun () ->
      Reader.close stderr                >>| fun () ->
      after (sec 1.0) >>> fun () ->
      kill ~pid;
      Kill_at_shutdown.unregister pid
    in
    (* Started right away, independently of [f], so the child's exit is recorded in
       [process_exited] whenever it happens. *)
    let process_status =
      (* XCR till: this will block a thread from the thread pool for an
         indeterminate amount of time..
         mbacarella: fixed by implementing an async-friendly wait *)
      Async_unix.wait (`Pid pid)
      >>| fun (_, status) ->
      process_exited := true;
      status
    in
    (* Every error reported on the stdin writer's monitor is discarded. *)
    (* CR till: I can think of only EPIPE as a valid error.
       Why do we discard the rest?*)
    Stream.iter (Monitor.errors (Writer.monitor stdin)) ~f:(fun _ -> ());
    (* CR till: the underlying fd should be closed when we are done
       writing to it to let the process know that it shouldn't wait for any more
       data. I guess we can rely on [f] to do that; but it just seems simpler
       to either:
       _ not pass stdin to f
       _ not pipe anything in stdin
       There's a very careful dance that [f] must perform: it should close
       [stdin] but if it does close [std{err,out}] it needs to tell the caller
       of that function to deal with SIGPIPE.
    *)
    Writer.write stdin stdin_text;
    (* Exceptions raised by [f] are captured as [Error _] so that [cleanup] always
       runs before the outcome is reported. *)
    Monitor.try_with (fun () ->
      (* Hook up the caller-supplied kill trigger, if there is one. *)
      Option.iter kill_def ~f:(fun v -> v >>> fun () -> kill ~pid);
      (* CR till: I don't see the point of this. We can start reading the output
         as soon as the process is dispatched.
         amosca: I think it's the other way round, this is waiting for whatever we passed
         into ?stdin to be flushed before it starts calling f.
      *)
      Writer.flushed stdin >>= fun _ -> (
        f pid ~stdin ~stdout ~stderr
      )
    ) >>=
    (fun r ->
      cleanup ()
      >>= fun _ ->
      match r with
      (* Success: pair the value from [f] with the child's exit status. *)
      | Ok v -> process_status >>| (fun ps -> (v, ps))
      | Error e ->
        (* CR sweeks: [raise e] if no one cares about the form of the exception. *)
        raise (Monitor.extract_exn e))
;;

(* Alias for [Unix.File_descr.t], where [Unix] is [Core.Std.Unix] as rebound at the top
   of this file.
   NOTE(review): not referenced by name elsewhere in this file; presumably it is the
   type of the [stdin]/[stdout]/[stderr] arguments of [create_fds] in the interface --
   confirm. *)
type file_descr = Unix.File_descr.t

(** [create_fds ~kill ~prog ~args ~stdin ~stdout ~stderr ~f] forks.

    In the child, [stdin], [stdout] and [stderr] are duplicated onto the standard
    descriptors and [prog] is exec'd with [prog :: args] as its argument vector.  If
    anything in that sequence raises, a message is printed on stderr and the child exits
    immediately with code 1, so that it never carries on running the parent's code.

    In the parent:
    - the child's pid is registered with [Kill_at_shutdown];
    - when [kill] becomes determined the child is passed to [term_or_kill] (unless its
      exit has already been observed) and the pid is unregistered;
    - [f pid] is called synchronously;
    - the returned deferred is determined with the child's exit status once
      [Async_unix.wait] completes.

    The pid is also unregistered as soon as the child's exit has been observed.
    Previously it stayed registered for ever unless [kill] fired, so the shutdown hook
    could later signal a stale -- and possibly recycled -- pid. *)
let create_fds ~kill:kill_def ~prog ~args ~stdin ~stdout ~stderr ~f:caller_f =
  match Unix.fork () with
  | `In_the_child ->
    begin
      try
        Unix.dup2 ~src:stdin ~dst:Unix.stdin;
        Unix.dup2 ~src:stdout ~dst:Unix.stdout;
        Unix.dup2 ~src:stderr ~dst:Unix.stderr;
        Unix.execvp ~prog ~args:(Array.of_list (prog::args))
      with
      | _ ->
        (* Deliberate catch-all: whatever went wrong, the child must terminate here. *)
        Printf.eprintf "exec failed: %s\n%!" prog;
        Unix.exit_immediately 1
    end
  | `In_the_parent pid ->
    Kill_at_shutdown.register pid;
    (* Set to [true] once [Async_unix.wait] has returned for the child. *)
    let process_exited = ref false in
    let kill_child () =
      if not !process_exited then term_or_kill pid;
      Kill_at_shutdown.unregister pid
    in
    (kill_def >>> kill_child);
    caller_f pid;
    (* XCR till: this will block a thread from the thread pool for an
       indeterminate amount of time..
       mbacarella: fixed by implementing an async-friendly wait *)
    Async_unix.wait (`Pid pid)
    >>| fun (_pid, status) ->
    process_exited := true;
    (* The child has been reaped: its pid must no longer be signalled at shutdown. *)
    Kill_at_shutdown.unregister pid;
    status
;;

(** [open_in ?is_ok ?kill ~prog ~args ()] runs [prog] with [args] through [create]
    (extending the environment with nothing) and returns readers on the child's stdout
    and stderr as soon as they are available.  The child's stdin is closed immediately.

    When the child's status becomes known, an exception is raised (from within an [upon]
    callback) if [is_ok status] is false and [kill] was not determined at that point.
    [is_ok] defaults to [Result.is_ok]. *)
let open_in ?(is_ok = Result.is_ok) ?kill ~prog ~args () =
  (* Only used in the failure message below. *)
  let cmd = sprintf "%s %s" prog (String.concat args ~sep:" ") in
  let readers = Ivar.create () in
  (* Whether the caller's kill trigger, if any, has already fired. *)
  let was_killed () =
    match kill with
    | None -> false
    | Some trigger ->
      (match Deferred.peek trigger with
       | Some () -> true
       | None -> false)
  in
  (* Publish the readers, close the child's stdin, then wait until both readers have
     been closed. *)
  let run _pid ~stdin ~stdout ~stderr =
    Ivar.fill readers (stdout, stderr);
    Writer.close stdin
    >>= fun () ->
    Deferred.all_unit [Reader.closed stdout; Reader.closed stderr]
  in
  let status = create ?kill ~prog ~args ~env:(`Extend []) ~f:run () in
  upon status (fun ((), status) ->
    if not (is_ok status || was_killed ()) then
      failwithf "Process.open_in '%s' exited with status '%s'"
        cmd (Unix.Exit_or_signal.to_string_hum status) ());
  Ivar.read readers
  >>| fun (stdout, stderr) ->
  { Output.stdout; stderr }
;;

(* The calling convention shared by the [backtick*] functions in this file: a function
   taking the arguments below and producing an ['a].
   - [kill]: deferred passed down as the kill trigger of the underlying process call
   - [env]: either replaces the environment or extends it with the given bindings
   - [prog], [args]: the program to run and its arguments
   - [working_dir]: optional working directory for the program
   - [stdin]: text to feed to the program's standard input
   The final [unit] argument lets the optional arguments be omitted. *)
type 'a backtick =
  ?kill:unit Deferred.t
  -> ?env:[`Replace of (string * string) list | `Extend of (string * string) list]
  -> prog:string
  -> args:string list
  -> ?working_dir:string
  -> ?stdin:string
  -> unit
  -> 'a

(** Helpers over the ['a backtick] calling convention. *)
module Backtick : sig
  type 'a t = 'a backtick

  (** [map t ~f] is a backtick-style function that forwards all of its arguments to [t]
      and applies [f] to the result. *)
  val map : 'a t -> f:('a -> 'b) -> 'b t
end = struct
  type 'a t = 'a backtick

  let map t ~f ?kill ?env ~prog ~args ?working_dir ?stdin () =
    (* Arguments are forwarded in the order in which ['a backtick] declares them. *)
    let result = t ?kill ?env ~prog ~args ?working_dir ?stdin () in
    f result
end

(* CR estokes for mshinwell: Should this have is_ok, like open_in? *)
(** [backtick_status ?kill ?env ~prog ~args ?working_dir ?stdin ()] runs [prog] through
    [create], closes the child's stdin once the initial [stdin] text has been handed
    over, and collects the whole of stdout and stderr.  The result pairs the captured
    output with the child's exit status.  [env] defaults to extending the environment
    with nothing and [stdin] to the empty string. *)
let backtick_status ?kill ?(env=`Extend []) ~prog ~args ?working_dir ?(stdin = "") () =
  (* Close the child's stdin, then read both output channels to the end, concurrently. *)
  let slurp _pid ~stdin ~stdout ~stderr =
    Writer.close stdin
    >>= fun () ->
    Deferred.both (Reader.contents stdout) (Reader.contents stderr)
  in
  create ?kill ~prog ~args ~env ?working_dir ~stdin ~f:slurp ()
  >>| fun ((stdout, stderr), status) ->
  let output = { Output.stdout; stderr; } in
  (output, status)
;;

(** Same as [backtick_status], but the exit status is dropped and only the captured
    output is returned. *)
let backtick =
  let drop_status (output, _status) = output in
  Backtick.map backtick_status ~f:(fun result -> result >>| drop_status)
;;

(** Program output, kept as an s-expression when it parses as one and as a list of
    lines otherwise. *)
module Lines_or_sexp = struct
  type t =
  | Lines of string list
  | Sexp of Sexp.t
  with sexp_of

  (** [create string] tries to parse [string], stripped of surrounding whitespace, as a
      single s-expression.  On any failure it falls back to the unstripped [string]
      split on newlines. *)
  let create string =
    let parsed =
      (* Best effort: any exception at all means "not an s-expression". *)
      try Some (Sexp.of_string (String.strip string))
      with _ -> None
    in
    match parsed with
    | Some sexp -> Sexp sexp
    | None -> Lines (String.split string ~on:'\n')
  ;;
end

(** Description of a command that did not exit successfully, together with the
    exception that carries it.  [backtick_new] builds this value when the child's
    status is not [Ok]. *)
module Command_failed = struct
  type t =
    { prog : string;                (* the program that was run *)
      args : string list;           (* the arguments it was given *)
      status: Unix.Exit_or_signal.t; (* how the process terminated *)
      stdout : Lines_or_sexp.t;     (* captured standard output *)
      stderr : Lines_or_sexp.t;     (* captured standard error *)
    }
  with sexp_of

  exception E of t with sexp
end

(** [backtick_new] runs the command through [backtick_status] and returns [Ok stdout]
    when the exit status is [Ok].  Otherwise it returns [Error (Command_failed.E _)]
    describing the program, its arguments, its status and both captured outputs. *)
let backtick_new ?kill ?env ~prog ~args ?working_dir ?stdin () =
  let to_result (output, status) =
    let { Output.stdout; stderr } = output in
    match status with
    | Ok _ -> Ok stdout
    | Error _ ->
      let failure =
        { Command_failed.prog;
          args;
          stdout = Lines_or_sexp.create stdout;
          stderr = Lines_or_sexp.create stderr;
          status;
        }
      in
      Error (Command_failed.E failure)
  in
  backtick_status ?kill ?env ~prog ~args ?working_dir ?stdin ()
  >>| to_result
;;

(** Same as [backtick_new], but a failed command raises the exception instead of
    returning it in an [Error]. *)
let backtick_new_exn =
  let unwrap = function
    | Ok stdout -> stdout
    | Error exn -> raise exn
  in
  Backtick.map backtick_new ~f:(fun result -> result >>| unwrap)
;;
(* Residue from the web file browser this source was copied from; not part of the
   program.

   Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
   Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
   Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
   Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
   Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
   Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
   Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.
*)