1. Sebastien Mondet
  2. pvem_lwt_unix

Commits

Sebastien Mondet  committed 299c4b3

lib: add module `Deferred_list`

  • Participants
  • Parent commits e865d5d
  • Branches master

Comments (0)

Files changed (1)

File pvem_lwt_unix.ml

View file
 
 open Deferred_result
 
-module Internal_pervasives = struct
-  let (|>) x f = f x
-  module String = StringLabels
-  module List = ListLabels
-  module Filename = struct
-    include Filename
-
-    let string_rexists s ~f ~from:n =
-      let rec loop n =
-        if n = 0 then
-          None
-        else if f s.[n - 1] then
-          Some n
-        else
-          loop (n - 1)
-      in
-      loop n
-
-    let skip_end_slashes s ~from =
-      match string_rexists s ~from ~f:(fun c -> c <> '/') with
-      | Some v -> `Ends_at v
-      | None   -> `All_slashes
-    let split = function
-    | "" -> ".", "."
-    | s ->
-      match skip_end_slashes s ~from:(String.length s) with
-      | `All_slashes -> "/", "/"
-      | `Ends_at basename_end ->
-        match string_rexists s ~f:(fun c -> c = '/') ~from:basename_end with
-        | None -> ".", String.sub ~pos:0 ~len:basename_end s
-        | Some basename_start ->
-          let basename =
-            String.sub s ~pos:basename_start
-              ~len:(basename_end - basename_start)
-          in
-          let dirname =
-            match skip_end_slashes s ~from:basename_start with
-            | `All_slashes -> "/"
-            | `Ends_at dirname_end -> String.sub ~pos:0 ~len:dirname_end s
-          in
-          dirname, basename
-    let parts filename =
-      let rec loop acc filename =
-        match split filename with
-        | "." as base, "." -> base :: acc
-        | "/" as base, "/" -> base :: acc
-        | rest, dir ->
-          loop (dir :: acc) rest
-      in
-      loop [] filename
-  end
-end
-open Internal_pervasives
-open Printf
 
 
 module type IO = sig
 
 end
 
-module IO : IO = struct
-
-  let wrap_deferred_io f =
-    wrap_deferred ~on_exn:(fun e -> `io_exn e) (fun () -> f ())
-
-  (******************************************************************************)
-  (* Channels *)
-
-  let with_out_channel ?buffer_size ~f out =
-    begin
-      let open_file ~flags file =
-        wrap_deferred_io (fun () ->
-            Lwt_io.open_file ~mode:Lwt_io.output ?buffer_size ~flags file)
-      in
-      begin match out with
-      | `stdout -> return Lwt_io.stdout
-      | `stderr -> return Lwt_io.stderr
-      | `channel c -> return c
-      | `append_to_file file ->
-        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_APPEND ]) file
-      | `overwrite_file file ->
-        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_TRUNC ]) file
-      | `create_file file ->
-        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_EXCL ]) file
-      end
-      >>= fun outchan ->
-      begin
-        f outchan
-        >>< begin function
-        | `Ok o ->
-          wrap_deferred_io (fun () -> Lwt_io.close outchan)
-          >>= fun () ->
-          return o
-        | `Error e ->
-          begin match out with
-          | `append_to_file _
-          | `overwrite_file _
-          | `create_file _ ->
-            wrap_deferred_io (fun () -> Lwt_io.close outchan)
-            >>= fun _ ->
-            fail e
-          | _ -> fail e
-          end
-        end
-      end
-    end
-    >>< begin function
-    | `Ok o -> return o
-    | `Error (`io_exn (Unix.Unix_error (Unix.ENOENT, _, path))) ->
-      fail (`wrong_path path)
-    | `Error (`io_exn (Unix.Unix_error (Unix.EEXIST, _, path))) ->
-      fail (`file_exists path)
-    | `Error  e -> fail e
-    end
-
-  let write out s =
-    wrap_deferred_io (fun () -> Lwt_io.fprint out s)
-
-  let flush out = wrap_deferred_io (fun () -> Lwt_io.flush out)
-
-
-  let with_in_channel inspec ?buffer_size ~f =
-    begin match inspec with
-    | `stdin -> return Lwt_io.stdin
-    | `channel c -> return c
-    | `file file ->
-      wrap_deferred_io (fun () ->
-          Lwt_io.open_file ~mode:Lwt_io.input ?buffer_size file)
-    end
-    >>= fun inchan ->
-    begin
-      f inchan
-      >>< begin function
-      | `Ok o ->
-        wrap_deferred_io (fun () -> Lwt_io.close inchan)
-        >>= fun () ->
-        return o
-      | `Error e ->
-        begin match inspec with
-        | `file _ ->
-          wrap_deferred_io (fun () -> Lwt_io.close inchan)
-          >>= fun _ ->
-          fail e
-        | _ -> fail e
-        end
-      end
-    end
-
-  let read ?count i =
-    wrap_deferred_io (fun () -> Lwt_io.read ?count i)
-
-  (******************************************************************************)
-  (* Whole Files *)
-
-  let  write_file file ~content =
-    catch_deferred Lwt_io.(fun () ->
-        with_file ~mode:output file (fun i -> write i content))
-    >>< function
-    | `Ok o -> return o
-    | `Error e -> fail (`write_file_error (file, e))
-
-  let read_file file =
-    catch_deferred Lwt_io.(fun () ->
-        with_file ~mode:input file (fun i -> read i))
-     >>< function
-    | `Ok o -> return o
-    | `Error e -> fail (`read_file_error (file, e))
-
-end
-
 module type SYSTEM = sig
   (** Basic system access functions. *)
 
                      | `list_directory of string ] *
                        [> `exn of exn
                        | `file_not_found of string] ]) t
+end
+
+
+module type DEFERRED_LIST = sig
+(** Monadic “loops” on lists. *)
+
+  (** Sequentially launch [f] on the first argument and
+      get out of the loop at the first error.
+      The function returns the list of results if all succeed, or the
+      first error.
+  *)
+  val while_sequential: 'a list -> f:('a -> ('c, 'b) t) -> ('c list, 'b) t
+
+  (** Sequentially launch [f] on the first argument and process the
+      whole list even if there are errors.
+      The function returns the list of successes and the list of errors.  *)
+  val for_sequential: 'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
+
+  (** Like [for_sequential] but all the threads are launched concurrently. *)
+  val for_concurrent: 'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
+
+  (** Like [for_concurrent] but with the index in the list passed to the
+      function. *)
+  val for_concurrent_with_index:
+    'a list -> f:(int -> 'a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
+
+end
 
 
+module Internal_pervasives = struct
+  let (|>) x f = f x
+  module String = StringLabels
+  module List = ListLabels
+  module Filename = struct
+    include Filename
+
+    let string_rexists s ~f ~from:n =
+      let rec loop n =
+        if n = 0 then
+          None
+        else if f s.[n - 1] then
+          Some n
+        else
+          loop (n - 1)
+      in
+      loop n
 
+    let skip_end_slashes s ~from =
+      match string_rexists s ~from ~f:(fun c -> c <> '/') with
+      | Some v -> `Ends_at v
+      | None   -> `All_slashes
+    let split = function
+    | "" -> ".", "."
+    | s ->
+      match skip_end_slashes s ~from:(String.length s) with
+      | `All_slashes -> "/", "/"
+      | `Ends_at basename_end ->
+        match string_rexists s ~f:(fun c -> c = '/') ~from:basename_end with
+        | None -> ".", String.sub ~pos:0 ~len:basename_end s
+        | Some basename_start ->
+          let basename =
+            String.sub s ~pos:basename_start
+              ~len:(basename_end - basename_start)
+          in
+          let dirname =
+            match skip_end_slashes s ~from:basename_start with
+            | `All_slashes -> "/"
+            | `Ends_at dirname_end -> String.sub ~pos:0 ~len:dirname_end s
+          in
+          dirname, basename
+    let parts filename =
+      let rec loop acc filename =
+        match split filename with
+        | "." as base, "." -> base :: acc
+        | "/" as base, "/" -> base :: acc
+        | rest, dir ->
+          loop (dir :: acc) rest
+      in
+      loop [] filename
+  end
 end
+open Internal_pervasives
+open Printf
+
+module IO : IO = struct
+
+  let wrap_deferred_io f =
+    wrap_deferred ~on_exn:(fun e -> `io_exn e) (fun () -> f ())
+
+  (******************************************************************************)
+  (* Channels *)
+
+  let with_out_channel ?buffer_size ~f out =
+    begin
+      let open_file ~flags file =
+        wrap_deferred_io (fun () ->
+            Lwt_io.open_file ~mode:Lwt_io.output ?buffer_size ~flags file)
+      in
+      begin match out with
+      | `stdout -> return Lwt_io.stdout
+      | `stderr -> return Lwt_io.stderr
+      | `channel c -> return c
+      | `append_to_file file ->
+        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_APPEND ]) file
+      | `overwrite_file file ->
+        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_TRUNC ]) file
+      | `create_file file ->
+        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_EXCL ]) file
+      end
+      >>= fun outchan ->
+      begin
+        f outchan
+        >>< begin function
+        | `Ok o ->
+          wrap_deferred_io (fun () -> Lwt_io.close outchan)
+          >>= fun () ->
+          return o
+        | `Error e ->
+          begin match out with
+          | `append_to_file _
+          | `overwrite_file _
+          | `create_file _ ->
+            wrap_deferred_io (fun () -> Lwt_io.close outchan)
+            >>= fun _ ->
+            fail e
+          | _ -> fail e
+          end
+        end
+      end
+    end
+    >>< begin function
+    | `Ok o -> return o
+    | `Error (`io_exn (Unix.Unix_error (Unix.ENOENT, _, path))) ->
+      fail (`wrong_path path)
+    | `Error (`io_exn (Unix.Unix_error (Unix.EEXIST, _, path))) ->
+      fail (`file_exists path)
+    | `Error  e -> fail e
+    end
+
+  let write out s =
+    wrap_deferred_io (fun () -> Lwt_io.fprint out s)
+
+  let flush out = wrap_deferred_io (fun () -> Lwt_io.flush out)
+
+
+  let with_in_channel inspec ?buffer_size ~f =
+    begin match inspec with
+    | `stdin -> return Lwt_io.stdin
+    | `channel c -> return c
+    | `file file ->
+      wrap_deferred_io (fun () ->
+          Lwt_io.open_file ~mode:Lwt_io.input ?buffer_size file)
+    end
+    >>= fun inchan ->
+    begin
+      f inchan
+      >>< begin function
+      | `Ok o ->
+        wrap_deferred_io (fun () -> Lwt_io.close inchan)
+        >>= fun () ->
+        return o
+      | `Error e ->
+        begin match inspec with
+        | `file _ ->
+          wrap_deferred_io (fun () -> Lwt_io.close inchan)
+          >>= fun _ ->
+          fail e
+        | _ -> fail e
+        end
+      end
+    end
 
+  let read ?count i =
+    wrap_deferred_io (fun () -> Lwt_io.read ?count i)
+
+  (******************************************************************************)
+  (* Whole Files *)
+
+  let  write_file file ~content =
+    catch_deferred Lwt_io.(fun () ->
+        with_file ~mode:output file (fun i -> write i content))
+    >>< function
+    | `Ok o -> return o
+    | `Error e -> fail (`write_file_error (file, e))
+
+  let read_file file =
+    catch_deferred Lwt_io.(fun () ->
+        with_file ~mode:input file (fun i -> read i))
+     >>< function
+    | `Ok o -> return o
+    | `Error e -> fail (`read_file_error (file, e))
+
+end
 
 module System : SYSTEM = struct
 
 
 
 end
+
+module Deferred_list = struct
+
+  (** Returns the list of results if all succeed, or the first error. *)
+  let while_sequential:
+    'a list -> f:('a -> ('c, 'b) t) -> ('c list, 'b) t
+    = fun (type b) (l: 'a list) ~(f: 'a -> ('c, b) t) ->
+      let module Map_sequential = struct
+        exception Local_exception of b
+        let ms l f =
+          wrap_deferred
+            (fun () ->
+               Lwt_list.map_s (fun o ->
+                   Lwt.bind (f o) (function
+                     | `Ok oo -> Lwt.return oo
+                     | `Error ee -> Lwt.fail (Local_exception ee))) l)
+            ~on_exn:(function
+              | Local_exception e -> e
+              | e ->
+                ksprintf failwith "Expecting only Local_exception, but got: %s"
+                  (Printexc.to_string e) ())
+      end in
+      Map_sequential.ms l f
+
+  let for_sequential:
+    'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
+    = fun l ~f ->
+      let oks = ref [] in
+      let errors = ref [] in
+      List.fold_left l ~init:(return ()) ~f:(fun prevm elt ->
+          prevm >>= fun () ->
+          f elt >>< function
+          | `Ok o -> oks := o :: !oks; return ()
+          | `Error e -> errors := e :: !errors; return ())
+      >>= fun () ->
+      return (List.rev !oks, List.rev !errors)
+
+  let for_concurrent:
+    'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
+    = fun l ~f ->
+      let oks = ref [] in
+      let errors = ref [] in
+      Lwt.(
+        Lwt_list.map_p (fun elt ->
+            f elt >>= function
+            | `Ok o -> oks := o :: !oks; return ()
+            | `Error e -> errors := e :: !errors; return ()) l
+        >>= fun _ ->
+        return (`Ok ())
+      )
+      >>= fun () ->
+      return (List.rev !oks, List.rev !errors)
+
+
+  let for_concurrent_with_index l ~f =
+    let with_indexes = List.mapi l ~f:(fun i a -> (i, a)) in
+    for_concurrent with_indexes ~f:(fun (i, a) -> f i a)
+
+
+  end