ocaml_monad_io / iO_Lwt.ml

type m +'a = Lwt.t 'a;

value return = Lwt.return;

value bind = Lwt.(=<<);

value bind_rev = Lwt.bind;
value ( >>= ) = Lwt.bind;


value error = Lwt.fail;

value catch = Lwt.catch;


(*************************)


value wrap_exn _place = error
;


value wrap1 place f = fun a ->
  catch (fun () -> f a)
  (wrap_exn place)
;

value wrap2 place f = fun a b ->
  catch (fun () -> f a b)
  (wrap_exn place)
;

value wrap4 place f = fun a b c d ->
  catch (fun () -> f a b c d)
  (wrap_exn place)
;

value read_into = wrap4 "read_into" Lwt_io.read_into;

value error = Lwt.fail;

type output_channel = Lwt_io.output_channel;
value stdout = Lwt_io.stdout;
value write = wrap2 "write" Lwt_io.write;

type input_channel = Lwt_io.input_channel;

value open_in = wrap1 "open_in" (
  fun fn ->
    Lwt_io.open_file
      ~mode:Lwt_io.input
      ~flags:[Unix.O_RDONLY]
     fn
  )
;

value close_in x = wrap1 "close_in" Lwt_io.close x;

value runIO x : [= `Ok of 'a | `Error of exn ] =
  try `Ok (Lwt_main.run x)
  with [e -> `Error e]
;

value with_file_in_bin filename func =
  Lwt_io.with_file ~mode:Lwt_io.input filename func
;

value with_file_out_bin filename func =
  Lwt_io.with_file ~mode:Lwt_io.output filename func
;

(********************************************************)

value stdin = Lwt_io.stdin;

value printf fmt = Printf.ksprintf (write stdout) fmt;

value flush = Lwt_io.flush;

value close_out = Lwt_io.close;


(********************************************************)

(* copypasted from lwt_io.ml, modified:
   - to revised syntax,
   - more strict code (no more warnings),
   - added "wait_server"
 *)

type server =
  { shutdown : Lazy.t unit
  ; wait_server : Lwt.t unit
  }
;

value shutdown_server server = Lazy.force server.shutdown
;

value wait_server server = server.wait_server
;

value establish_server ?buffer_size ?(backlog=5) sockaddr f =
  let sock = Lwt_unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0 in
  let () =
    ( Lwt_unix.setsockopt sock Unix.SO_REUSEADDR True
    ; Lwt_unix.bind sock sockaddr
    ; Lwt_unix.listen sock backlog
    ) in
  let (abort_waiter, abort_wakener) = Lwt.wait () in
  let (finish_waiter, finish_wakener) = Lwt.wait () in
  let abort_waiter = abort_waiter >>= fun [`Shutdown -> return `Shutdown] in
  let rec loop () =
    Lwt.pick
      [ Lwt_unix.accept sock >>= fun x -> return (`Accept x)
      ; abort_waiter
      ]
    >>= fun
      [ `Accept(fd, _addr) ->
          let () =
            (try Lwt_unix.set_close_on_exec fd
             with [Invalid_argument _ -> ()]) in
          let close = lazy begin
            Lwt_unix.shutdown fd Unix.SHUTDOWN_ALL;
            Lwt_unix.close fd
          end in
          let () =
            f ( Lwt_io.of_fd ?buffer_size ~mode:Lwt_io.input
                  ~close:(fun () -> Lazy.force close) fd
              , Lwt_io.of_fd ?buffer_size ~mode:Lwt_io.output
                  ~close:(fun () -> Lazy.force close) fd
              )
          in
          loop ()
      | `Shutdown ->
          Lwt_unix.close sock >>= fun () ->
          (match sockaddr with
            [ Unix.ADDR_UNIX path when path <> "" && path.[0] <> '\x00' ->
                ( Unix.unlink path
                ; return ()
                )
            | Unix.ADDR_UNIX _ | Unix.ADDR_INET _ ->
                return ()
            ]
          ) >>= fun () ->
          return (Lwt.wakeup finish_wakener ())
      ]
  in
  ( ignore (loop ())
  ; { shutdown = lazy (Lwt.wakeup abort_wakener `Shutdown)
    ; wait_server = finish_waiter
    }
  )
;

(* end of copypaste *)


(********************************************************)

(*
value establish_server ?buffer_size ?backlog sa func =
  Lwt_io.establish_server ?buffer_size ?backlog sa func
;

value shutdown_server = Lwt_io.shutdown_server
;
*)


value run_and_ignore_result = Lwt.ignore_result
;


(********************************************************)

module Sequence_Sequential
 =
  struct
    type io_m +'a = m 'a;
    type m +'a = io_m 'a;

    value ( >>= ) = Lwt.bind;
    value bind = bind;
    value return = return;
    value bind_rev = bind_rev;

    value (sequence_array : array (m 'a) -> m (array 'a)) arr_m_a =
      let len = Array.length arr_m_a in
      if len = 0
      then return [| |]
      else
        arr_m_a.(0) >>= fun a0 ->
        let res = Array.make len a0 in
        inner 1
        where rec inner i =
          if i = len
          then return res
          else
            arr_m_a.(i) >>= fun a ->
            ( res.(i) := a
            ; inner (i + 1)
            )
    ;

  end
;


module Sequence_Parallel
 =
  struct
    type io_m +'a = m 'a;
    type m +'a = io_m 'a;

    value ( >>= ) = Lwt.bind;
    value bind = bind;
    value return = return;
    value bind_rev = bind_rev;

    value (sequence_array : array (m 'a) -> m (array 'a)) arr_m_a =
      let len = Array.length arr_m_a in
      if len = 0
      then return [| |]
      else
        let res_opt = Array.make len None in
        let writer i m_a =
          m_a >>= fun a ->
          (res_opt.(i) := Some a; return ())
        in
        let writers = inner [] 0
          where rec inner acc i =
            if i = len
            then acc
            else inner [writer i arr_m_a.(i) :: acc] (i + 1)
        in
        Lwt.join writers >>= fun () ->
        let res = Array.map
          (fun [None -> assert False | Some x -> x])
          res_opt
        in
          return res
    ;

  end
;

value sequence_array = Sequence_Parallel.sequence_array;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.