Commits

Yaron Minsky committed d04fcd8 Merge

auto merge

Comments (0)

Files changed (300)

 mycaml$
 \.DS_Store$
 tmtags$
-\.rnc$
 
 ^\.git/.*$
 ^\.transdeps/.*$
 *.rej
 *.spot
 *.toc
+*.cmp
 .*.sw[pabcdef]
 .mydeps
 inline_tests.ml
 order_file_*_common.ml
 *_db_pnl_common.ml
 order_file_*_eval.ml
-
+_build
 
 # for new omake rules
 *.deps
 *.hg_version.c
 *.build_info.c
 hg_version.out
+stub.names
 
 # specific files
 .last_build_env
 libdeps.corrected
 ocaml_path
 .ocaml-3.10
+bisect{.itarget,.mlpack,.odocl,_pp.mlpack}
+bisect-1.1/Makefile.config
 \#*
 .\#*
 

base/async/.hgignore.in

 configure
 lib/async.mlpack
 lib/async.odocl
+scheduler
 setup.ml
 setup.log
 setup.data

base/async/core/lib/async_stream.ml

 open Import
 open Deferred_std
 
-include Basic.Stream
-
-(* This line is needed in to avoid a dependency cycle when building with
-   ocamlbuild, which considers both the '.ml' and the '.mli' as part of
-   a single compilation unit. Without this line, async_stream.ml would depend
-   on Monitor (while monitor.mli depends on Async_stream). *)
-module Monitor = Raw_monitor
-
-let next d = d
+include Raw_async_stream
 
 let first_exn t =
-  next t >>| function
-    | Nil -> failwith "Stream.first of empty stream"
-    | Cons (x, _) -> x
+  next t
+  >>| function
+  | Nil -> failwith "Stream.first of empty stream"
+  | Cons (x, _) -> x
 ;;
 
 let fold' t ~init ~f =
     (fun result ->
       let rec loop t b =
         upon (next t) (function
-          | Nil -> Ivar.fill result b
-          | Cons (v, t) -> upon (f b v) (loop t))
+        | Nil -> Ivar.fill result b
+        | Cons (v, t) -> upon (f b v) (loop t))
       in
       loop t init)
 ;;
 let filter' t ~f =
   create (fun tail ->
     upon (iter' t ~f:(fun v ->
-      f v >>| (function false -> () | true -> Tail.extend tail v)))
+      f v
+      >>| (function false -> () | true -> Tail.extend tail v)))
       (fun () -> Tail.close_exn tail))
 ;;
 
       next t
       >>> function
         | Nil -> Ivar.fill result ()
-        | Cons (x, t) -> loop t; f x
+        | Cons (x, t) ->
+          (* We immediately call [loop], thus making the iter durable.  Any exceptions
+             raised by [f] will not prevent the loop from continuing, and will go to the
+             monitor of whomever called [iter_durably_report_end]. *)
+          loop t; f x
     in
     loop t)
 ;;

base/async/core/lib/async_stream.mli

-(** A stream is an immutable sequence of values, with a possibly incomplete tail that may
+(** For most applications one should use [Pipe] instead of Stream.
+
+    A stream is an immutable sequence of values, with a possibly incomplete tail that may
     be extended asynchronously.
 
     The basic primitive operation for getting the next element out of stream is
     Stream.next, which (asynchronously) returns the element and the rest of the stream. *)
 open Core.Std
-open Basic
 
-type 'a t = 'a Basic.Stream.t with sexp_of
+type 'a t = ('a, Execution_context.t) Raw_stream.t with sexp_of
 (** [sexp_of_t t f] returns a sexp of all of the elements currently available in the
-    stream. It is just for display purposes.  There is no [t_of_sexp]. *)
+    stream.  It is just for display purposes.  There is no [t_of_sexp]. *)
 
 (** [create f] returns a stream [t] and calls [f tail], where the elements of the stream
     are determined as the tail is extended, and the end of the stream is reached when the
 val create : ('a Tail.t -> unit) -> 'a t
 
 (** [next t] returns a deferred that will become determined when the next part of the
-    stream is determined.  This is [Const (v, t')], where v is the next element of the
+    stream is determined.  This is [Cons (v, t')], where v is the next element of the
     stream and t' is the rest of the stream, or with Nil at the end of the stream. *)
-type 'a next = Nil | Cons of 'a * 'a t
+type ('a, 'execution_context) next_ = Nil | Cons of 'a * ('a, 'execution_context) Raw_stream.t
+type 'a next = ('a, Execution_context.t) next_
 val next : 'a t -> 'a next Deferred.t
 
 (** [first_exn t] returns a deferred that becomes determined with the first element of
 (** [iter_durably' t ~f] is like [iter' t ~f], except if [f] raises an exception it
     continues with the next element of the stream *and* reraises the exception (to the
     monitor in scope when iter_durably was called). *)
-val iter_durably' : 'a t -> f:('a -> unit Deferred.t) -> unit Deferred.t
-
 (** [iter_durably t ~f] is like [iter t ~f], except if [f] raises an exception it
     continues with the next element of the stream *and* reraises the exception (to the
     monitor in scope when iter_durably was called). *)
-val iter_durably : 'a t -> f:('a -> unit) -> unit
-
 (** [iter_durably_report_end t ~f] is equivalent to [iter_durably' t ~f:(fun x -> return
     (f x))] but it is more efficient *)
-val iter_durably_report_end : 'a t -> f:('a -> unit) -> unit Deferred.t
+val iter_durably'           : 'a t -> f:('a -> unit Deferred.t) -> unit Deferred.t
+val iter_durably            : 'a t -> f:('a -> unit           ) -> unit
+val iter_durably_report_end : 'a t -> f:('a -> unit           ) -> unit Deferred.t
 
 (** [length s] returns a deferred that is determined when the end of s is reached, taking
     the value of the number of elements in s *)

base/async/core/lib/basic.ml

-open Core.Std
-open Import
-
-let debug = Debug.debug
-
-module Priority = Jobs.Priority
-
-type +'a deferred
-
-type 'a handler =
-  { execution_context : execution_context;
-    run : 'a -> unit;
-  }
-and execution_context =
-  { block_group : Block_group.t;
-    monitor : monitor;
-    priority : Priority.t;
-  }
-and 'a ivar = 'a ivar_cell ref
-and 'a ivar_cell =
-| Empty
-| Empty_one_handler of ('a -> unit) * execution_context
-| Empty_many_handlers of 'a handler Bag.t
-| Indir of 'a ivar
-| Full of 'a
-and monitor =
-  { name_opt : string option;
-    id : int;
-    parent : monitor option;
-    errors : exn tail;
-    mutable has_seen_error : bool;
-    mutable someone_is_listening : bool;
-  }
-and 'a tail =
-  { (* [next] points at the tail of the stream *)
-    mutable next: 'a next ivar;
-  }
-and 'a stream = 'a next deferred
-and 'a next = Nil | Cons of 'a * 'a stream
-;;
-
-module Ivar0 = struct
-  type 'a t = 'a ivar
-
-  let create () = ref Empty
-  let create_full a = ref (Full a)
-
-  let equal (t : _ t) t' = phys_equal t t'
-
-  (* Returns a non-indirected ivar, and path compresses any indirections. *)
-  let rec tail_recursive_squash t ac =
-    match !t with
-    | Indir t' -> tail_recursive_squash t' (t :: ac)
-    | _ ->
-      let indir = Indir t in
-      List.iter ac ~f:(fun ancestor ->
-        match !ancestor with
-        | Indir already_points_to ->
-          (* Don't bother mutating if it didn't change. *)
-          if not (phys_equal t already_points_to) then ancestor := indir;
-        | _ -> assert false (* We only put [Indir]s on [ac] *)
-      );
-      t
-  ;;
-
-  let rec squash t = tail_recursive_squash t []
-
-  let rec sexp_of_t sexp_of_a t =
-    match !(squash t) with
-    | Indir _ -> assert false (* fulfilled by squash *)
-    | Full v -> Sexp.List [Sexp.Atom "Full"; sexp_of_a v]
-    | Empty | Empty_one_handler _ | Empty_many_handlers _ -> Sexp.Atom "Empty"
-  ;;
-
-  let peek t =
-    match !(squash t) with
-    | Indir _ -> assert false (* fulfilled by squash *)
-    | Full a -> Some a
-    | Empty | Empty_one_handler _ | Empty_many_handlers _ -> None
-  ;;
-
-  let is_empty t =
-    match !(squash t) with
-    | Indir _ -> assert false (* fulfilled by squash *)
-    | Full _ -> false
-    | Empty | Empty_one_handler _ | Empty_many_handlers _ -> true
-  ;;
-
-  let is_full t = not (is_empty t)
-
-end
-
-module Tail = struct
-  type 'a t = 'a tail = { mutable next: 'a next ivar }
-
-  let create () = { next = Ivar0.create () }
-end
-
-module Monitor = struct
-  type t = monitor =
-    { name_opt : string option;
-      id : int;
-      parent : monitor option;
-      errors : exn tail;
-      mutable has_seen_error : bool;
-      mutable someone_is_listening : bool;
-    }
-  with fields
-
-  let bogus =
-    { name_opt = None;
-      id = -1;
-      parent = None;
-      errors = Tail.create ();
-      has_seen_error = false;
-      someone_is_listening = false;
-    }
-  ;;
-
-  module Pretty = struct
-    type one =
-      { name_opt : string option;
-        id : int;
-        has_seen_error : bool;
-        someone_is_listening : bool
-      }
-    with sexp_of
-
-    type t = one list
-    with sexp_of
-  end
-
-  let rec to_pretty t =
-    let rec loop
-        { name_opt; id; parent = _; errors = _; has_seen_error; someone_is_listening }
-        ac =
-      let ac = { Pretty. name_opt; id; has_seen_error; someone_is_listening } :: ac in
-      match t.parent with
-      | None -> List.rev ac
-      | Some t -> loop t ac
-    in
-    loop t []
-  ;;
-
-  let sexp_of_t t = Pretty.sexp_of_t (to_pretty t)
-
-end
-
-module Unregister = struct
-  type t = unit -> unit
-
-  let noop () = ()
-  let create f = f
-  let unregister f = f ()
-end
-
-module Clock_event = struct
-  (* Clock events start in the [Uninitialized] state just for their creation (because of
-     cyclic types), and then are immediately marked as [Waiting] or [Happened], depending
-     on the time.  A [Waiting] event will then either [Happen] at the appropriate time,
-     or be [Aborted] prior to when it would have [Happened].
-
-     Uninitialized
-     |           |
-     v           v
-     Waiting --> Happened
-     |
-     v
-     Aborted *)
-  type t =
-    { mutable state : state;
-    }
-  and state =
-  | Uninitialized
-  | Aborted
-  | Happened
-  | Waiting of waiting
-  and waiting =
-    { event : t Events.Event.t;
-      ready : [ `Happened | `Aborted ] Ivar0.t;
-    }
-  with sexp_of
-end
-
-module Scheduler = struct
-  type t =
-    { jobs : Jobs.t sexp_opaque;
-      mutable current_execution_context : execution_context sexp_opaque;
-      mutable main_execution_context : execution_context sexp_opaque;
-      mutable max_num_jobs_per_priority_per_cycle : int;
-      mutable uncaught_exception : exn option;
-      mutable num_jobs_run : int;
-      mutable cycle_count : int;
-      mutable cycle_start : Time.t;
-      cycle_times : Time.Span.t Tail.t;
-      cycle_num_jobs : int Tail.t;
-      events : Clock_event.t Events.t;
-    }
-
-  let t =
-    let bogus_execution_context =
-      { block_group = Block_group.bogus;
-        monitor = Monitor.bogus;
-        priority = Priority.normal;
-      }
-    in
-    let now = Time.epoch in
-    { jobs = Jobs.create ();
-      current_execution_context = bogus_execution_context;
-      main_execution_context = bogus_execution_context;
-      max_num_jobs_per_priority_per_cycle = 500;
-      uncaught_exception = None;
-      num_jobs_run = 0;
-      cycle_start = now;
-      cycle_count = 0;
-      cycle_times = Tail.create ();
-      cycle_num_jobs = Tail.create ();
-      events = Events.create ~now;
-    }
-  ;;
-
-  let invariant () =
-    assert (t.num_jobs_run >= 0);
-    assert (t.cycle_count >= 0);
-    Events.invariant t.events;
-    try
-      Events.iter t.events ~f:(fun events_event ->
-        let event = Events.Event.value events_event in
-        let module E = Clock_event in
-        match event.E.state with
-        | E.Waiting { E. event = events_event'; ready } ->
-          assert (phys_equal events_event events_event');
-          assert (Ivar0.is_empty ready);
-        | _ -> assert false);
-    with exn ->
-      fail "events problem" (exn, t.events) <:sexp_of< exn * Clock_event.t Events.t >>;
-  ;;
-
-  let current_execution_context () = t.current_execution_context
-
-  let set_execution_context execution_context =
-    (* Avoid a caml_modify in most cases. *)
-    if not (phys_equal execution_context t.current_execution_context) then
-      t.current_execution_context <- execution_context;
-  ;;
-
-  let add_job execution_context f a =
-    let job = Job.create (fun () ->
-      set_execution_context execution_context;
-      f a)
-    in
-    let priority = execution_context.priority in
-    if debug then Debug.log "enqueing job" priority <:sexp_of< Priority.t >>;
-    Jobs.add t.jobs priority job;
-  ;;
-
-end
-
-module Handler = struct
-  type 'a t = 'a handler =
-    { execution_context : execution_context;
-      run : 'a -> unit;
-    }
-
-  let create run =
-    { execution_context = Scheduler.current_execution_context ();
-      run;
-    }
-  ;;
-
-  let schedule t v = Scheduler.add_job t.execution_context t.run v
-
-  let priority t = t.execution_context.priority
-
-  let filter t ~f = { t with run = fun a -> if f a then t.run a }
-
-  let prepend t ~f = { t with run = fun a -> t.run (f a) }
-end
-
-module Ivar = struct
-  include Ivar0
-
-  let read (type a) (t : a t) = (Obj.magic t : a deferred)
-
-  let fill t v =
-    let repr = squash t in
-    match !repr with
-    | Indir _ -> assert false (* fulfilled by squash *)
-    | Full _ -> fail "Ivar.fill of full ivar" t <:sexp_of< a t >>
-    | Empty -> repr := Full v
-    | Empty_one_handler (run, context) ->
-      repr := Full v;
-      Scheduler.add_job context run v;
-    | Empty_many_handlers handlers ->
-      repr := Full v;
-      Bag.iter handlers ~f:(fun h -> Handler.schedule h v);
-  ;;
-
-  let fill_if_empty t v = if is_empty t then fill t v
-
-  include Bin_prot.Utils.Make_binable1 (struct
-    module Binable = struct
-      type 'a t = 'a option with bin_io
-    end
-
-    type 'a t = 'a ivar
-
-    let to_binable t = peek t
-
-    let of_binable = function
-      | None -> create ()
-      | Some a -> create_full a
-    ;;
-  end)
-
-end
-
-module Deferred = struct
-  type 'a t = 'a deferred
-
-  let repr (type a) (deferred : a deferred) = Ivar.squash (Obj.magic deferred : a ivar)
-
-  let peek t = Ivar.peek (repr t)
-
-  let sexp_of_t f t = Ivar.sexp_of_t f (repr t)
-
-  let is_determined t = is_some (peek t)
-
-  let debug_space_leaks = ref None
-
-  let debug_bag_check bag =
-    match !debug_space_leaks with
-    | None -> ()
-    | Some bound -> assert (Bag.length bag < bound)
-  ;;
-
-  let install_removable_handler =
-    (* Don't pass 'cell', because it may have indirected in the meantime. *)
-    let add_to_bag t bag handler =
-      let elt = Bag.add bag handler in
-      debug_bag_check bag;
-      (* Invariant: Empty_many_handlers is never demoted to anything but Full. *)
-      Unregister.create (fun () ->
-        let cell = repr t in
-        match !cell with
-        | Empty_many_handlers bag' -> Bag.remove bag' elt
-        | Full _ -> ()
-        | _ -> assert false
-      );
-    in
-    fun t handler ->
-      let cell = repr t in
-      match !cell with
-      | Empty ->
-        let bag = Bag.create () in
-        cell := Empty_many_handlers bag;
-        add_to_bag t bag handler;
-      | Empty_one_handler (run, execution_context) ->
-        let bag = Bag.create () in
-        cell := Empty_many_handlers bag;
-        ignore (Bag.add bag { execution_context; run });
-        add_to_bag t bag handler;
-      | Empty_many_handlers bag -> add_to_bag t bag handler
-      | Full v ->
-        let still_installed = ref true in
-        let handler = Handler.filter handler ~f:(fun _ -> !still_installed) in
-        Handler.schedule handler v;
-        Unregister.create (fun () -> still_installed := false);
-      | Indir _ -> assert false (* fulfilled by repr *)
-  ;;
-
-  let upon' t run = install_removable_handler t (Handler.create run)
-
-  (* [upon] is conceptually the same as
-
-     let upon t f = ignore (upon' t run)
-
-     However, below is a more efficient implementation, which is worth doing because
-     [upon] is very widely used and is so much more common than [upon'].  The below
-     implementation avoids the use of the bag of handlers in the extremely common case of
-     one handler for the deferred. *)
-  let upon =
-    fun t run ->
-      let current_execution_context = Scheduler.current_execution_context () in
-      let cell = repr t in
-      match !cell with
-      | Indir _ -> assert false (* fulfilled by repr *)
-      | Full v -> Scheduler.add_job current_execution_context run v
-      | Empty -> cell := Empty_one_handler (run, current_execution_context)
-      | Empty_one_handler (run', execution_context') ->
-        let bag = Bag.create () in
-        ignore (Bag.add bag { run;
-                              execution_context = current_execution_context;
-                            });
-        ignore (Bag.add bag { run = run';
-                              execution_context = execution_context';
-                            });
-        cell := Empty_many_handlers bag;
-      | Empty_many_handlers bag ->
-        ignore (Bag.add bag { run;
-                              execution_context = current_execution_context;
-                            });
-   ;;
-
-  (* [connect] takes an Ivar [ivar] and a Deferred [tdef] and, subject to some side
-     conditions, makes [tdef] point to the destination of [ivar] as an [Indirection]. *)
-  let connect ivar tdef =
-    (* ivar and tdef can be chains upon entry to connect, since tdef is an arbitrary
-       user-supplied deferred from the right-hand side of connect, and ivar is returned to
-       the user prior to being used in the callback, and may be converted to an
-       indirection in the case of right-nested binds. *)
-    let i = Ivar.squash ivar in
-    let t = repr tdef in
-    (* i and ivar are the same internally, but have different types *)
-    if not (phys_equal i t) then begin
-      (* Strange order :-/ *)
-      begin match !i, !t with
-      | _ , Empty -> t := Indir ivar
-      | _, Full v -> Ivar.fill ivar v
-      | _, Indir _ -> assert false (* fulfilled by repr *)
-      | Empty, tc -> (* do a swap *) i := tc; t := Indir ivar
-      (* [connect] is only used in bind, whose ivar is only ever exported as a read-only
-         deferred.  Thus, it is impossible for the client to fill the [ivar]. *)
-      | Full _, _ -> assert false
-      | Indir _, _ -> assert false (* fulfilled by Ivar.squash *)
-      (* Tricky cases now. Assume invariant that bags are never shared. *)
-      | Empty_one_handler (run1, ec1), Empty_one_handler (run2, ec2) ->
-        let bag = Bag.create () in
-        ignore (Bag.add bag { run = run1; execution_context = ec1});
-        ignore (Bag.add bag { run = run2; execution_context = ec2});
-        debug_bag_check bag;
-        i := Empty_many_handlers bag;
-        t := Indir ivar
-      | Empty_many_handlers bag, Empty_one_handler (run, execution_context) ->
-        ignore (Bag.add bag { run; execution_context });
-        debug_bag_check bag;
-        (* no need to rewrite [i], as it's a bag *)
-        t := Indir ivar
-      | Empty_one_handler (run, execution_context), Empty_many_handlers bag ->
-        let handler = { run; execution_context } in
-        ignore (Bag.add bag handler);
-        debug_bag_check bag;
-        i := !t; (* do a swap *)
-        t := Indir ivar
-      | Empty_many_handlers bag_i, Empty_many_handlers bag_t ->
-        Bag.transfer ~src:bag_t ~dst:bag_i;
-        debug_bag_check bag_i;
-        t := Indir ivar
-      end;
-      (* Squash original tdef reference; otherwise, it may grow by two indirections rather
-         than one.  This is the key step that eliminates any references to [ivar] if it
-         was an indirection. *)
-      ignore (repr tdef)
-    end;
-  ;;
-
-  let create f =
-    let result = Ivar.create () in
-    f result;
-    Ivar.read result;
-  ;;
-
-  let bind t f = create (fun i -> upon t (fun a -> connect i (f a)))
-
-  let return a = Ivar.read (Ivar.create_full a)
-
-(* PROOF OF BIND SPACE PERFORMANCE PROPERTIES
-
-   Theorem: A bind-allocated Ivar allocated by [m >>= f] is garbage
-   collected after [m] becomes available and all its callbacks are
-   executed, if this bind was executed in a context where it was in
-   tail-call position within the right-hand side of an outer bind.
-
-   We divide this proof into two lemmas, the first of which establishes
-   a tight condition for garbage collection of the bind-allocated Ivar
-   given no outside references, and the second of which establishes a
-   (mostly syntactic) program property that is sufficient (though
-   not necessary) to fulfill these conditions.
-
-   -------------------------------------------------------------------------------
-
-   Definition: An Ivar [i] is an "root" if [!i] does not pattern match with
-   [Indir _].  Otherwise, the Ivar is an "indirection".
-
-   Definition: An IVar [i] is "full" if [!i] pattern matches with [Full _].
-   Full Ivars are also roots.
-
-   Definition: A "bind-allocated Ivar" is an Ivar allocated by the call
-   to [create] in the function body of [bind].
-
-   -------------------------------------------------------------------------------
-
-   Lemma: Indirection chains are acyclic, that is, for all Ivars [i], and given
-   [f i = match !(Ivar.squash_raw i) with Indir i' = i' | _ = i], there
-   exists no natural number n such that f applied n times to i
-   (e.g. [f (f (f ... (f i)))]), is physically equal to i.
-
-   Proof: Induction on runtime invocations [Indir] allocating functions
-   [Ivar.squash] and [connect].  We show that, if there all indirections are
-   acyclic, after executing these functions, all indirections are acyclic.
-
-   Case [Ivar.squash]: We have by simple induction that [Ivar.squash t] returns
-   a root Ivar of an indirection chain (guaranteed to be acyclic by outer induction
-   hypothesis.) Mutating an Ivar to point to a root Ivar does not create a cycle unless
-   unless that Ivar was a root, but this contradicts the pattern match.
-
-   Case [connect]: All allocated indirections will point to [ivar], so we consider
-   only this case.  Can we say [ivar] is always a root Ivar?  Unfortunately no; after
-   invocation of [Ivar.squash], we are only guaranteed it is an indirection to a root
-   Ivar.  Mutating [t] (a root) to point to this Ivar does not create a cycle unless
-   [Ivar.squash ivar == t] (that is, [t] is [ivar]'s root Ivar.)  However, this contradicts
-   the [phys_equal t i] test.
-
-   For the base case, there are no Ivars at the beginning of programming execution,
-   so all indirections are trivially acyclic. QED.
-
-   -------------------------------------------------------------------------------
-
-   Lemma: If a bind-allocated Ivar [i] is an indirection or full and has no outside
-   references to it beyond the reference in the closure [fun a -> connect i (f a)],
-   it is garbage collected after this callback for [t] is finished executing.
-
-   Proof: To show that [i] is garbage collected, we must demonstrate all
-   references to it are dropped by the end of the execution of the callback.
-   As the scheduler drops references to callbacks once they are executed,
-   we only need to show that [connect i (f a)] does not introduce any new
-   references to [i].  Note that this was not the case for the old implementation
-   of bind [f a >>> fun y -> Ivar.fill i y], where [i] is further preserved
-   in a newly allocated closure.
-
-   Consider the implementation of [connect], letting [ivar = i] (for sake of
-   clarity, we will not refer to [i], which is a different local variable in
-   [connect].)  At most one new reference is constructed to [ivar], via the
-   instruction [t := Indir ivar].  We now demonstrate the invocation of
-   [repr tdef] eliminates this reference to [ivar] if [ivar] was an
-   indirection.  [repr] performs path compression, and as
-   [t = repr tdef], a call of [repr tdef] will result in
-   a recursive call of [repr t].  In the case that a reference was
-   retained to [ivar], we know [t] contains an [Indir ivar].  [t] is now overwritten
-   with a new indirection, which is the root of this indirection chain.  This
-   root cannot be [ivar], as by hypothesis, [ivar] is an indirection.  Thus,
-   this reference is eliminated. As there are no other references to [ivar], it
-   can be garbage collected. QED.
-
-   -------------------------------------------------------------------------------
-
-   Lemma: A bind-allocated Ivar is an indirection or full and has no outside
-   references if its allocating bind is executed in a context where it is in
-   tail-call position within the right-hand side of an outer bind.
-
-   Proof: We demonstrate indirection and lack of outside references separately.
-
-   Indirection or full: If the allocating bind is in tail-call position, its return value,
-   the allocated Ivar, is passed to the right-hand side of the outer bind. This value is
-   passed as the second argument [t] to [connect], which introduces an indirection from [t]
-   to [i], unless [t] is full (satisfying our other condition.)  As the indirection
-   introduction happens within the enclosing callback, there is no chance for preemption;
-   thus, when control returns to the scheduler, this property is fulfilled.
-
-   No outside references: As the bind is in tail-call position, no operations on
-   the returned bind can be performed (including saving it or adding an extra
-   callback to it) until it is passed as the right-hand argument to the outer bind.
-   As the outer bind does not return references to its right-hand argument, no
-   outside references are possible. QED. *)
-end
-
-module Stream = struct
-
-  type 'a t = 'a stream
-  type 'a next_ = 'a next = Nil | Cons of 'a * 'a stream
-  type 'a next = 'a next_ = Nil | Cons of 'a * 'a stream
-
-  let next t = t
-
-  let sexp_of_t sexp_of_a t =
-    let rec loop d ac =
-      match Deferred.peek d with
-      | None -> Sexp.List (List.rev (Sexp.Atom "..." :: ac))
-      | Some Nil -> Sexp.List (List.rev ac)
-      | Some (Cons (a, t)) -> loop t (sexp_of_a a :: ac)
-    in
-    loop t []
-  ;;
-
-  let iter t ~f =
-    let rec loop t =
-      Deferred.upon (next t) (function
-        | Nil -> ()
-        | Cons (v, t) -> f v; loop t)
-    in
-    loop t
-  ;;
-
-end
-
-module Execution_context = struct
-  type t = execution_context =
-    { block_group : Block_group.t;
-      monitor : Monitor.t;
-      priority : Priority.t;
-    }
-  with fields, sexp_of
-end

base/async/core/lib/basic.mli

-(** Basic contains the type declarations and basic functions for most of the Async
-    types: deferred, stream, ivar, tail, monitor, scheduler. *)
-
-open Core.Std
-
-type 'a ivar
-type 'a handler
-
-module Unregister : sig
-  type t
-
-  (** no-operation unregister *)
-  val noop : t
-
-  (** [create f] creates an unregister of the given function [f]. *)
-  val create : (unit -> unit) -> t
-
-  (** [unregister t] executes the unregistration function in [t]. *)
-  val unregister : t -> unit
-end
-
-module Deferred : sig
-  type +'a t with sexp_of
-
-  val bind          : 'a t -> ('a -> 'b t) -> 'b t
-  val create        : ('a ivar -> unit) -> 'a t
-  val is_determined : _ t -> bool
-  val peek          : 'a t -> 'a option
-  val return        : 'a -> 'a t
-  val upon          : 'a t -> ('a -> unit) -> unit
-  val upon'         : 'a t -> ('a -> unit) -> Unregister.t
-  val install_removable_handler : 'a t -> 'a handler -> Unregister.t
-
-  val debug_space_leaks   : int option ref
-end
-
-module Ivar : sig
-  type 'a t = 'a ivar with bin_io, sexp_of
-
-  val equal : 'a t -> 'a t -> bool
-  val create : unit -> 'a t
-  val fill : 'a t -> 'a -> unit
-  val fill_if_empty : 'a t -> 'a -> unit
-  val is_empty : 'a t -> bool
-  val is_full : 'a t -> bool
-  val read : 'a t -> 'a Deferred.t
-end
-
-module Stream : sig
-  type 'a t = 'a next Deferred.t
-  and 'a next = Nil | Cons of 'a * 'a t
-
-  val sexp_of_t : ('a -> Sexp.t) -> 'a t -> Sexp.t
-
-  val next : 'a t -> 'a next Deferred.t
-  val iter : 'a t -> f:('a -> unit) -> unit
-end
-
-module Tail : sig
-  type 'a t =
-    { (* [next] points at the tail of the stream *)
-      mutable next: 'a Stream.next Ivar.t;
-    }
-
-  val create : unit -> _ t
-end
-
-module Monitor : sig
-  type t =
-    { name_opt : string option;
-      id : int;
-      parent : t option;
-      errors : exn Tail.t;
-      mutable has_seen_error : bool;
-      mutable someone_is_listening : bool;
-    }
-  with fields, sexp_of
-end
-
-module Execution_context : sig
-  type t =
-    { block_group : Block_group.t;
-      monitor : Monitor.t;
-      priority : Jobs.Priority.t;
-    }
-  with fields, sexp_of
-end
-
-module Handler : sig
-  type 'a t = 'a handler
-
-  val create : ('a -> unit) -> 'a t
-  val prepend : 'a t -> f:('b -> 'a) -> 'b t
-  val filter : 'a t -> f:('a -> bool) -> 'a t
-  val schedule : 'a t -> 'a -> unit
-end
-
-module Clock_event : sig
-  type t =
-    { mutable state : state;
-    }
-  and state =
-  | Uninitialized
-  | Aborted
-  | Happened
-  | Waiting of waiting
-  and waiting =
-    { event : t Events.Event.t;
-      ready : [ `Happened | `Aborted ] Ivar.t;
-    }
-  with sexp_of
-end
-
-module Scheduler : sig
-  type t =
-    { jobs : Jobs.t;
-      mutable current_execution_context : Execution_context.t;
-      mutable main_execution_context : Execution_context.t;
-      mutable max_num_jobs_per_priority_per_cycle : int;
-      mutable uncaught_exception : exn option;
-      mutable num_jobs_run : int;
-      mutable cycle_count : int;
-      mutable cycle_start : Time.t;
-      cycle_times : Time.Span.t Tail.t;
-      cycle_num_jobs : int Tail.t;
-      events : Clock_event.t Events.t;
-    }
-
-  val t : t
-  val add_job                    : Execution_context.t -> ('a -> unit) -> 'a -> unit
-  val current_execution_context  : unit -> Execution_context.t
-  val set_execution_context      : Execution_context.t -> unit
-  val invariant : unit -> unit
-end

base/async/core/lib/clock.ml

 open Import
 open Deferred_std
 
-module Monitor = Raw_monitor
 module Stream = Async_stream
+module Event = Clock_event
 
-module Event = Clock_event
+open Monitor.Exported_for_scheduler
 
 let can_not_abort (d, _event) = d >>| function `Aborted -> assert false | `Happened -> ()
 
 let every' ?(start = Deferred.unit) ?(stop = Deferred.never ())
     ?(continue_on_error = true) span f =
   if Time.Span.(<=) span Time.Span.zero then
-    fail "Clock.every got nonpositive span" span <:sexp_of< Time.Span.t >>;
+    failwiths "Clock.every got nonpositive span" span <:sexp_of< Time.Span.t >>;
   start
   >>> fun () ->
     (* We use an extra monitor so we can specially handle errors in [f]. *)
     Stream.iter (Monitor.errors monitor) ~f:(fun e ->
       Monitor.send_exn (Monitor.current ()) e;
       saw_error := true);
-    let rec loop () =
+    let rec loop wait =
       upon (choose [choice stop (fun () -> `Stop);
-                    choice (after span) (fun () -> `Continue)])
+                    choice wait (fun () -> `Continue)])
         (function
           | `Stop -> ()
           | `Continue ->
                raised an error after having returned ok.  We check [saw_error] and don't
                proceed if it did. *)
             if continue_on_error || not !saw_error then begin
-              Monitor.within' ~monitor (fun () ->
+              within' ~monitor (fun () ->
                 Monitor.try_with_raise_rest
                   (fun () ->
                     (* We check at the last possible moment before running [f] whether
                 | Ok z ->
                   begin match z with
                   | `Stop -> ()
-                  | `Ran -> loop ()
+                  | `Ran -> loop (after span)
                   end
                 | Error error ->
                   Monitor.send_exn (Monitor.current ()) error;
-                  if continue_on_error then loop ()
+                  if continue_on_error then loop (after span)
             end)
     in
-    loop ()
+    loop Deferred.unit
 ;;
 
 let every ?start ?stop ?continue_on_error span f =

base/async/core/lib/clock.mli

-(** Clock includes functions to create deferreds that become determined at a
-    certain time.
+(** Clock includes functions to create deferreds that become determined at a certain time.
 
-    The underlying implementation uses a heap of events, one for each deferred
-    that needs to be determined at some time in the future.  It uses the timeout
-    argument to select() in the core select loop to wake up and fill in the
-    deferreds.
-*)
+    The underlying implementation uses a heap of events, one for each deferred that needs
+    to be determined at some time in the future.  It uses the timeout argument to select()
+    in the core select loop to wake up and fill in the deferreds. *)
 
 open Core.Std
 
     checking, if [d] is determined and the timeout has expired, the resulting deferred
     will be determined with [`Result].  In other words, since there is inherent race
     between [d] and the timeout, the preference is given to [d]. *)
-val with_timeout :
-  Time.Span.t
+val with_timeout
+  :  Time.Span.t
   -> 'a Deferred.t
   -> [ `Timeout
      | `Result of 'a
 val at       : Time.t -> unit Deferred.t
 val at_event : Time.t -> [ `Happened | `Aborted ] Deferred.t * Event.t
 
-(** [at_varying_intervals ~stop f] returns a stream whose next element becomes
-    determined by calling [f ()] and waiting for that amount of time, and then
-    looping to determine subsequent elements.  The stream will end after [stop]
-    becomes determined.
-*)
+(** [at_varying_intervals ~stop f] returns a stream whose next element becomes determined
+    by calling [f ()] and waiting for that amount of time, and then looping to determine
+    subsequent elements.  The stream will end after [stop] becomes determined. *)
 val at_varying_intervals :
   ?stop:unit Deferred.t -> (unit -> Time.Span.t) -> unit Async_stream.t
 
-(** [at_intervals ~stop span] returns a stream whose elements will become
-    determined span time apart.  The stream will end after stop becomes
-    determined.
+(** [at_intervals ~stop span] returns a stream whose elements will become determined span
+    time apart.  The stream will end after stop becomes determined.
 
-    [at_intervals ~stop span] = [at_varying_intervals ~stop (fun () -> span)]
-*)
+    [at_intervals ~stop span] = [at_varying_intervals ~stop (fun () -> span)] *)
 val at_intervals : ?stop:unit Deferred.t -> Time.Span.t -> unit Async_stream.t
 
 (** [every' ?start ?stop span f] runs [f()] every [span] amount of time starting when
     It is guaranteed that if [stop] becomes determined, even during evaluation of [f],
     then [f] will not be called again by a subsequent iteration of the loop.
 
-    It is an error for [span] to be nonpositive.
-*)
-val every' :
-  ?start : unit Deferred.t
+    It is an error for [span] to be nonpositive. *)
+val every'
+  :  ?start : unit Deferred.t
   -> ?stop : unit Deferred.t
   -> ?continue_on_error : bool
   -> Time.Span.t
   -> (unit -> unit Deferred.t) -> unit
 
-(** [every ?start ?stop span f]
- * is [every' ?start ?stop span (fun () -> f (); Deferred.unit)]
- *)
-val every :
-  ?start : unit Deferred.t
+(** [every ?start ?stop span f] is
+    [every' ?start ?stop span (fun () -> f (); Deferred.unit)] *)
+val every
+  :  ?start : unit Deferred.t
   -> ?stop : unit Deferred.t
   -> ?continue_on_error : bool
   -> Time.Span.t
   -> (unit -> unit) -> unit
 
-val with_timeout :
-  Time.Span.t -> 'a Deferred.t -> [ `Timeout | `Result of 'a ] Deferred.t
+val with_timeout : Time.Span.t -> 'a Deferred.t -> [ `Timeout | `Result of 'a ] Deferred.t

base/async/core/lib/clock_event.ml

 open Core.Std
 open Import
 
-module Scheduler = Basic.Scheduler
+module Scheduler = Raw_scheduler
 
-include Basic.Clock_event
+open Raw_clock_event.T
+
+type t = Execution_context.t Raw_clock_event.t with sexp_of
+
+let events = Scheduler.(t.events)
 
 let status t =
   match t.state with
 
 let fire t =
   match t.state with
-  | Uninitialized | Aborted | Happened -> fail "Event.fire" t <:sexp_of< t >>
+  | Uninitialized | Aborted | Happened -> failwiths "Event.fire" t <:sexp_of< t >>
   | Waiting waiting ->
     Ivar.fill waiting.ready `Happened;
     t.state <- Happened;
   | Happened -> `Previously_happened
   | Waiting waiting ->
     t.state <- Aborted;
-    begin match Events.remove Scheduler.(t.events) waiting.event with
+    begin match Events.remove events waiting.event with
     | `Not_present -> assert false
     | `Removed -> ()
     end;
 let at time =
   let ready = Ivar.create () in
   let t = { state = Uninitialized } in
-  begin match Events.add Scheduler.(t.events) ~at:time t with
+  begin match Events.add events ~at:time t with
   | `Not_in_the_future -> t.state <- Happened; Ivar.fill ready `Happened;
   | `Ok events_event   -> t.state <- Waiting { event = events_event; ready };
   end;

base/async/core/lib/debug.ml

 
 let debug = is_some (Sys.getenv "DEBUG_ASYNC")
 
-(* Calls to [Debug.print] should look like [if debug then Debug.print ...]. *)
+(** When [RECORD_ASYNC_BACKTRACES] is set, the Async scheduler records a backtrace
+    everytime it schedules a job, and will include the backtrace history in any exceptions
+    raised to monitors, and in particular, in an unhandled exception that reaches the main
+    monitor. *)
+let record_backtraces =
+  match Backtrace.get with
+  | Ok _ -> is_some (Sys.getenv "RECORD_ASYNC_BACKTRACES")
+  | Error _ -> (* unimplemented *) false
+
+(* Calls to [Debug.log] should look like [if Debug.debug then Debug.log ...]. *)
 
 let log message a sexp_of_a =
   eprintf "%s\n%!"
            (Unix.getpid (), Thread.id (Thread.self ()), Time.now (), message, a)))
 ;;
 
+let () = if debug then log "record_backtraces" record_backtraces <:sexp_of< bool >>
+
 let log_string message = log message () <:sexp_of< unit >>

base/async/core/lib/deferred.ml

 open Core.Std
-open Basic
+open Import
 
-module Ivar = Basic.Ivar
+module Deferred = Raw_deferred
+module Scheduler = Raw_scheduler
 
-include Basic.Deferred
+open Deferred
+
+module T = struct
+  include Raw_deferred.Scheduler_dependent (Scheduler)
+  let return = return
+end
+
+include T
+
+let create = create
+
+let peek = peek
+
+let is_determined = is_determined
+
+let debug_space_leaks = Raw_ivar.debug_space_leaks
 
 let never () = Ivar.read (Ivar.create ())
 
 let (>>>) = upon
 
-include (Monad.Make (Basic.Deferred))
+include (Monad.Make (T))
 
 (* We shadow [all] on-purpose here, since the default definition introduces a chain of
    binds as long as the list. *)
 let all = `Make_sure_to_define_all_elsewhere
 
-(* we shadow [map] from Monad with a more efficient implementation *)
+(* We shadow [map] from Monad with a more efficient implementation *)
 let map t ~f = create (fun i -> upon t (fun a -> Ivar.fill i (f a)))
 
 let unit = return ()
   | Some f -> f ()
 ;;
 
-let choose_ident choices =
-  choose (List.map choices ~f:(fun t -> choice t ident))
-;;
+let choose_ident choices = choose (List.map choices ~f:(fun t -> choice t ident))
 
 let repeat_until_finished state f =
   create (fun finished ->
 
   type ('a, 'b) t = ('a, 'b) Map.Poly.t
 
-  let filter_mapi ~f t =
+  let filter_mapi t ~f =
     List.fold (Map.to_alist t) ~init:Map.Poly.empty ~f:(fun map (key, data) ->
-      f ~key ~data >>= function
-      | Some data2 -> return (Map.add ~key ~data:data2 map)
-      | None -> return map)
-
+      f ~key ~data
+      >>| function
+      | Some data -> Map.add map ~key ~data
+      | None -> map)
 end

base/async/core/lib/deferred.mli

     "undetermined" or "determined".  A deferred that is undetermined may at some point
     become determined with value v, and will henceforth always be determined with value
     v. *)
-type +'a t = 'a Basic.Deferred.t
+type +'a t = ('a, Execution_context.t) Raw_deferred.t with sexp_of
+type 'a detailed = 'a t with sexp_of
 
 (** [sexp_of_t t f] returns a sexp of the deferred's value, if it is determined, or an
     informative string otherwise.
 
     This is just for display purposes.  There is no [t_of_sexp]. *)
-val sexp_of_t : ('a -> Sexp.t) -> 'a t -> Sexp.t
 
 (** [create f] calls [f i], where [i] is empty ivar.  [create] returns a deferred that
     becomes determined when [f] fills [i]. *)
-val create : ('a Basic.Ivar.t -> unit) -> 'a t
+val create : ('a Ivar.t -> unit) -> 'a t
 
 (** [upon t f] will run [f v] at some point after [t] becomes determined with value
     [v]. *)
 
     In general, for deferreds that are allocated by [>>=] to be garbage collected quickly,
     it is sufficient that the allocating bind be executed in tail-call position of the
-    right-hand side of an outer bind.  A rigorous statement of this property and a proof
-    can be found in the [raw_deferred.mli]. *)
+    right-hand side of an outer bind. *)
 include Monad with type 'a t := 'a t
 
 module Infix : sig

base/async/core/lib/deferred_intf.ml

 open Core.Std
 
-module Deferred = Basic.Deferred
+module Deferred = struct
+  type 'a t = ('a, Execution_context.t) Raw_deferred.t
+end
 
 type how = [ `Parallel | `Sequential ]
 
 
   type ('k, 'v) t
 
-  val filter_mapi:
-    f:(key:'k -> data:'v1 -> 'v2 option Deferred.t)
-    -> ('k, 'v1) t
+  val filter_mapi
+    :  ('k, 'v1) t
+    -> f:(key:'k -> data:'v1 -> 'v2 option Deferred.t)
     -> ('k, 'v2) t Deferred.t
 
 end

base/async/core/lib/events.ml

       end;
       assert (not (is_ready t event)));
   with
-  | exn -> fail "invariant failed" (exn, t) <:sexp_of< exn * a t >>
+  | exn -> failwiths "invariant failed" (exn, t) <:sexp_of< exn * a t >>
 ;;
 
 let create ~now =

base/async/core/lib/execution_context.ml

 open Core.Std
 open Import
 
-module Scheduler = Basic.Scheduler
+module Monitor = Raw_monitor
 
-include Basic.Execution_context
+type t =
+  { block_group : Block_group.t;
+    monitor : t Monitor.t;
+    priority : Priority.t;
+    backtrace_history : Backtrace.t list;
+  }
+with fields, sexp_of
+
+let bogus =
+  { block_group = Block_group.bogus;
+    monitor = Monitor.bogus ();
+    priority = Priority.normal;
+    backtrace_history = [];
+  }
+;;
 
 let create_like ?block_group ?monitor ?priority t =
   let get o z = match o with None -> z | Some x -> x in
   { block_group = get block_group t.block_group;
-    monitor = get monitor t.monitor;
-    priority = get priority t.priority;
+    monitor     = get monitor     t.monitor    ;
+    priority    = get priority    t.priority   ;
+    backtrace_history = t.backtrace_history;
   }
 ;;
 
+let record_backtrace =
+  match Backtrace.get with
+  | Ok get ->
+    fun t ->
+      { t with
+        backtrace_history = get () :: t.backtrace_history;
+      }
+  | Error _ -> Fn.id
+;;

base/async/core/lib/handler.ml

 open Core.Std
 open Import
 
-include Basic.Handler
+module Handler = Raw_handler
+
+open Handler.T
+
+type 'a t = ('a, Execution_context.t) Handler.t
+
+include struct
+  open Handler
+  let filter = filter
+  let prepend = prepend
+end
+
+let create run =
+  { execution_context = Scheduler.current_execution_context ();
+    run;
+  }
+;;
+
+module Deferred = Raw_deferred.Scheduler_dependent (Raw_scheduler)
 
 let install t d =
-  let u = Basic.Deferred.install_removable_handler d t in
-  fun () -> Basic.Unregister.unregister u;
+  let u = Deferred.install_removable_handler d t in
+  fun () -> Unregister.unregister u;
 ;;
+
+let schedule t a = Scheduler.add_job t.execution_context t.run a

base/async/core/lib/import.ml

 open Core.Std
 
-let sexp_of_phantom _ = assert false
-
-let fail message a sexp_of_a = Error.raise (Error.create message a sexp_of_a)
+module type Basic_scheduler = sig
+  module Execution_context : sig type t with sexp_of end
+  val current_execution_context : unit -> Execution_context.t
+  val add_job : Execution_context.t Job.t -> unit
+end

base/async/core/lib/ivar.ml

-include Basic.Ivar
+open Core.Std
+open Import
+
+module Deferred  = Raw_deferred
+module Ivar      = Raw_ivar
+module Scheduler = Raw_scheduler
+
+include struct
+  open Ivar
+  let create      = create
+  let create_full = create_full
+  let equal       = equal
+  let is_empty    = is_empty
+  let is_full     = is_full
+  let peek        = peek
+end
+
+include Ivar.Scheduler_dependent (Scheduler)
+
+type 'a ivar = 'a t
+
+type 'a deferred = ('a, Execution_context.t) Raw_deferred.t
+
+let read = Deferred.of_ivar
+
+let fill_if_empty t v = if is_empty t then fill t v
+
+include Bin_prot.Utils.Make_binable1 (struct
+  module Binable = struct
+    type 'a t = 'a option with bin_io
+  end
+
+  type 'a t = 'a ivar
+
+  let to_binable t = peek t
+
+  let of_binable = function
+    | None -> create ()
+    | Some a -> create_full a
+  ;;
+end)

base/async/core/lib/ivar.mli

+(** An ivar is a write-once cell that can be empty or full (i.e. hold a single value) that
+    one can [read] and to obtain a deferred that becomes determined when the ivar is
+    filled.  An ivar is similar to an [a option ref], except it is an error to fill an
+    already full ivar. *)
+
 open Core.Std
 
-(** An ivar is a write-once cell that can be empty or full (i.e. hold a single value).  It
-    is like the type ['a option ref], except it is an error to fill an already full
-    ivar. *)
-type 'a t = 'a Basic.Ivar.t with bin_io, sexp_of
-
-val sexp_of_t : ('a -> Sexplib.Sexp.t) -> 'a t -> Sexplib.Sexp.t
+type 'a t = ('a, Execution_context.t) Raw_ivar.t with bin_io, sexp_of
+type 'a detailed = 'a t with sexp_of
+type 'a deferred = ('a, Execution_context.t) Raw_deferred.t
 
 (** [equal t t'] is physical equality of [t] and [t']. *)
 val equal : 'a t -> 'a t -> bool
 
 (** [read t] returns a deferred that becomes enabled with value [v] after the ivar is
     filled with [v]. *)
-val read : 'a t -> 'a Deferred.t
+val read : 'a t -> 'a deferred

base/async/core/lib/job.ml

 open Core.Std
 
-type t = unit -> unit
+(* We use a first class module to represent a job because it allows us to use a single
+   4-word block to represent the job.  Without a first-class module, we would need to
+   hide the [a] type using a closure, e.g.:
 
-let sexp_of_t = sexp_of_a
+     type 'execution_context t =
+       { execution_context: 'execution_context
+         run : unit -> unit;
+       }
 
-let run t = t ()
+   But this would then require allocating a 3-word block for [t], and a 4-word block
+   for [run]. *)
 
-let create work = work
+module type T = sig
+  type a
+  type execution_context
+  val execution_context : execution_context
+  val f : a -> unit
+  val a : a
+end
+
+type 'execution_context t = (module T with type execution_context = 'execution_context)
+
+let execution_context (type execution_context) t =
+  let module T = (val t : T with type execution_context = execution_context) in
+  T.execution_context;
+;;
+
+let run (type execution_context) t =
+  let module T = (val t : T with type execution_context = execution_context) in
+  T.f T.a;
+;;
+
+let create (type a_) (type execution_context_) execution_context f a =
+  let module T = struct
+    type execution_context = execution_context_
+    type a = a_
+    let execution_context = execution_context
+    let f = f
+    let a = a
+  end in
+  (module T : T with type execution_context = execution_context_)
+;;

base/async/core/lib/job.mli

-type t with sexp_of
+open Core.Std
 
-val create : (unit -> unit) -> t
+type 'execution_context t
 
-val run : t -> unit
+val create : 'execution_context -> ('a -> unit) -> 'a -> 'execution_context t
+val execution_context : 'execution_context t -> 'execution_context
+val run : _ t -> unit

base/async/core/lib/jobs.ml

 end
 
 module Jobs_at_priority : sig
-  type t with sexp_of
+  type 'job t with sexp_of
 
-  val create : unit -> t
-  val add : t -> Job.t -> unit
-  val start_cycle : t -> max_num_jobs:int -> unit
-  val is_empty : t -> bool
-  val get : t -> Job.t list
-  val length : t -> int
+  val create : unit -> _ t
+  val add : 'job t -> 'job -> unit
+  val start_cycle : _ t -> max_num_jobs:int -> unit
+  val is_empty : _ t -> bool
+  val get : 'job t -> 'job list
+  val length : _ t -> int
 end = struct
-  type t =
-    { jobs : Job.t Queue.t;
+  type 'job t =
+    { jobs : 'job Queue.t;
       mutable jobs_left_this_cycle : int;
     }
   with sexp_of
   ;;
 end
 
-type t =
-  { normal : Jobs_at_priority.t;
-    low : Jobs_at_priority.t;
+type 'job t =
+  { normal : 'job Jobs_at_priority.t;
+    low : 'job Jobs_at_priority.t;
   }
 with sexp_of
 

base/async/core/lib/jobs.mli

 end
 
 (* type of the jobs *)
-type t with sexp_of
+type 'job t with sexp_of
 
-val create : unit -> t
+val create : unit -> _ t
 
 (** [length t] returns the number of waiting jobs *)
-val length : t -> int
+val length : _ t -> int
 
 (** [is_empty t] returns true if there are no waiting jobs. *)
-val is_empty : t -> bool
+val is_empty : _ t -> bool
 
-val add : t -> Priority.t -> Job.t -> unit
+val add : 'job t -> Priority.t -> 'job -> unit
 
 (** [start_cycle t ~max_num_jobs_per_priority] enables subsequent calls of [get] to
     return up to [max_num_jobs_per_priority] jobs of each priority level. *)
-val start_cycle : t -> max_num_jobs_per_priority:int -> unit
+val start_cycle : _ t -> max_num_jobs_per_priority:int -> unit
 
 (** [get t] gets all the jobs with the highest priority currently available, subject to
     [max_num_jobs_per_priority].  The obtained jobs are removed from [t]. *)
-val get : t -> Job.t list
+val get : 'job t -> 'job list

base/async/core/lib/monitor.ml

-include Raw_monitor
+open Core.Std
+open Import
+open Deferred_std
+
+module Monitor = Raw_monitor
+module Scheduler = Raw_scheduler
+module Stream = Raw_stream
+
+type 'execution_context t_ = 'execution_context Monitor.t =
+  { name_opt : string option;
+    id : int;
+    parent : 'execution_context t_ option;
+    errors : (exn, 'execution_context) Raw_tail.t;
+    mutable has_seen_error : bool;
+    mutable someone_is_listening : bool;
+  }
+with fields
+
+type t = Execution_context.t Monitor.t with sexp_of
+
+type monitor = t with sexp_of
+
+let debug = Debug.debug
+
+let current_execution_context () = Scheduler.current_execution_context ()
+
+let current () = (current_execution_context ()).Execution_context.monitor
+
+let name t =
+  match t.name_opt with
+  | Some s -> s
+  | None -> Int.to_string (id t)
+;;
+
+let next_id =
+  let r = ref 0 in
+  fun () -> r := !r + 1; !r
+;;
+
+let create_with_parent ?name:name_opt parent =
+  let t =
+    { name_opt;
+      id = next_id ();
+      parent;
+      errors = Tail.create ();
+      has_seen_error = false;
+      someone_is_listening = false;
+    }
+  in
+  if debug then Debug.log "created monitor" t <:sexp_of< t >>;
+  t
+;;
+
+let main = create_with_parent ~name:"main" None
+
+let errors t =
+  t.someone_is_listening <- true;
+  Tail.collect t.errors;
+;;
+
+let error t =
+  let module S = Stream in
+  S.next (Tail.collect t.errors)
+  >>| function
+  | S.Nil -> assert false
+  | S.Cons (error, _) -> error
+;;
+
+let create ?name () =
+  let parent = current () in
+  create_with_parent ?name (Some parent);
+;;
+
+module Exn_for_monitor = struct
+  type t =
+    { exn : exn;
+      backtrace : string sexp_list;
+      backtrace_history : Backtrace.t sexp_list;
+      monitor : monitor;
+    }
+  with fields, sexp_of
+end
+
+exception Error_ of Exn_for_monitor.t with sexp
+
+let extract_exn exn =
+  match exn with
+  | Error_ error -> error.Exn_for_monitor.exn
+  | exn -> exn
+;;
+
+let send_exn t ?backtrace exn =
+  let backtrace =
+    let split backtrace = String.split backtrace ~on:'\n' in
+    match backtrace with
+    | None -> []
+    | Some `Get -> split (Exn.backtrace ())
+    | Some (`This b) -> split b
+  in
+  let backtrace_history =
+    (current_execution_context ()).Execution_context.backtrace_history
+  in
+  let is_shutdown = exn = Monitor.Shutdown in
+  let exn =
+    match exn with
+    | Error_ _ -> exn
+    | _ -> Error_ { Exn_for_monitor. exn; backtrace; backtrace_history; monitor = t }
+  in
+  t.has_seen_error <- true;
+  let rec loop t =
+    if t.someone_is_listening then
+      Tail.extend t.errors exn
+    else
+      match t.parent with
+      | Some t' -> loop t'
+      | None ->
+        (* Ignore shutdown errors that reach the top. *)
+        if not is_shutdown then
+          (* Do not change this branch to print the exception or to exit.  Having the
+             scheduler raise an uncaught exception is the necessary behavior for programs
+             that call [Scheduler.go] and want to handle it. *)
+          Scheduler.t.Scheduler.uncaught_exception <-
+            Some (Error.create "unhandled exception" (`Pid (Unix.getpid ()), exn)
+                    (<:sexp_of< [ `Pid of Pid.t ] * exn >>));
+  in
+  loop t
+;;
+
+module Exported_for_scheduler = struct
+  let within_context context f =
+    Scheduler.with_execution_context context
+      ~f:(fun () ->
+        match Result.try_with f with
+        | Ok x -> Ok x
+        | Error exn ->
+          send_exn context.Execution_context.monitor exn ~backtrace:`Get;
+          Error ())
+  ;;
+
+  type 'a with_options =
+    ?block_group:Block_group.t