Source

ocaml_monad_io / iO_Lwt.ml

Dmitry Grebeniuk 722f204 






Dmitry Grebeniuk 46ffdd3 

Dmitry Grebeniuk 722f204 



Dmitry Grebeniuk 2554b68 
























































Dmitry Grebeniuk e0f1655 









Dmitry Grebeniuk 80c546c 



Dmitry Grebeniuk 46ffdd3 




Dmitry Grebeniuk 80c546c 
Dmitry Grebeniuk 46ffdd3 






































































Dmitry Grebeniuk 80c546c 





Dmitry Grebeniuk 46ffdd3 

Dmitry Grebeniuk 80c546c 


Dmitry Grebeniuk 3ee622d 











































































(* The IO monad of this module: a computation is an Lwt thread (Lwt.t),
   covariant in its result type. *)
type m +'a = Lwt.t 'a;

(* Injects a value into the monad; alias of Lwt.return. *)
value return = Lwt.return;

(* Alias of the flipped Lwt bind operator: per the Lwt API the continuation
   comes first and the computation second (confirm against the Lwt version
   in use). *)
value bind = Lwt.(=<<);

(* Both names below alias Lwt.bind: computation first, continuation second. *)
value bind_rev = Lwt.bind;
value ( >>= ) = Lwt.bind;


(* Builds a failed computation from an exception; alias of Lwt.fail.
   NOTE(review): the same binding is repeated further down in this file. *)
value error = Lwt.fail;

(* Alias of Lwt.catch. *)
value catch = Lwt.catch;


(*************************)


(* Exception handler shared by the wrapN helpers. The place label is
   accepted but not used: the exception is handed to error unchanged. *)
value wrap_exn = fun _place -> error;


(* [wrap1 place f] behaves like the one-argument function f, except that the
   call runs under catch with the handler wrap_exn place. *)
value wrap1 place f a =
  let handler = wrap_exn place in
  catch (fun () -> f a) handler
;

(* [wrap2 place f] behaves like the two-argument function f, except that the
   call runs under catch with the handler wrap_exn place. *)
value wrap2 place f a b =
  let handler = wrap_exn place in
  catch (fun () -> f a b) handler
;

(* [wrap4 place f] behaves like the four-argument function f, except that
   the call runs under catch with the handler wrap_exn place. *)
value wrap4 place f a b c d =
  let handler = wrap_exn place in
  catch (fun () -> f a b c d) handler
;

(* Lwt_io.read_into guarded by wrap4 with the place label "read_into". *)
value read_into = wrap4 "read_into" Lwt_io.read_into;

(* NOTE(review): repeats the earlier definition of error with the same
   right-hand side (Lwt.fail); it looks redundant. *)
value error = Lwt.fail;

(* Output side: the channel type and the standard output channel are taken
   from Lwt_io; write is Lwt_io.write guarded by wrap2. *)
type output_channel = Lwt_io.output_channel;
value stdout = Lwt_io.stdout;
value write = wrap2 "write" Lwt_io.write;

(* Input channel type, taken from Lwt_io. *)
type input_channel = Lwt_io.input_channel;

(* Opens the file named by the argument as an Lwt_io input channel, using
   the single open flag O_RDONLY. The call goes through wrap1 with the
   place label "open_in". *)
value open_in =
  let open_read_only fn =
    Lwt_io.open_file ~flags:[Unix.O_RDONLY] ~mode:Lwt_io.input fn
  in
  wrap1 "open_in" open_read_only
;

(* Closes the channel x with Lwt_io.close, run under catch with the handler
   wrap_exn "close_in". *)
value close_in x =
  catch (fun () -> Lwt_io.close x) (wrap_exn "close_in")
;

(* Runs the computation x with Lwt_main.run. Returns the Ok variant carrying
   the result, or the Error variant carrying whatever exception was raised
   while running it (every exception is caught). *)
value runIO x : [= `Ok of 'a | `Error of exn ] =
  try
    let result = Lwt_main.run x in
    `Ok result
  with
  [ exn -> `Error exn ]
;

(* Passes an input channel on the file filename to func, delegating to
   Lwt_io.with_file in input mode. *)
value with_file_in_bin filename func =
  let in_mode = Lwt_io.input in
  Lwt_io.with_file ~mode:in_mode filename func
;

(* Passes an output channel on the file filename to func, delegating to
   Lwt_io.with_file in output mode. *)
value with_file_out_bin filename func =
  let out_mode = Lwt_io.output in
  Lwt_io.with_file ~mode:out_mode filename func
;

(********************************************************)

(* Standard input channel, taken from Lwt_io. *)
value stdin = Lwt_io.stdin;

(* Formats its arguments according to fmt with Printf.ksprintf and hands the
   resulting string to write on stdout. *)
value printf fmt =
  let emit text = write stdout text in
  Printf.ksprintf emit fmt
;

(* Alias of Lwt_io.flush. *)
value flush = Lwt_io.flush;

(* Alias of Lwt_io.close, exported under the output-channel name. *)
value close_out = Lwt_io.close;


(********************************************************)

(* Copied from lwt_io.ml, with these modifications:
   - converted to revised syntax,
   - stricter code (no compiler warnings),
   - added "wait_server".
 *)

(* Handle on a server started by establish_server. *)
type server =
  { shutdown : Lazy.t unit      (* forcing it asks the accept loop to stop *)
  ; wait_server : Lwt.t unit    (* woken by the accept loop at the end of its shutdown branch *)
  }
;

(* Requests the server to stop by forcing its lazy shutdown action. *)
value shutdown_server = fun srv -> Lazy.force srv.shutdown;

(* Returns the computation stored in the wait_server field of the handle. *)
value wait_server = fun srv -> srv.wait_server;

(* [establish_server ?buffer_size ?backlog sockaddr f] creates a stream
   socket for sockaddr, sets SO_REUSEADDR on it, binds it and listens with
   the given backlog (5 by default), then starts an accept loop whose result
   is ignored.

   For every accepted connection, f receives a pair (input channel, output
   channel) built on the connection descriptor; buffer_size is passed on to
   Lwt_io.of_fd. Both channels share one lazy close action, so the
   descriptor is shut down and closed by whichever channel is closed first.

   The returned record lets the caller stop the loop (field shutdown) and
   wait for the end of the shutdown branch (field wait_server). On shutdown
   the listening socket is closed and, for a Unix-domain address whose path
   is non-empty and does not start with a NUL byte, the path is unlinked.

   Exceptions raised by the socket setup escape to the caller. *)
value establish_server ?buffer_size ?(backlog=5) sockaddr f =
  let sock = Lwt_unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0 in
  let () =
    ( Lwt_unix.setsockopt sock Unix.SO_REUSEADDR True
    ; Lwt_unix.bind sock sockaddr
    ; Lwt_unix.listen sock backlog
    ) in
  (* abort_waiter is woken by the shutdown action; finish_waiter is woken by
     the loop at the end of its shutdown branch. *)
  let (abort_waiter, abort_wakener) = Lwt.wait () in
  let (finish_waiter, finish_wakener) = Lwt.wait () in
  let abort_waiter = abort_waiter >>= fun [`Shutdown -> return `Shutdown] in
  let rec loop () =
    (* Race the next accept against the shutdown request. *)
    Lwt.pick
      [ Lwt_unix.accept sock >>= fun x -> return (`Accept x)
      ; abort_waiter
      ]
    >>= fun
      [ `Accept(fd, _addr) ->
          let () =
            (try Lwt_unix.set_close_on_exec fd
             with [Invalid_argument _ -> ()]) in
          (* Shared by both channels. The peer may already have dropped the
             connection, in which case shutdown fails with ENOTCONN; that
             must not prevent the descriptor from being closed. Without the
             guard the exception would escape before close is reached, and
             forcing the same lazy value again would fail again, so the
             descriptor would leak. *)
          let close = lazy
            ( ( try Lwt_unix.shutdown fd Unix.SHUTDOWN_ALL
                with [ Unix.Unix_error Unix.ENOTCONN _ _ -> () ] )
            ; Lwt_unix.close fd
            ) in
          let () =
            f ( Lwt_io.of_fd ?buffer_size ~mode:Lwt_io.input
                  ~close:(fun () -> Lazy.force close) fd
              , Lwt_io.of_fd ?buffer_size ~mode:Lwt_io.output
                  ~close:(fun () -> Lazy.force close) fd
              )
          in
          loop ()
      | `Shutdown ->
          Lwt_unix.close sock >>= fun () ->
          (match sockaddr with
            [ Unix.ADDR_UNIX path when path <> "" && path.[0] <> '\x00' ->
                (* remove the socket file of a named Unix-domain socket *)
                ( Unix.unlink path
                ; return ()
                )
            | Unix.ADDR_UNIX _ | Unix.ADDR_INET _ ->
                return ()
            ]
          ) >>= fun () ->
          return (Lwt.wakeup finish_wakener ())
      ]
  in
  ( ignore (loop ())
  ; { shutdown = lazy (Lwt.wakeup abort_wakener `Shutdown)
    ; wait_server = finish_waiter
    }
  )
;

(* end of copypaste *)


(********************************************************)

(*
value establish_server ?buffer_size ?backlog sa func =
  Lwt_io.establish_server ?buffer_size ?backlog sa func
;

value shutdown_server = Lwt_io.shutdown_server
;
*)


(* Alias of Lwt.ignore_result. *)
value run_and_ignore_result = Lwt.ignore_result
;


(********************************************************)

(* Turns an array of computations into one computation of an array of
   results, waiting for the elements one after another in index order. *)
module Sequence_Sequential
 =
  struct
    type io_m +'a = m 'a;
    type m +'a = io_m 'a;

    value ( >>= ) = Lwt.bind;
    value bind = bind;
    value return = return;
    value bind_rev = bind_rev;

    (* An empty input yields an empty array. Otherwise the result of element
       0 is awaited first and also serves as the initial filler of the
       result array; elements 1 .. len-1 are then awaited and stored in
       turn. *)
    value (sequence_array : array (m 'a) -> m (array 'a)) arr_m_a =
      match Array.length arr_m_a with
      [ 0 -> return [| |]
      | len ->
          arr_m_a.(0) >>= fun first ->
          let res = Array.make len first in
          let rec fill i =
            if i = len
            then return res
            else
              arr_m_a.(i) >>= fun a ->
              ( res.(i) := a
              ; fill (i + 1)
              )
          in
          fill 1
      ]
    ;

  end
;


(* Turns an array of computations into one computation of an array of
   results, waiting for all elements together through Lwt.join. *)
module Sequence_Parallel
 =
  struct
    type io_m +'a = m 'a;
    type m +'a = io_m 'a;

    value ( >>= ) = Lwt.bind;
    value bind = bind;
    value return = return;
    value bind_rev = bind_rev;

    (* An empty input yields an empty array. Otherwise every element gets a
       writer that stores its result into an option slot; once Lwt.join of
       all writers has finished, the slots are unwrapped into the result. *)
    value (sequence_array : array (m 'a) -> m (array 'a)) arr_m_a =
      let len = Array.length arr_m_a in
      if len = 0
      then return [| |]
      else
        let slots = Array.make len None in
        let store i m_a =
          m_a >>= fun a ->
          (slots.(i) := Some a; return ())
        in
        (* writers are created in index order and consed one by one, so the
           list holds the writer of the last index first *)
        let writers =
          Array.fold_left
            (fun acc w -> [w :: acc])
            []
            (Array.mapi store arr_m_a)
        in
        (* every slot has been filled by its writer when join returns *)
        let unwrap = fun [ Some x -> x | None -> assert False ] in
        Lwt.join writers >>= fun () ->
        return (Array.map unwrap slots)
    ;

  end
;

(* Default sequence_array of this module: the Sequence_Parallel version. *)
value sequence_array = Sequence_Parallel.sequence_array;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.