ysulsky avatar ysulsky committed a6325ca

auto export

Comments (0)

Files changed (410)

+# THIS FILE IS AUTOMATICALLY GENERATED BY make-hgignore.  DO NOT EDIT.
+# bin/make-hgignore processes this .hgignore.in, along with all the other .hgignore.in
+# files in the hg repo to produce a single .hgignore file at the root.  Please do not add
+# directory-specific files to ignore here.  Instead, add them to the .hgignore.in file in
+# the directory where the file you want to ignore lives.
+
+syntax: regexp
+\.cm(x|xa|a|o|i)$
+\.exe$
+\.bc$
+\.annot$
+\.o$
+\.so(.\d)?$
+\.s$
+\.a$
+\.depend$
+\._.+$
+\.sp.t$
+config.status$
+mycaml$
+\.DS_Store$
+tmtags$
+
+^\.git/.*$
+^\.transdeps/.*$
+^chroot_test_tmp/.*$
+
+^lib/packs(/.+\.pack)?$
+^lib/.+/deps
+^lib/.+/.+\.mli
+^test-results.*$
+
+syntax: glob
+personal_compilation_ignores
+*.cp4c
+*~
+*.auto.mli
+*.aux
+*.digest
+*.haux
+*.htoc
+*.fls
+*.log
+*.omc
+*.toc
+*.orig
+*.spot
+.*.sw[pabcdef]
+.mydeps
+inline_tests.ml
+inline_tests_runner.ml
+TAGS
+order_file_*_common.ml
+*_db_pnl_common.ml
+order_file_*_eval.ml
+
+
+# for new omake rules
+*.deps
+*.ml.d
+*.mli.d
+*.libdeps
+*.objdeps
+*.hg_version.c
+*.build_info.c
+hg_version.out
+
+# specific files
+.last_build_env
+.omakedb
+.omakedb.lock
+spec.hg
+libdeps.sexp
+libdeps.dot
+libdeps.ps
+libdeps.corrected
+ocaml_path
+.ocaml-3.10
+\#*
+.\#*
+
+glob:base/async/INSTALL
+glob:base/async/Makefile
+glob:base/async/_build
+glob:base/async/_oasis
+glob:base/async/_tags
+glob:base/async/configure
+glob:base/async/core/INSTALL
+glob:base/async/core/Makefile
+glob:base/async/core/_build
+glob:base/async/core/_oasis
+glob:base/async/core/_tags
+glob:base/async/core/configure
+glob:base/async/core/lib/META
+glob:base/async/core/lib/async_core.mlpack
+glob:base/async/core/myocamlbuild.ml
+glob:base/async/core/setup.data
+glob:base/async/core/setup.ml
+glob:base/async/extra/INSTALL
+glob:base/async/extra/Makefile
+glob:base/async/extra/_build
+glob:base/async/extra/_oasis
+glob:base/async/extra/_tags
+glob:base/async/extra/configure
+glob:base/async/extra/lib/META
+glob:base/async/extra/lib/async_extra.mlpack
+glob:base/async/extra/myocamlbuild.ml
+glob:base/async/extra/setup.data
+glob:base/async/extra/setup.ml
+glob:base/async/lib/META
+glob:base/async/lib/async.mlpack
+glob:base/async/lib/async.odocl
+glob:base/async/lib/libasync.clib
+glob:base/async/myocamlbuild.ml
+glob:base/async/scheduler/INSTALL
+glob:base/async/scheduler/Makefile
+glob:base/async/scheduler/_build
+glob:base/async/scheduler/_oasis
+glob:base/async/scheduler/_tags
+glob:base/async/scheduler/configure
+glob:base/async/scheduler/lib/META
+glob:base/async/scheduler/lib/async_scheduler.mlpack
+glob:base/async/scheduler/myocamlbuild.ml
+glob:base/async/scheduler/setup.data
+glob:base/async/scheduler/setup.ml
+glob:base/async/setup.data
+glob:base/async/setup.log
+glob:base/async/setup.ml
+glob:base/bin-prot/INSTALL
+glob:base/bin-prot/Makefile
+glob:base/bin-prot/_build
+glob:base/bin-prot/_oasis
+glob:base/bin-prot/_tags
+glob:base/bin-prot/configure
+glob:base/bin-prot/doc/README.pdf
+glob:base/bin-prot/doc/README.txt
+glob:base/bin-prot/lib/META
+glob:base/bin-prot/lib/bin_prot.mlpack
+glob:base/bin-prot/lib/libbin_prot.clib
+glob:base/bin-prot/lib_test/example.pp.ml
+glob:base/bin-prot/lib_test/mac_test
+glob:base/bin-prot/lib_test/test_runner
+glob:base/bin-prot/myocamlbuild.ml
+glob:base/bin-prot/setup.data
+glob:base/bin-prot/setup.log
+glob:base/bin-prot/setup.ml
+glob:base/bin-prot/syntax/pa_bin_prot.mllib
+glob:base/compare/INSTALL
+glob:base/compare/Makefile
+glob:base/compare/_build
+glob:base/compare/_oasis
+glob:base/compare/_tags
+glob:base/compare/configure
+glob:base/compare/lib/META
+glob:base/compare/lib/comparelib.mllib
+glob:base/compare/myocamlbuild.ml
+glob:base/compare/sample_output/generated_test.ml
+glob:base/compare/sample_output/generated_test.mli
+glob:base/compare/setup.data
+glob:base/compare/setup.log
+glob:base/compare/setup.ml
+glob:base/compare/syntax/pa_compare.mllib
+glob:base/compare/syntax/pa_fields_conv.mllib
+glob:base/core/INSTALL
+glob:base/core/Makefile
+glob:base/core/_build
+glob:base/core/_oasis
+glob:base/core/_tags
+glob:base/core/configure
+glob:base/core/extended/INSTALL
+glob:base/core/extended/Makefile
+glob:base/core/extended/_build
+glob:base/core/extended/_oasis
+glob:base/core/extended/_tags
+glob:base/core/extended/configure
+glob:base/core/extended/lib/META
+glob:base/core/extended/lib/core-extended.mllib
+glob:base/core/extended/lib/core-extended.odocl
+glob:base/core/extended/lib/core_extended.mlpack
+glob:base/core/extended/lib/sexp_pos_parser.ml
+glob:base/core/extended/lib/sexp_pos_parser.mli
+glob:base/core/extended/lib/version_defaults.mlh
+glob:base/core/extended/myocamlbuild.ml
+glob:base/core/extended/setup.data
+glob:base/core/extended/setup.log
+glob:base/core/extended/setup.ml
+glob:base/core/lib/META
+glob:base/core/lib/config.h
+glob:base/core/lib/config.mlh
+glob:base/core/lib/core.mlpack
+glob:base/core/lib/core.odocl
+glob:base/core/lib/libcore.clib
+glob:base/core/myocamlbuild.ml
+glob:base/core/setup.data
+glob:base/core/setup.log
+glob:base/core/setup.ml
+glob:base/fieldslib/INSTALL
+glob:base/fieldslib/Makefile
+glob:base/fieldslib/_build
+glob:base/fieldslib/_oasis
+glob:base/fieldslib/_tags
+glob:base/fieldslib/configure
+glob:base/fieldslib/lib/META
+glob:base/fieldslib/lib/fieldslib.mlpack
+glob:base/fieldslib/myocamlbuild.ml
+glob:base/fieldslib/sample/generated_test.ml
+glob:base/fieldslib/sample/generated_test.mli
+glob:base/fieldslib/setup.data
+glob:base/fieldslib/setup.log
+glob:base/fieldslib/setup.ml
+glob:base/fieldslib/syntax/pa_fields_conv.mllib
+glob:base/pa_ounit/INSTALL
+glob:base/pa_ounit/Makefile
+glob:base/pa_ounit/_build
+glob:base/pa_ounit/_oasis
+glob:base/pa_ounit/_tags
+glob:base/pa_ounit/configure
+glob:base/pa_ounit/myocamlbuild.ml
+glob:base/pa_ounit/setup.data
+glob:base/pa_ounit/setup.log
+glob:base/pa_ounit/setup.ml
+glob:base/pa_ounit/syntax/META
+glob:base/pa_ounit/syntax/pa_ounit.mllib
+glob:base/pipebang/INSTALL
+glob:base/pipebang/Makefile
+glob:base/pipebang/_build
+glob:base/pipebang/_oasis
+glob:base/pipebang/_tags
+glob:base/pipebang/configure
+glob:base/pipebang/lib/META
+glob:base/pipebang/lib/pa_pipebang.mllib
+glob:base/pipebang/myocamlbuild.ml
+glob:base/pipebang/sample/generated_test.ml
+glob:base/pipebang/sample/generated_test.mli
+glob:base/pipebang/setup.data
+glob:base/pipebang/setup.ml
+glob:base/sexplib/INSTALL
+glob:base/sexplib/Makefile
+glob:base/sexplib/_build
+glob:base/sexplib/_oasis
+glob:base/sexplib/_tags
+glob:base/sexplib/configure
+glob:base/sexplib/lib/META
+glob:base/sexplib/lib/lexer.ml
+glob:base/sexplib/lib/parser.ml
+glob:base/sexplib/lib/parser.mli
+glob:base/sexplib/lib/sexplib.mlpack
+glob:base/sexplib/lib/sexplib.odocl
+glob:base/sexplib/lib_test/conv_test
+glob:base/sexplib/lib_test/sexp_test
+glob:base/sexplib/myocamlbuild.ml
+glob:base/sexplib/setup.data
+glob:base/sexplib/setup.log
+glob:base/sexplib/setup.ml
+glob:base/sexplib/syntax/pa_sexp_conv.mllib
+glob:base/sexplib/top/sexplib_top.mllib
+glob:base/type-conv/INSTALL
+glob:base/type-conv/Makefile
+glob:base/type-conv/_build
+glob:base/type-conv/_oasis
+glob:base/type-conv/_tags
+glob:base/type-conv/configure
+glob:base/type-conv/lib/META
+glob:base/type-conv/lib/core.mlpack
+glob:base/type-conv/lib/libcore.clib
+glob:base/type-conv/myocamlbuild.ml
+glob:base/type-conv/setup.data
+glob:base/type-conv/setup.log
+glob:base/type-conv/setup.ml
+glob:base/typehash/INSTALL
+glob:base/typehash/Makefile
+glob:base/typehash/_build
+glob:base/typehash/_oasis
+glob:base/typehash/_tags
+glob:base/typehash/configure
+glob:base/typehash/lib/META
+glob:base/typehash/lib/typehashlib.mllib
+glob:base/typehash/myocamlbuild.ml
+glob:base/typehash/setup.data
+glob:base/typehash/setup.ml
+glob:base/typehash/syntax/pa_typehash.mllib
+glob:base/variantslib/INSTALL
+glob:base/variantslib/Makefile
+glob:base/variantslib/_build
+glob:base/variantslib/_oasis
+glob:base/variantslib/_tags
+glob:base/variantslib/configure
+glob:base/variantslib/myocamlbuild.ml
+glob:base/variantslib/sample/generated_test.ml
+glob:base/variantslib/sample/generated_test.mli
+glob:base/variantslib/setup.data
+glob:base/variantslib/setup.ml

base/async/_oasis

-#AUTOGENERATED FILE; EDIT oasis.sh INSTEAD
-
-OASISFormat:  0.2
-OCamlVersion: >= 3.12
-Name:         async
-Version:      107.01
-Synopsis:     Jane Street Capital's asynchronous execution library
-Authors:      Jane street capital
-Copyrights:   (C) 2008-2011 Jane Street Capital LLC
-License:      LGPL-2.1 with OCaml linking exception
-LicenseFile:  LICENSE
-Plugins:      StdFiles (0.2),
-              DevFiles (0.2),
-              META (0.2)
-BuildTools:   ocamlbuild
-Description:  Jane Street Capital's asynchronous execution library
-FindlibVersion: >= 1.2.7
-XStdFilesAUTHORS: false
-XStdFilesINSTALLFilename: INSTALL
-XStdFilesREADME: false
-
-
-Library async
-  Path:               lib
-  FindlibName:        async
-  Pack:               true
-  Modules:Std
-  BuildDepends:       sexplib.syntax,
-                      sexplib,
-                      fieldslib.syntax,
-                      fieldslib,
-                      bin_prot,
-                      bin_prot.syntax,
-                      bigarray,
-                      res,
-                      unix,
-                      threads
-                      core
-
-
-Document "async"
-  Title:                Jane street's async library
-  Type:                 ocamlbuild (0.2)
-  BuildTools+:          ocamldoc
-  XOCamlbuildPath:      lib
-  XOCamlbuildLibraries: async
-

base/async/core/lib/async_stream.ml

+open Core.Std
+open Import
+open Deferred_std
+
+include Basic.Stream
+
+(* This line is needed in to avoid a dependency cycle when building with
+   ocamlbuild, which considers both the '.ml' and the '.mli' as part of
+   a single compilation unit. Without this line, async_stream.ml would depend
+   on Monitor (while monitor.mli depends on Async_stream). *)
+module Monitor = Raw_monitor
+
+let next d = d
+
+let first_exn t =
+  next t >>| function
+    | Nil -> failwith "Stream.first of empty stream"
+    | Cons (x, _) -> x
+;;
+
+let fold' t ~init ~f =
+  Deferred.create
+    (fun result ->
+      let rec loop t b =
+        upon (next t) (function
+          | Nil -> Ivar.fill result b
+          | Cons (v, t) -> upon (f b v) (loop t))
+      in
+      loop t init)
+;;
+
+(* [fold] is implemented to avoid per-stream-element deferred overhead in the case when
+   multiple stream elements are available simultaneously. *)
+let fold t ~init ~f =
+  Deferred.create
+    (fun result ->
+      let rec loop t b =
+        match Deferred.peek t with
+        | None -> upon t (fun next -> loop_next next b)
+        | Some next -> loop_next next b
+      and loop_next next b =
+        match next with
+        | Nil -> Ivar.fill result b
+        | Cons (v, t) -> loop t (f b v)
+      in
+      loop t init)
+;;
+
+let length t = fold t ~init:0 ~f:(fun n _ -> n + 1)
+
+let iter' t ~f = fold' t ~init:() ~f:(fun () v -> f v)
+
+let closed t = iter' t ~f:(fun _ -> Deferred.unit)
+
+let iter t ~f = whenever (iter' t ~f:(fun a -> f a; Deferred.unit))
+
+let create f =
+  let tail = Tail.create () in
+  (* collect before calling [f], in case [f] immediately extends. *)
+  let t = Tail.collect tail in
+  f tail;
+  t
+;;
+
+let unfold b ~f =
+  create (fun tail ->
+    let rec loop b =
+      upon (f b) (function
+        | None -> Tail.close_exn tail
+        | Some (a, b) -> Tail.extend tail a; loop b)
+    in
+    loop b)
+;;
+
+let of_list l =
+  create (fun tail ->
+    List.iter l ~f:(fun x -> Tail.extend tail x);
+    Tail.close_exn tail)
+;;
+
+let to_list s = fold' s ~init:[] ~f:(fun b a -> return (a :: b)) >>| List.rev
+
+let copy_to_tail t tail = iter' t ~f:(fun a -> return (Tail.extend tail a))
+
+let append t1 t2 =
+  create (fun tail ->
+    upon (copy_to_tail t1 tail) (fun () ->
+      upon (copy_to_tail t2 tail) (fun () ->
+        Tail.close_exn tail)))
+;;
+
+let concat t =
+  create (fun tail ->
+    upon (iter' t ~f:(fun t -> copy_to_tail t tail))
+      (fun () -> Tail.close_exn tail))
+;;
+
+let filter' t ~f =
+  create (fun tail ->
+    upon (iter' t ~f:(fun v ->
+      f v >>| (function false -> () | true -> Tail.extend tail v)))
+      (fun () -> Tail.close_exn tail))
+;;
+
+let filter t ~f = filter' t ~f:(fun a -> return (f a))
+
+let filter_map' t ~f =
+  create (fun tail ->
+    upon (iter' t ~f:(fun v ->
+      f v >>| (function None -> () | Some v -> Tail.extend tail v)))
+      (fun () -> Tail.close_exn tail))
+;;
+
+let filter_map t ~f = filter_map' t ~f:(fun a -> return (f a))
+
+let map' t ~f =
+  create (fun tail ->
+    upon (iter' t ~f:(fun v -> f v >>| Tail.extend tail))
+      (fun () -> Tail.close_exn tail))
+;;
+
+let map t ~f = map' t ~f:(fun a -> return (f a))
+
+let first_n s n =
+  create (fun tail ->
+    let rec loop s n =
+      if n = 0 then
+        Tail.close_exn tail
+      else
+        upon (next s) (function
+          | Nil -> Tail.close_exn tail
+          | Cons (x, s) -> Tail.extend tail x; loop s (n - 1))
+    in
+    loop s n)
+;;
+
+let available_now t =
+  let rec loop t ac =
+    match Deferred.peek (next t) with
+    | None | Some Nil -> (List.rev ac, t)
+    | Some (Cons (x, t)) -> loop t (x :: ac)
+  in
+  loop t []
+;;
+
+let split ?(stop = Deferred.never ()) ?(f = (fun _ -> `Continue)) t =
+  let reason_for_stopping = Ivar.create () in
+  let prefix = Tail.create () in
+  let finish v = Tail.close_exn prefix; Ivar.fill reason_for_stopping v in
+  let rec loop t =
+    choose [ choice stop (fun () -> `Stopped);
+             choice (next t) (fun o -> `Next o);
+           ]
+    >>> function
+      | `Stopped -> finish (`Stopped t)
+      | `Next o ->
+        match o with
+        | Nil -> finish `End_of_stream
+        | Cons (a, t) ->
+          match f a with
+          | `Continue -> Tail.extend prefix a; loop t
+          | `Found b -> finish (`Found (b, t))
+  in
+  loop t;
+  (Tail.collect prefix, Ivar.read reason_for_stopping)
+;;
+
+let find t ~f =
+  let (_, found) = split t ~f:(fun a -> if f a then `Found a else `Continue) in
+  found >>| (function
+    | `Stopped _ -> assert false
+    | `End_of_stream | `Found _ as x -> x)
+;;
+
+let ungroup t =
+  create (fun tail ->
+    upon (iter' t ~f:(fun l ->
+      List.iter l ~f:(fun x -> Tail.extend tail x); Deferred.unit))
+      (fun () -> Tail.close_exn tail))
+;;
+
+let interleave ts =
+  create (fun tail ->
+    (* The interleaved stream should be closed when the outer stream and all of
+       the inner streams have been closed.  Keep a count of the number of open
+       streams and close the interleaved stream when that count becomes
+       zero. *)
+    let num_open = ref 1 in (* 1 for the outer stream that is open *)
+    let close () =
+      num_open := !num_open - 1;
+      if !num_open = 0 then Tail.close_exn tail;
+    in
+    let outer_closed =
+      iter' ts ~f:(fun t ->
+        num_open := !num_open + 1;
+        upon (copy_to_tail t tail) close;
+        Deferred.unit)
+    in
+    upon outer_closed close)
+;;
+
+let take_until t d =
+  create (fun tail ->
+    let rec loop t =
+      upon (choose [choice d (fun () -> `Stop);
+                    choice (next t) (fun z -> `Next z)])
+        (function
+          | `Stop | `Next Nil -> Tail.close_exn tail
+          | `Next (Cons (x, t)) -> Tail.extend tail x; loop t)
+    in
+    loop t)
+;;
+
+let iter_durably' t ~f =
+  Deferred.create (fun result ->
+    let rec loop t =
+      next t
+      >>> function
+        | Nil -> Ivar.fill result ()
+        | Cons (x, t) ->
+          Monitor.try_with_raise_rest (fun () -> f x)
+          >>> fun z ->
+          loop t;
+          match z with
+          | Ok () -> ()
+          | Error e -> Monitor.send_exn (Monitor.current ()) e
+    in
+    loop t)
+;;
+
+let iter_durably_report_end t ~f =
+  Deferred.create (fun result ->
+    let rec loop t =
+      next t
+      >>> function
+        | Nil -> Ivar.fill result ()
+        | Cons (x, t) -> loop t; f x
+    in
+    loop t)
+;;
+
+let iter_durably t ~f = Deferred.whenever (iter_durably_report_end t ~f)
+
+let of_fun f = unfold () ~f:(fun () -> f () >>| fun a -> Some (a, ()))

base/async/core/lib/async_stream.mli

+(** A stream is an immutable sequence of values, with a possibly incomplete tail that may
+    be extended asynchronously.
+
+    The basic primitive operation for getting the next element out of stream is
+    Stream.next, which (asynchronously) returns the element and the rest of the stream. *)
+open Core.Std
+open Basic
+
+type 'a t = 'a Basic.Stream.t with sexp_of
+(** [sexp_of_t t f] returns a sexp of all of the elements currently available in the
+    stream. It is just for display purposes.  There is no [t_of_sexp]. *)
+
+(** [create f] returns a stream [t] and calls [f tail], where the elements of the stream
+    are determined as the tail is extended, and the end of the stream is reached when the
+    tail is closed. *)
+val create : ('a Tail.t -> unit) -> 'a t
+
+(** [next t] returns a deferred that will become determined when the next part of the
+    stream is determined.  This is [Const (v, t')], where v is the next element of the
+    stream and t' is the rest of the stream, or with Nil at the end of the stream. *)
+type 'a next = Nil | Cons of 'a * 'a t
+val next : 'a t -> 'a next Deferred.t
+
+(** [first_exn t] returns a deferred that becomes determined with the first element of
+    [t]. *)
+val first_exn : 'a t -> 'a Deferred.t
+
+(** Streams can be converted to and from lists.  Although, conversion to a list returns a
+    deferred, because the stream is determined asynchronously. *)
+
+(** [of_list l] returns a stream with the elements of list l. *)
+val of_list : 'a list -> 'a t
+
+(** [to_list t] returns a deferred that will become determined with the list
+   of elements in t, if the end of t is reached. *)
+val to_list : 'a t -> 'a list Deferred.t
+
+(** [of_fun f] returns a stream whose elements are determined by calling [f] forever. *)
+val of_fun : (unit -> 'a Deferred.t) -> 'a t
+
+(** [copy_to_tail t tail] reads elements from [t] and puts them in [tail], until
+    the end of [t] is reached.
+*)
+val copy_to_tail : 'a t -> 'a Tail.t -> unit Deferred.t
+
+(** Sequence operations
+    ----------------------------------------------------------------------
+    There are the usual sequence operations:
+
+      append, fold, iter, map, filter_map, take
+
+    There are also deferred variants:
+
+      iter', map', filter_map'
+
+    These take anonymous functions that return deferreds generalizing the usual sequence
+    operation and allowing the client to control the rate at which the sequence is
+    processed.
+*)
+
+(** [append t1 t2] returns a stream with all the values of t1, in order, and if t1 ends,
+    these values are followed by all the values of t2. *)
+val append : 'a t -> 'a t -> 'a t
+
+(** [concat t] takes a stream of streams and produces a stream that is the concatenation
+    of each stream in order (you see all of stream 1, then all of stream 2... etc.) *)
+val concat : 'a t t -> 'a t
+
+(** [available_now t] returns t prefix of t that is available now, along with the rest of
+    the stream. *)
+val available_now : 'a t -> 'a list * 'a t
+
+(** [filter' s ~f] returns a stream with one element, v, for each v in s such with f v =
+    true. *)
+val filter' : 'a t -> f:('a -> bool Deferred.t) -> 'a t
+
+(** [filter s ~f] returns a stream with one element, v, for each v in s such with f v =
+    true. *)
+val filter : 'a t -> f:('a -> bool) -> 'a t
+
+(** [filter_map' s ~f] returns a stream with one element, v', for each v in s such with f
+    v = Some v'. *)
+val filter_map' : 'a t -> f:('a -> 'b option Deferred.t) -> 'b t
+
+(** [filter_map s ~f] returns a stream with one element, v', for each v in s such with f v
+    = Some v'. *)
+val filter_map : 'a t -> f:('a -> 'b option) -> 'b t
+
+(** [fold' t ~init ~f] is like list fold, walking over the elements of the stream in
+    order, as they become available.  [fold'] returns a deferred that will yield the final
+    value of the accumulator, if the end of the stream is reached. *)
+val fold' : 'a t -> init:'b -> f:('b -> 'a -> 'b Deferred.t) -> 'b Deferred.t
+
+(** [fold t ~init ~f] is a variant of [fold'] in which [f] does not return a deferred.
+*)
+val fold : 'a t -> init:'b -> f:('b -> 'a -> 'b) -> 'b Deferred.t
+
+(** [iter' t ~f] applies [f] to each element of the stream in turn, as they become
+    available.  It continues onto the next element only after the deferred returned by [f]
+    becomes determined. *)
+val iter' : 'a t -> f:('a -> unit Deferred.t) -> unit Deferred.t
+
+(** [closed t] returns a deferred that becomes determined when the end of [t] is
+    reached.  *)
+val closed : 'a t -> unit Deferred.t
+
+(** [iter t ~f] = [whenever (iter' t ~f:(fun a -> f a; Deferred.unit))] *)
+val iter : 'a t -> f:('a -> unit) -> unit
+
+(** [take_until t d] returns a stream [t'] that has the same elements as [t] up until [d]
+    becomes determined. *)
+val take_until : 'a t -> unit Deferred.t -> 'a t
+
+(** [iter_durably' t ~f] is like [iter' t ~f], except if [f] raises an exception it
+    continues with the next element of the stream *and* reraises the exception (to the
+    monitor in scope when iter_durably was called). *)
+val iter_durably' : 'a t -> f:('a -> unit Deferred.t) -> unit Deferred.t
+
+(** [iter_durably t ~f] is like [iter t ~f], except if [f] raises an exception it
+    continues with the next element of the stream *and* reraises the exception (to the
+    monitor in scope when iter_durably was called). *)
+val iter_durably : 'a t -> f:('a -> unit) -> unit
+
+(** [iter_durably_report_end t ~f] is equivalent to [iter_durably' t ~f:(fun x -> return
+    (f x))] but it is more efficient *)
+val iter_durably_report_end : 'a t -> f:('a -> unit) -> unit Deferred.t
+
+(** [length s] returns a deferred that is determined when the end of s is reached, taking
+    the value of the number of elements in s *)
+val length : 'a t -> int Deferred.t
+
+(** [map' t f] creates a new stream that with one element, (f v), for each element v of
+    t. *)
+val map' : 'a t -> f:('a -> 'b Deferred.t) -> 'b t
+
+(** [map t ~f] creates a new stream that with one element, (f v), for each element v of t.
+    [map t f] = [map' t ~f:(fun a -> return (f a))]. *)
+val map : 'a t -> f:('a -> 'b) -> 'b t
+
+(** [first_n t n] returns a stream with the first n elements of t, if t has n or more
+    elements, or it returns t. *)
+val first_n : 'a t -> int -> 'a t
+
+
+(** Stream generation
+    ----------------------------------------------------------------------
+*)
+
+
+(** [unfold b f] returns a stream [a1; a2; ...; an] whose elements are
+    determined by the equations:
+      b0 = b
+      Some (a1, b1) = f b0
+      Some (a2, b2) = f b1
+      ...
+      None = f bn
+*)
+val unfold : 'b -> f:('b -> ('a * 'b) option Deferred.t) -> 'a t
+
+
+(** Miscellaneous operations
+    ----------------------------------------------------------------------
+*)
+
+(** [split ~stop ~f t] returns a pair (p, d), where p is a prefix of t that ends
+   for one of three reasons:
+   1. t ends
+   2. stop becomes determined
+   3. f returns `Found
+   The deferred d describes why the prefix ended, and returns the suffix of the
+   stream in case (2) or (3).
+*)
+val split :
+  ?stop:unit Deferred.t
+  -> ?f:('a -> [ `Continue | `Found of 'b ])
+  -> 'a t
+  -> 'a t * [ `End_of_stream
+            | `Stopped of 'a t
+            | `Found of 'b * 'a t ] Deferred.t
+
+(** [find ~f t] returns a deferred that becomes determined when [f x] is true for some
+    element of [t], or if the end of the stream is reached *)
+val find :
+  'a t -> f:('a -> bool)
+  -> [ `End_of_stream
+     | `Found of 'a * 'a t ] Deferred.t
+
+(** [ungroup t] takes a stream of lists and unpacks the items from each list into a single
+    stream *)
+val ungroup : 'a list t -> 'a t
+
+(** [interleave list] takes a stream of streams and returns a stream of their items
+    interleaved as they become determined. The interleaved stream will be closed when the
+    outer stream and all of the inner streams have been closed. *)
+val interleave : 'a t t -> 'a t

base/async/core/lib/basic.ml

+open Core.Std
+open Import
+
+let debug = Debug.debug
+
+module Priority = Jobs.Priority
+
+type +'a deferred
+
+type 'a handler =
+  { execution_context : execution_context;
+    run : 'a -> unit;
+  }
+and execution_context =
+  { block_group : Block_group.t;
+    monitor : monitor;
+    priority : Priority.t;
+  }
+and 'a ivar = 'a ivar_cell ref
+and 'a ivar_cell =
+| Empty
+| Empty_one_handler of ('a -> unit) * execution_context
+| Empty_many_handlers of 'a handler Bag.t
+| Indir of 'a ivar
+| Full of 'a
+and monitor =
+  { name_opt : string option;
+    id : int;
+    parent : monitor option;
+    errors : exn tail;
+    mutable has_seen_error : bool;
+    mutable someone_is_listening : bool;
+  }
+and 'a tail =
+  { (* [next] points at the tail of the stream *)
+    mutable next: 'a next ivar;
+  }
+and 'a stream = 'a next deferred
+and 'a next = Nil | Cons of 'a * 'a stream
+;;
+
+module Ivar0 = struct
+  type 'a t = 'a ivar
+
+  let create () = ref Empty
+  let create_full a = ref (Full a)
+
+  let equal (t : _ t) t' = phys_equal t t'
+
+  let rec sexp_of_t sexp_of_a i =
+    match !i with
+    | Full v -> Sexp.List [Sexp.Atom "Full"; sexp_of_a v]
+    | Empty
+    | Empty_one_handler _
+    | Empty_many_handlers _ -> Sexp.Atom "Empty"
+    | Indir v -> sexp_of_t sexp_of_a v
+  ;;
+
+  (* Returns a non-indirected ivar, and path compresses any indirections.  May overflow
+     stack if you pass it an ivar chain that is too long. *)
+  let rec squash t =
+    match !t with
+    | Indir r ->
+      let r' = squash r in
+      (* Don't bother mutating if it didn't change. *)
+      if not (phys_equal r r') then t := Indir r';
+      r'
+    | _ -> t
+  ;;
+
+  let peek t =
+    match !(squash t) with
+    | Indir _ -> assert false (* fulfilled by squash *)
+    | Full a -> Some a
+    | Empty
+    | Empty_one_handler _
+    | Empty_many_handlers _ -> None
+  ;;
+
+  let is_empty t =
+    match !(squash t) with
+    | Indir _ -> assert false (* fulfilled by squash *)
+    | Full _ -> false
+    | Empty
+    | Empty_one_handler _
+    | Empty_many_handlers _ -> true
+  ;;
+
+  let is_full t = not (is_empty t)
+
+end
+
+module Tail = struct
+  type 'a t = 'a tail = { mutable next: 'a next ivar }
+
+  let create () = { next = Ivar0.create () }
+end
+
+module Monitor = struct
+  type t = monitor =
+    { name_opt : string option;
+      id : int;
+      parent : monitor option;
+      errors : exn tail;
+      mutable has_seen_error : bool;
+      mutable someone_is_listening : bool;
+    }
+  with fields
+
+  let bogus =
+    { name_opt = None;
+      id = -1;
+      parent = None;
+      errors = Tail.create ();
+      has_seen_error = false;
+      someone_is_listening = false;
+    }
+  ;;
+
+  module Pretty = struct
+    type one =
+      { name : string option;
+        id : int;
+        has_seen_error : bool;
+        someone_is_listening : bool
+      }
+    with sexp_of
+
+    type t = one list
+    with sexp_of
+  end
+
+  let rec to_pretty t =
+    let rec loop t ac =
+      let ac =
+        { Pretty.
+          name = t.name_opt;
+          id = t.id;
+          has_seen_error = t.has_seen_error;
+          someone_is_listening = t.someone_is_listening;
+        }
+        :: ac
+      in
+      match t.parent with
+      | None -> List.rev ac
+      | Some t -> loop t ac
+    in
+    loop t []
+  ;;
+
+  let sexp_of_t t = Pretty.sexp_of_t (to_pretty t)
+
+end
+
+module Unregister = struct
+  type t = unit -> unit
+
+  let noop () = ()
+  let create f = f
+  let unregister f = f ()
+end
+
+module Clock_event = struct
+  (* Clock events start in the [Uninitialized] state just for their creation (because of
+     cyclic types), and then are immediately marked as [Waiting] or [Happened], depending
+     on the time.  A [Waiting] event will then either [Happen] at the appropriate time,
+     or be [Aborted] prior to when it would have [Happened].
+
+     Uninitialized
+     |           |
+     v           v
+     Waiting --> Happened
+     |
+     v
+     Aborted *)
+  type t =
+    { mutable state : state;
+    }
+  and state =
+  | Uninitialized
+  | Aborted
+  | Happened
+  | Waiting of waiting
+  and waiting =
+    { event : t Events.Event.t;
+      ready : [ `Happened | `Aborted ] Ivar0.t;
+    }
+  with sexp_of
+end
+
+module Scheduler = struct
+  type t =
+    { jobs : Jobs.t sexp_opaque;
+      mutable current_execution_context : execution_context sexp_opaque;
+      mutable main_execution_context : execution_context sexp_opaque;
+      mutable max_num_jobs_per_priority_per_cycle : int;
+      mutable uncaught_exception : exn option;
+      mutable cycle_count : int;
+      mutable cycle_start : Time.t;
+      mutable jobs_left : bool;
+      cycle_times : Time.Span.t Tail.t;
+      events : Clock_event.t Events.t;
+    }
+
+  let t =
+    let bogus_execution_context =
+      { block_group = Block_group.bogus;
+        monitor = Monitor.bogus;
+        priority = Priority.normal;
+      }
+    in
+    let now = Time.epoch in
+    { jobs = Jobs.create ();
+      current_execution_context = bogus_execution_context;
+      main_execution_context = bogus_execution_context;
+      max_num_jobs_per_priority_per_cycle = 500;
+      uncaught_exception = None;
+      cycle_start = now;
+      cycle_count = 0;
+      cycle_times = Tail.create ();
+      events = Events.create ~now;
+      jobs_left = false;
+    }
+  ;;
+
+  let invariant () =
+    Events.invariant t.events;
+    try
+      Events.iter t.events ~f:(fun events_event ->
+        let event = Events.Event.value events_event in
+        let module E = Clock_event in
+        match event.E.state with
+        | E.Waiting { E. event = events_event'; ready } ->
+          assert (phys_equal events_event events_event');
+          assert (Ivar0.is_empty ready);
+        | _ -> assert false);
+    with exn ->
+      fail "events problem" (exn, t.events) <:sexp_of< exn * Clock_event.t Events.t >>;
+  ;;
+
+  let current_execution_context () = t.current_execution_context
+
+  let set_execution_context execution_context =
+    (* Avoid a caml_modify in most cases. *)
+    if not (phys_equal execution_context t.current_execution_context) then
+      t.current_execution_context <- execution_context;
+  ;;
+
+  let add_job execution_context work =
+    let job = Job.create (fun () ->
+      set_execution_context execution_context;
+      work ())
+    in
+    let priority = execution_context.priority in
+    if debug then
+      Debug.print "enqueing job %d with priority %s" (Job.id job)
+        (Priority.to_string priority);
+    Jobs.add t.jobs priority job;
+  ;;
+
+end
+
+module Handler = struct
+  type 'a t = 'a handler =
+    { execution_context : execution_context;
+      run : 'a -> unit;
+    }
+
+  let create run =
+    { execution_context = Scheduler.current_execution_context ();
+      run;
+    }
+  ;;
+
+  let schedule t v = Scheduler.add_job t.execution_context (fun () -> t.run v)
+
+  let priority t = t.execution_context.priority
+
+  let filter t ~f = { t with run = fun a -> if f a then t.run a }
+
+  let prepend t ~f = { t with run = fun a -> t.run (f a) }
+end
+
+module Ivar = struct
+  include Ivar0
+
+  let read (type a) (t : a t) = (Obj.magic t : a deferred)
+
+  let fill t v =
+    let repr = squash t in
+    match !repr with
+    | Indir _ -> assert false (* fulfilled by squash *)
+    | Full _ -> fail "Ivar.fill of full ivar" t <:sexp_of< a t >>
+    | Empty -> repr := Full v
+    | Empty_one_handler (run, context) ->
+      repr := Full v;
+      Scheduler.add_job context (fun () -> run v)
+    | Empty_many_handlers handlers ->
+      repr := Full v;
+      Bag.iter handlers ~f:(fun h -> Handler.schedule h v);
+  ;;
+
+  let fill_if_empty t v = if not (is_full t) then fill t v
+
+  include Bin_prot.Utils.Make_binable1 (struct
+    module Binable = struct
+      type 'a t = 'a option with bin_io
+    end
+
+    type 'a t = 'a ivar
+
+    let to_binable t = peek t
+
+    let of_binable = function
+      | None -> create ()
+      | Some a -> create_full a
+    ;;
+  end)
+
+end
+
+module Deferred = struct
+  type 'a t = 'a deferred
+
+  let repr (type a) (deferred : a deferred) = Ivar.squash (Obj.magic deferred : a ivar)
+
+  let peek t = Ivar.peek (repr t)
+
+  let sexp_of_t f t = Ivar.sexp_of_t f (repr t)
+
+  let is_determined t = is_some (peek t)
+
+  let debug_space_leaks = ref None
+
+  let debug_bag_check bag =
+    match !debug_space_leaks with
+    | None -> ()
+    | Some bound -> assert (Bag.length bag < bound)
+  ;;
+
+  let install_removable_handler =
+    (* Don't pass 'cell', because it may have indirected in the meantime. *)
+    let add_to_bag t bag handler =
+      let elt = Bag.add bag handler in
+      debug_bag_check bag;
+      (* Invariant: Empty_many_handlers is never demoted to anything but Full. *)
+      Unregister.create (fun () ->
+        let cell = repr t in
+        match !cell with
+        | Empty_many_handlers bag' -> Bag.remove bag' elt
+        | Full _ -> ()
+        | _ -> assert false
+      );
+    in
+    fun t handler ->
+      let cell = repr t in
+      match !cell with
+      | Empty ->
+        let bag = Bag.create () in
+        cell := Empty_many_handlers bag;
+        add_to_bag t bag handler;
+      | Empty_one_handler (run, execution_context) ->
+        let bag = Bag.create () in
+        cell := Empty_many_handlers bag;
+        ignore (Bag.add bag { execution_context; run });
+        add_to_bag t bag handler;
+      | Empty_many_handlers bag -> add_to_bag t bag handler
+      | Full v ->
+        let still_installed = ref true in
+        let handler = Handler.filter handler ~f:(fun _ -> !still_installed) in
+        Handler.schedule handler v;
+        Unregister.create (fun () -> still_installed := false);
+      | Indir _ -> assert false (* fulfilled by repr *)
+  ;;
+
+  let upon' t run = install_removable_handler t (Handler.create run)
+
+  (* [upon] is conceptually the same as
+
+     let upon t f = ignore (upon' t run)
+
+     However, below is a more efficient implementation, which is worth doing because
+     [upon] is very widely used and is so much more common than [upon'].  The below
+     implementation avoids the use of the bag of handlers in the extremely common case of
+     one handler for the deferred. *)
+  let upon =
+    fun t run ->
+      let current_execution_context = Scheduler.current_execution_context () in
+      let cell = repr t in
+      match !cell with
+      | Indir _ -> assert false (* fulfilled by repr *)
+      | Full v -> Scheduler.add_job current_execution_context (fun () -> run v)
+      | Empty -> cell := Empty_one_handler (run, current_execution_context)
+      | Empty_one_handler (run', execution_context') ->
+        let bag = Bag.create () in
+        ignore (Bag.add bag { run;
+                              execution_context = current_execution_context;
+                            });
+        ignore (Bag.add bag { run = run';
+                              execution_context = execution_context';
+                            });
+        cell := Empty_many_handlers bag;
+      | Empty_many_handlers bag ->
+        ignore (Bag.add bag { run;
+                              execution_context = current_execution_context;
+                            });
+   ;;
+
+  (* [connect] takes an Ivar [ivar] and a Deferred [tdef] and, subject to some side
+     conditions, makes [tdef] point to the destination of [ivar] as an [Indirection]. *)
+  let connect ivar tdef =
+    (* ivar and tdef can be chains upon entry to connect, since tdef is an arbitrary
+       user-supplied deferred from the right-hand side of connect, and ivar is returned to
+       the user prior to being used in the callback, and may be converted to an
+       indirection in the case of right-nested binds. *)
+    let t = repr tdef in
+    let i = Ivar.squash ivar in
+    (* i and ivar are the same internally, but have different types *)
+    if not (phys_equal t i) then begin
+      let create_two_bag handler1 handler2 =
+        let bag = Bag.create () in
+        ignore (Bag.add bag handler1);
+        ignore (Bag.add bag handler2);
+        debug_bag_check bag;
+        i := Empty_many_handlers bag;
+        t := Indir ivar in
+      (* Strange order :-/ *)
+      begin match (!t, !i) with
+      | (Empty,   _) -> t := Indir ivar
+      | (Full v,  _) -> Ivar.fill ivar v
+      | (Indir _, _) -> assert false (* fulfilled by repr *)
+      | (tc, Empty) -> (* do a swap *) i := tc; t := Indir ivar
+      (* [connect] is only used in bind, whose ivar is only ever exported as a read-only
+         deferred.  Thus, it is impossible for the client to fill the [ivar]. *)
+      | (_, Full _) -> assert false
+      | (_, Indir _) -> assert false (* fulfilled by Ivar.squash *)
+      (* Tricky cases now. Assume invariant that bags are never shared. *)
+      | (Empty_one_handler (run1, ec1), Empty_one_handler (run2, ec2)) ->
+        let handler1 = { run = run1; execution_context = ec1} in
+        let handler2 = { run = run2; execution_context = ec2} in
+        create_two_bag handler1 handler2
+      | (Empty_one_handler (run, execution_context), Empty_many_handlers bag) ->
+        let handler = { run; execution_context } in
+        ignore (Bag.add bag handler);
+        debug_bag_check bag;
+        (* no need to rewrite i, as it's a bag *)
+        t := Indir ivar
+      | (Empty_many_handlers bag as tc, Empty_one_handler (run, execution_context)) ->
+        let handler = { run; execution_context } in
+        ignore (Bag.add bag handler);
+        debug_bag_check bag;
+        i := tc; (* do a swap *)
+        t := Indir ivar
+      | (Empty_many_handlers bag1, Empty_many_handlers bag2) ->
+        Bag.transfer ~src:bag1 ~dst:bag2;
+        debug_bag_check bag2;
+        t := Indir ivar
+      end;
+      (* Squash original tdef reference; otherwise, it may grow by two indirections rather
+         than one.  This is the key step that eliminates any references to [ivar] if it
+         was an indirection. *)
+      ignore (repr tdef)
+    end;
+  ;;
+
+  let create f =
+    let result = Ivar.create () in
+    f result;
+    Ivar.read result;
+  ;;
+
+  let bind t f = create (fun i -> upon t (fun a -> connect i (f a)))
+
+  let return a = Ivar.read (Ivar.create_full a)
+
+(* PROOF OF BIND SPACE PERFORMANCE PROPERTIES
+
+   Theorem: A bind-allocated Ivar allocated by [m >>= f] is garbage
+   collected after [m] becomes available and all its callbacks are
+   executed, if this bind was executed in a context where it was in
+   tail-call position within the right-hand side of an outer bind.
+
+   We divide this proof into two lemmas, the first of which establishes
+   a tight condition for garbage collection of the bind-allocated Ivar
+   given no outside references, and the second of which establishes a
+   (mostly syntactic) program property that is sufficient (though
+   not necessary) to fulfill these conditions.
+
+   -------------------------------------------------------------------------------
+
+   Definition: An Ivar [i] is an "root" if [!i] does not pattern match with
+   [Indir _].  Otherwise, the Ivar is an "indirection".
+
+   Definition: An IVar [i] is "full" if [!i] pattern matches with [Full _].
+   Full Ivars are also roots.
+
+   Definition: A "bind-allocated Ivar" is an Ivar allocated by the call
+   to [create] in the function body of [bind].
+
+   -------------------------------------------------------------------------------
+
+   Lemma: Indirection chains are acyclic, that is, for all Ivars [i], and given
+   [f i = match !(Ivar.squash_raw i) with Indir i' = i' | _ = i], there
+   exists no natural number n such that f applied n times to i
+   (e.g. [f (f (f ... (f i)))]), is physically equal to i.
+
+   Proof: Induction on runtime invocations [Indir] allocating functions
+   [Ivar.squash] and [connect].  We show that, if there all indirections are
+   acyclic, after executing these functions, all indirections are acyclic.
+
+   Case [Ivar.squash]: We have by simple induction that [Ivar.squash t] returns
+   a root Ivar of an indirection chain (guaranteed to be acyclic by outer induction
+   hypothesis.) Mutating an Ivar to point to a root Ivar does not create a cycle unless
+   unless that Ivar was a root, but this contradicts the pattern match.
+
+   Case [connect]: All allocated indirections will point to [ivar], so we consider
+   only this case.  Can we say [ivar] is always a root Ivar?  Unfortunately no; after
+   invocation of [Ivar.squash], we are only guaranteed it is an indirection to a root
+   Ivar.  Mutating [t] (a root) to point to this Ivar does not create a cycle unless
+   [Ivar.squash ivar == t] (that is, [t] is [ivar]'s root Ivar.)  However, this contradicts
+   the [phys_equal t i] test.
+
+   For the base case, there are no Ivars at the beginning of programming execution,
+   so all indirections are trivially acyclic. QED.
+
+   -------------------------------------------------------------------------------
+
+   Lemma: If a bind-allocated Ivar [i] is an indirection or full and has no outside
+   references to it beyond the reference in the closure [fun a -> connect i (f a)],
+   it is garbage collected after this callback for [t] is finished executing.
+
+   Proof: To show that [i] is garbage collected, we must demonstrate all
+   references to it are dropped by the end of the execution of the callback.
+   As the scheduler drops references to callbacks once they are executed,
+   we only need to show that [connect i (f a)] does not introduce any new
+   references to [i].  Note that this was not the case for the old implementation
+   of bind [f a >>> fun y -> Ivar.fill i y], where [i] is further preserved
+   in a newly allocated closure.
+
+   Consider the implementation of [connect], letting [ivar = i] (for sake of
+   clarity, we will not refer to [i], which is a different local variable in
+   [connect].)  At most one new reference is constructed to [ivar], via the
+   instruction [t := Indir ivar].  We now demonstrate the invocation of
+   [repr tdef] eliminates this reference to [ivar] if [ivar] was an
+   indirection.  [repr] performs path compression, and as
+   [t = repr tdef], a call of [repr tdef] will result in
+   a recursive call of [repr t].  In the case that a reference was
+   retained to [ivar], we know [t] contains an [Indir ivar].  [t] is now overwritten
+   with a new indirection, which is the root of this indirection chain.  This
+   root cannot be [ivar], as by hypothesis, [ivar] is an indirection.  Thus,
+   this reference is eliminated. As there are no other references to [ivar], it
+   can be garbage collected. QED.
+
+   -------------------------------------------------------------------------------
+
+   Lemma: A bind-allocated Ivar is an indirection or full and has no outside
+   references if its allocating bind is executed in a context where it is in
+   tail-call position within the right-hand side of an outer bind.
+
+   Proof: We demonstrate indirection and lack of outside references separately.
+
+   Indirection or full: If the allocating bind is in tail-call position, its return value,
+   the allocated Ivar, is passed to the right-hand side of the outer bind. This value is
+   passed as the second argument [t] to [connect], which introduces an indirection from [t]
+   to [i], unless [t] is full (satisfying our other condition.)  As the indirection
+   introduction happens within the enclosing callback, there is no chance for preemption;
+   thus, when control returns to the scheduler, this property is fulfilled.
+
+   No outside references: As the bind is in tail-call position, no operations on
+   the returned bind can be performed (including saving it or adding an extra
+   callback to it) until it is passed as the right-hand argument to the outer bind.
+   As the outer bind does not return references to its right-hand argument, no
+   outside references are possible. QED. *)
+end
+
+module Stream = struct
+
+  type 'a t = 'a stream
+  type 'a next_ = 'a next = Nil | Cons of 'a * 'a stream
+  type 'a next = 'a next_ = Nil | Cons of 'a * 'a stream
+
+  let next t = t
+
+  let sexp_of_t sexp_of_a t =
+    let rec loop d ac =
+      match Deferred.peek d with
+      | None -> Sexp.List (List.rev (Sexp.Atom "..." :: ac))
+      | Some Nil -> Sexp.List (List.rev ac)
+      | Some (Cons (a, t)) -> loop t (sexp_of_a a :: ac)
+    in
+    loop t []
+  ;;
+
+  let iter t ~f =
+    let rec loop t =
+      Deferred.upon (next t) (function
+        | Nil -> ()
+        | Cons (v, t) -> f v; loop t)
+    in
+    loop t
+  ;;
+
+end
+
+module Execution_context = struct
+  type t = execution_context =
+    { block_group : Block_group.t;
+      monitor : Monitor.t;
+      priority : Priority.t;
+    }
+  with fields, sexp_of
+end

base/async/core/lib/basic.mli

+(** Basic contains the type declarations and basic functions for most of the Async
+    types: deferred, stream, ivar, tail, monitor, scheduler. *)
+
+open Core.Std
+
+type 'a ivar
+type 'a handler
+
+module Unregister : sig
+  type t
+
+  (** no-operation unregister *)
+  val noop : t
+
+  (** [create f] creates an unregister of the given function [f]. *)
+  val create : (unit -> unit) -> t
+
+  (** [unregister t] executes the unregistration function in [t]. *)
+  val unregister : t -> unit
+end
+
+module Deferred : sig
+  type +'a t with sexp_of
+
+  val bind          : 'a t -> ('a -> 'b t) -> 'b t
+  val create        : ('a ivar -> unit) -> 'a t
+  val is_determined : _ t -> bool
+  val peek          : 'a t -> 'a option
+  val return        : 'a -> 'a t
+  val upon          : 'a t -> ('a -> unit) -> unit
+  val upon'         : 'a t -> ('a -> unit) -> Unregister.t
+  val install_removable_handler : 'a t -> 'a handler -> Unregister.t
+
+  val debug_space_leaks   : int option ref
+end
+
+module Ivar : sig
+  type 'a t = 'a ivar with bin_io, sexp_of
+
+  val equal : 'a t -> 'a t -> bool
+  val create : unit -> 'a t
+  val fill : 'a t -> 'a -> unit
+  val fill_if_empty : 'a t -> 'a -> unit
+  val is_empty : 'a t -> bool
+  val is_full : 'a t -> bool
+  val read : 'a t -> 'a Deferred.t
+end
+
+module Stream : sig
+  type 'a t = 'a next Deferred.t
+  and 'a next = Nil | Cons of 'a * 'a t
+
+  val sexp_of_t : ('a -> Sexp.t) -> 'a t -> Sexp.t
+
+  val next : 'a t -> 'a next Deferred.t
+  val iter : 'a t -> f:('a -> unit) -> unit
+end
+
+module Tail : sig
+  type 'a t =
+    { (* [next] points at the tail of the stream *)
+      mutable next: 'a Stream.next Ivar.t;
+    }
+
+  val create : unit -> _ t
+end
+
+module Monitor : sig
+  type t =
+    { name_opt : string option;
+      id : int;
+      parent : t option;
+      errors : exn Tail.t;
+      mutable has_seen_error : bool;
+      mutable someone_is_listening : bool;
+    }
+  with fields, sexp_of
+end
+
+module Execution_context : sig
+  type t =
+    { block_group : Block_group.t;
+      monitor : Monitor.t;
+      priority : Jobs.Priority.t;
+    }
+  with fields, sexp_of
+end
+
+module Handler : sig
+  type 'a t = 'a handler
+
+  val create : ('a -> unit) -> 'a t
+  val prepend : 'a t -> f:('b -> 'a) -> 'b t
+  val filter : 'a t -> f:('a -> bool) -> 'a t
+  val schedule : 'a t -> 'a -> unit
+end
+
+module Clock_event : sig
+  type t =
+    { mutable state : state;
+    }
+  and state =
+  | Uninitialized
+  | Aborted
+  | Happened
+  | Waiting of waiting
+  and waiting =
+    { event : t Events.Event.t;
+      ready : [ `Happened | `Aborted ] Ivar.t;
+    }
+  with sexp_of
+end
+
+module Scheduler : sig
+  type t =
+    { jobs : Jobs.t;
+      mutable current_execution_context : Execution_context.t;
+      mutable main_execution_context : Execution_context.t;
+      mutable max_num_jobs_per_priority_per_cycle : int;
+      mutable uncaught_exception : exn option;
+      mutable cycle_count : int;
+      mutable cycle_start : Time.t;
+      mutable jobs_left : bool;
+      cycle_times : Time.Span.t Tail.t;
+      events : Clock_event.t Events.t;
+    }
+
+  val t : t
+  val add_job                    : Execution_context.t -> (unit -> unit) -> unit
+  val current_execution_context  : unit -> Execution_context.t
+  val set_execution_context      : Execution_context.t -> unit
+  val invariant : unit -> unit
+end

base/async/core/lib/block_group.ml

+open Core.Std
+
+(* Each block group reserves a certain number of threads that it may concurrently use.
+   Async guarantees the threads will be available by keeping the sum of the
+   "num_reserved_threads" over all block groups less than some maximum, which is a small
+   constant (like 100) for which we are confident that we can simultaneously have that many
+   threads. *)
+
+module Work = struct
+  type t =
+    {
+      (* When this work starts running, the name of the thread will be set
+         (via Linux_ext.pr_set_name) to this name if provided. *)
+      set_thread_name_to : string option;
+      doit : unit -> [ `Stop | `Continue ];
+    }
+  with sexp_of
+end
+
+type t =
+  { mutable num_blocked_threads : int;
+    min_reserved_threads : int;
+    mutable num_reserved_threads : int;
+    max_reserved_threads : int;
+    work : Work.t Queue.t;
+  }
+with sexp_of
+
+let create ~min_reserved_threads ~max_reserved_threads =
+  { num_blocked_threads = 0;
+    min_reserved_threads;
+    num_reserved_threads = min_reserved_threads;
+    max_reserved_threads;
+    work = Queue.create ();
+  }
+;;
+
+let bogus = create ~min_reserved_threads:0 ~max_reserved_threads:max_int
+
+let invariant t =
+  assert (0 <= t.num_blocked_threads);
+  assert (t.num_blocked_threads <= t.num_reserved_threads);
+  assert (0 <= t.min_reserved_threads);
+  assert (t.min_reserved_threads <= t.num_reserved_threads);
+  assert (t.num_reserved_threads <= t.max_reserved_threads);
+  assert (Queue.is_empty t.work
+           || t.num_blocked_threads = t.num_reserved_threads);
+;;

base/async/core/lib/clock.ml

+open Core.Std
+open Import
+open Deferred_std
+
+module Monitor = Raw_monitor
+module Stream = Async_stream
+
+module Event = Clock_event
+
+let can_not_abort (d, _event) = d >>| function `Aborted -> assert false | `Happened -> ()
+
+let at_event time = let (d, e) = Event.at time in (e, d)
+
+let at time = can_not_abort (at_event time)
+
+let after_event span =
+  (* We use [Time.now()] rather than [cycle_start] for the base time.  There can be
+     substantial difference between the two when people do long running computations or
+     mix blocking code with async.  And humans expect that [after] is based on the current
+     time, not some artifact of async implementation. *)
+  at_event (Time.add (Time.now ()) span)
+;;
+
+let after span = can_not_abort (after_event span)
+
+let at_varying_intervals ?(stop = Deferred.never ()) compute_span =
+  let m = Tail.create () in
+  let rec loop () =
+    upon
+      (choose
+         [choice stop (fun () -> `Stop);
+          choice (at (Time.add (Time.now ()) (compute_span ())))
+            (fun () -> `Tick)])
+      (function
+        | `Stop -> ()
+        | `Tick ->
+          Tail.extend m ();
+          loop ())
+  in
+  loop ();
+  Tail.collect m;
+;;
+
+let at_intervals ?stop span = at_varying_intervals ?stop (fun () -> span)
+
+let every' ?(start = Deferred.unit) ?(stop = Deferred.never ())
+    ?(continue_on_error = true) span f =
+  if Time.Span.(<=) span Time.Span.zero then
+    fail "Clock.every got nonpositive span" span <:sexp_of< Time.Span.t >>;
+  start
+  >>> fun () ->
+    (* We use an extra monitor so we can specially handle errors in [f]. *)
+    let saw_error = ref false in
+    let monitor = Monitor.create ~name:"Clock.every'" () in
+    Stream.iter (Monitor.errors monitor) ~f:(fun e ->
+      Monitor.send_exn (Monitor.current ()) e;
+      saw_error := true);
+    let rec loop () =
+      upon (choose [choice stop (fun () -> `Stop);
+                    choice (after span) (fun () -> `Continue)])
+        (function