ysulsky avatar ysulsky committed fc10377

auto export

Comments (0)

Files changed (305)

+# bin/make-hgignore processes this .hgignore.in, along with all the other .hgignore.in
+# files in the hg repo to produce a single .hgignore file at the root.  Please do not add
+# directory-specific files to ignore here.  Instead, add them to the .hgignore.in file in
+# the directory where the file you want to ignore lives.
+
+syntax: regexp
+\.cm(x|xa|a|o|i)$
+\.exe$
+\.bc$
+\.annot$
+\.o$
+\.so(.\d)?$
+\.s$
+\.a$
+\.depend$
+\._.+$
+\.sp.t$
+config.status$
+mycaml$
+\.DS_Store$
+tmtags$
+
+^\.git/.*$
+^\.transdeps/.*$
+^chroot_test_tmp/.*$
+
+^lib/packs(/.+\.pack)?$
+^lib/.+/deps
+^lib/.+/.+\.mli
+^test-results.*$
+
+syntax: glob
+personal_compilation_ignores
+*.cp4c
+*~
+*.auto.mli
+*.aux
+*.digest
+*.fls
+*.haux
+*.htoc
+*.log
+*.omc
+*.orig
+*.rej
+*.spot
+*.toc
+.*.sw[pabcdef]
+.mydeps
+inline_tests.ml
+inline_tests_runner.ml
+TAGS
+order_file_*_common.ml
+*_db_pnl_common.ml
+order_file_*_eval.ml
+
+
+# for new omake rules
+*.deps
+*.ml.d
+*.mli.d
+*.libdeps
+*.objdeps
+*.hg_version.c
+*.build_info.c
+hg_version.out
+
+# specific files
+.last_build_env
+.omakedb
+.omakedb.lock
+spec.hg
+libdeps.sexp
+libdeps.dot
+libdeps.ps
+libdeps.corrected
+ocaml_path
+.ocaml-3.10
+\#*
+.\#*
+
+# Please do not add directory-specific files to ignore here.  Instead, add them to the
+# .hgignore.in file in the directory where the file you want to ignore lives.

base/async/.hgignore.in

+INSTALL
+Makefile
+myocamlbuild.ml
+configure
+lib/async.mlpack
+lib/async.odocl
+setup.ml
+setup.log
+setup.data
+_oasis
+_build
+_tags
+lib/META
+lib/libasync.clib

base/async/core/.hgignore.in

+INSTALL
+Makefile
+_oasis
+_tags
+configure
+lib/META
+lib/async_core.mlpack
+myocamlbuild.ml
+setup.data
+setup.ml
+_build
+

base/async/core/lib/basic.ml

 
   let equal (t : _ t) t' = phys_equal t t'
 
-  let rec sexp_of_t sexp_of_a i =
-    match !i with
-    | Full v -> Sexp.List [Sexp.Atom "Full"; sexp_of_a v]
-    | Empty
-    | Empty_one_handler _
-    | Empty_many_handlers _ -> Sexp.Atom "Empty"
-    | Indir v -> sexp_of_t sexp_of_a v
+  (* Returns a non-indirected ivar, and path compresses any indirections. *)
+  let rec tail_recursive_squash t ac =
+    match !t with
+    | Indir t' -> tail_recursive_squash t' (t :: ac)
+    | _ ->
+      let indir = Indir t in
+      List.iter ac ~f:(fun ancestor ->
+        match !ancestor with
+        | Indir already_points_to ->
+          (* Don't bother mutating if it didn't change. *)
+          if not (phys_equal t already_points_to) then ancestor := indir;
+        | _ -> assert false (* We only put [Indir]s on [ac] *)
+      );
+      t
   ;;
 
-  (* Returns a non-indirected ivar, and path compresses any indirections.  May overflow
-     stack if you pass it an ivar chain that is too long. *)
-  let rec squash t =
-    match !t with
-    | Indir r ->
-      let r' = squash r in
-      (* Don't bother mutating if it didn't change. *)
-      if not (phys_equal r r') then t := Indir r';
-      r'
-    | _ -> t
+  let rec squash t = tail_recursive_squash t []
+
+  let rec sexp_of_t sexp_of_a t =
+    match !(squash t) with
+    | Indir _ -> assert false (* fulfilled by squash *)
+    | Full v -> Sexp.List [Sexp.Atom "Full"; sexp_of_a v]
+    | Empty | Empty_one_handler _ | Empty_many_handlers _ -> Sexp.Atom "Empty"
   ;;
 
   let peek t =
     match !(squash t) with
     | Indir _ -> assert false (* fulfilled by squash *)
     | Full a -> Some a
-    | Empty
-    | Empty_one_handler _
-    | Empty_many_handlers _ -> None
+    | Empty | Empty_one_handler _ | Empty_many_handlers _ -> None
   ;;
 
   let is_empty t =
     match !(squash t) with
     | Indir _ -> assert false (* fulfilled by squash *)
     | Full _ -> false
-    | Empty
-    | Empty_one_handler _
-    | Empty_many_handlers _ -> true
+    | Empty | Empty_one_handler _ | Empty_many_handlers _ -> true
   ;;
 
   let is_full t = not (is_empty t)
 
   module Pretty = struct
     type one =
-      { name : string option;
+      { name_opt : string option;
         id : int;
         has_seen_error : bool;
         someone_is_listening : bool
   end
 
   let rec to_pretty t =
-    let rec loop t ac =
-      let ac =
-        { Pretty.
-          name = t.name_opt;
-          id = t.id;
-          has_seen_error = t.has_seen_error;
-          someone_is_listening = t.someone_is_listening;
-        }
-        :: ac
-      in
+    let rec loop
+        { name_opt; id; parent = _; errors = _; has_seen_error; someone_is_listening }
+        ac =
+      let ac = { Pretty. name_opt; id; has_seen_error; someone_is_listening } :: ac in
       match t.parent with
       | None -> List.rev ac
       | Some t -> loop t ac
       mutable num_jobs_run : int;
       mutable cycle_count : int;
       mutable cycle_start : Time.t;
-      mutable jobs_left : bool;
       cycle_times : Time.Span.t Tail.t;
       cycle_num_jobs : int Tail.t;
       events : Clock_event.t Events.t;
       cycle_times = Tail.create ();
       cycle_num_jobs = Tail.create ();
       events = Events.create ~now;
-      jobs_left = false;
     }
   ;;
 
   let invariant () =
+    assert (t.num_jobs_run >= 0);
+    assert (t.cycle_count >= 0);
     Events.invariant t.events;
     try
       Events.iter t.events ~f:(fun events_event ->
       t.current_execution_context <- execution_context;
   ;;
 
-  let add_job execution_context work =
+  let add_job execution_context f a =
     let job = Job.create (fun () ->
       set_execution_context execution_context;
-      work ())
+      f a)
     in
     let priority = execution_context.priority in
-    if debug then
-      Debug.print "enqueing job %d with priority %s" (Job.id job)
-        (Priority.to_string priority);
+    if debug then Debug.log "enqueing job" priority <:sexp_of< Priority.t >>;
     Jobs.add t.jobs priority job;
   ;;
 
     }
   ;;
 
-  let schedule t v = Scheduler.add_job t.execution_context (fun () -> t.run v)
+  let schedule t v = Scheduler.add_job t.execution_context t.run v
 
   let priority t = t.execution_context.priority
 
     | Empty -> repr := Full v
     | Empty_one_handler (run, context) ->
       repr := Full v;
-      Scheduler.add_job context (fun () -> run v)
+      Scheduler.add_job context run v;
     | Empty_many_handlers handlers ->
       repr := Full v;
       Bag.iter handlers ~f:(fun h -> Handler.schedule h v);
   ;;
 
-  let fill_if_empty t v = if not (is_full t) then fill t v
+  let fill_if_empty t v = if is_empty t then fill t v
 
   include Bin_prot.Utils.Make_binable1 (struct
     module Binable = struct
       let cell = repr t in
       match !cell with
       | Indir _ -> assert false (* fulfilled by repr *)
-      | Full v -> Scheduler.add_job current_execution_context (fun () -> run v)
+      | Full v -> Scheduler.add_job current_execution_context run v
       | Empty -> cell := Empty_one_handler (run, current_execution_context)
       | Empty_one_handler (run', execution_context') ->
         let bag = Bag.create () in

base/async/core/lib/basic.mli

       mutable num_jobs_run : int;
       mutable cycle_count : int;
       mutable cycle_start : Time.t;
-      mutable jobs_left : bool;
       cycle_times : Time.Span.t Tail.t;
       cycle_num_jobs : int Tail.t;
       events : Clock_event.t Events.t;
     }
 
   val t : t
-  val add_job                    : Execution_context.t -> (unit -> unit) -> unit
+  val add_job                    : Execution_context.t -> ('a -> unit) -> 'a -> unit
   val current_execution_context  : unit -> Execution_context.t
   val set_execution_context      : Execution_context.t -> unit
   val invariant : unit -> unit

base/async/core/lib/block_group.ml

 (* Each block group reserves a certain number of threads that it may concurrently use.
    Async guarantees the threads will be available by keeping the sum of the
    "num_reserved_threads" over all block groups less than some maximum, which is a small
-   constant (like 100) for which we are confident that we can simultaneously have that many
-   threads. *)
+   constant (like 100) for which we are confident that we can simultaneously have that
+   many threads. *)
 
 module Work = struct
   type t =
-    {
-      (* When this work starts running, the name of the thread will be set
+    { (* When this work starts running, the name of the thread will be set
          (via Linux_ext.pr_set_name) to this name if provided. *)
       set_thread_name_to : string option;
       doit : unit -> [ `Stop | `Continue ];

base/async/core/lib/debug.ml

 
 (* Calls to [Debug.print] should look like [if debug then Debug.print ...]. *)
 
-let log (type a) message a sexp_of_a =
+let log message a sexp_of_a =
   eprintf "%s\n%!"
     (Sexp.to_string_mach
        (<:sexp_of< Pid.t * int * Time.t * string * a >>
 ;;
 
 let log_string message = log message () <:sexp_of< unit >>
-
-let print f = Printf.ksprintf log_string f

base/async/core/lib/deferred.ml

 
 module Map = struct
 
-  type ('a, 'b) t = ('a, 'b) Map.t
+  type ('a, 'b) t = ('a, 'b) Map.Poly.t
 
   let filter_mapi ~f t =
-    List.fold (Map.to_alist t) ~init:Map.empty ~f:(fun map (key, data) ->
+    List.fold (Map.to_alist t) ~init:Map.Poly.empty ~f:(fun map (key, data) ->
       f ~key ~data >>= function
       | Some data2 -> return (Map.add ~key ~data:data2 map)
       | None -> return map)

base/async/core/lib/deferred.mli

 
 module Queue : Deferred_sequence with type 'a t = 'a Queue.t
 
-module Map : Deferred_map with type ('k, 'v) t = ('k, 'v) Map.t
+module Map : Deferred_map with type ('k, 'v) t = ('k, 'v) Map.Poly.t
 
 (** [whenever t] ignores t completely.  It is like [Fn.ignore], but is
     more constrained because it requires a deferred.

base/async/core/lib/events.ml

   type 'a t =
     { at : Time.t;
       value : 'a;
-      mutable heap_element : 'a t Heap.heap_el option sexp_opaque;
+      mutable heap_element : 'a t Heap.heap_el sexp_opaque option;
     }
   with fields, sexp_of
 
 
 let is_ready t event = Time.(event.Event.at <= t.now)
 
-let invariant (type a) t =
+let invariant t =
   try
-    Heap.iter t.events ~f:(fun event -> assert (not (is_ready t event)));
+    Heap.iter t.events ~f:(fun event ->
+      begin match event.Event.heap_element with
+      | None -> assert false
+      | Some heap_el -> assert (Heap.heap_el_mem t.events heap_el)
+      end;
+      assert (not (is_ready t event)));
   with
   | exn -> fail "invariant failed" (exn, t) <:sexp_of< exn * a t >>
 ;;

base/async/core/lib/job.ml

 open Core.Std
 
-type t =
-  { id : int;
-    work : unit -> unit;
-  }
-with sexp_of
+type t = unit -> unit
 
-let id t = t.id
+let sexp_of_t = sexp_of_a
 
-let run t = t.work ()
+let run t = t ()
 
-let next_id =
-  let r = ref 0 in
-  (fun () -> incr r; !r);
-;;
-
-let create work =
-  { id = next_id ();
-    work;
-  }
-;;
+let create work = work

base/async/core/lib/job.mli

 
 val create : (unit -> unit) -> t
 
-val id : t -> int
-
 val run : t -> unit

base/async/core/lib/raw_monitor.ml

       someone_is_listening = false;
     }
   in
-  if debug then
-    Debug.print "creating monitor %s with parent %s"
-      (name t) (match parent with None -> "<none>" | Some p -> name p);
+  if debug then Debug.log "creating monitor" t <:sexp_of< t >>;
   t
 ;;
 
   Scheduler.add_job
     (Execution_context.create_like (Scheduler.current_execution_context ())
        ?block_group ?monitor ?priority)
-    work
+    work ()
 ;;
 
 let schedule' ?block_group ?monitor ?priority work =
     Scheduler.add_job
       (Execution_context.create_like (Scheduler.current_execution_context ())
          ?block_group ?monitor ?priority)
-      (fun () -> upon (work ()) (fun a -> Ivar.fill i a)))
-
+      (fun () -> upon (work ()) (fun a -> Ivar.fill i a))
+      ())
 ;;
 
 let current () = (Scheduler.current_execution_context ()).Execution_context.monitor

base/async/core/lib/scheduler.ml

 
 let cycle_count () = t.cycle_count
 
-let jobs_left () = t.jobs_left
-
 let num_jobs_run () = t.num_jobs_run
 
 let set_max_num_jobs_per_priority_per_cycle int =
 ;;
 
 let start_cycle ~now =
-  if debug then Debug.print "start_cycle";
+  if debug then Debug.log_string "start_cycle";
   t.cycle_count <- t.cycle_count + 1;
   t.cycle_start <- now;
 ;;
       match jobs with
       | [] -> ()
       | job::jobs ->
-        if debug then Debug.print "running job %d" (Job.id job);
+        if debug then Debug.log_string "running job";
         begin
           try
             t.num_jobs_run <- t.num_jobs_run + 1;
     | `Ok events -> List.iter events ~f:Clock_event.fire
     end;
     do_one_cycle ();
-    t.jobs_left <- not (Jobs.is_empty t.jobs);
+    let result = if Jobs.is_empty t.jobs then `No_jobs_remain else `Jobs_remain in
     if debug then
       Debug.log "cycle finished"
-        (t.jobs_left, t.uncaught_exception, is_some (Events.next_upcoming t.events))
-        <:sexp_of< bool * exn option * bool >>;
+        (t.uncaught_exception, is_some (Events.next_upcoming t.events))
+        <:sexp_of< exn option * bool >>;
     (* This can potentially add more jobs if somebody is listening to cycle_times
        stream, so we have to check [Jobs.is_empty] before. *)
     Tail.extend t.cycle_times (Time.diff now t.cycle_start);
     Tail.extend t.cycle_num_jobs (t.num_jobs_run - num_jobs_run_before_cycle);
+    result
 ;;
-

base/async/core/lib/scheduler.mli

 val set_block_group            : Block_group.t -> unit
 val set_main_execution_context : Execution_context.t -> unit
 val with_execution_context     : Execution_context.t -> f:(unit -> 'a) -> 'a
-val add_job                    : Execution_context.t -> (unit -> unit) -> unit
+val add_job                    : Execution_context.t -> ('a -> unit) -> 'a -> unit
 val main_execution_context     : unit -> Execution_context.t
 val cycle_start : unit -> Time.t
 val start_cycle : now:Time.t -> unit
-val finish_cycle : now:Time.t -> unit
-val jobs_left : unit -> bool
+val finish_cycle : now:Time.t -> [ `Jobs_remain | `No_jobs_remain ]
 val next_upcoming_event : unit -> Time.t option
 val uncaught_exception : unit -> exn option
 val num_pending_jobs : unit -> int

base/async/core/lib/std.ml

 let at           = Clock    .at
 let catch        = Monitor  .catch
 let every        = Clock    .every
-let protect      = Monitor  .protect
 let schedule     = Scheduler.schedule
 let schedule'    = Scheduler.schedule'
 let try_with     = Monitor  .try_with

base/async/examples/countdown.ml

     shutdown 0
   else begin
     printf "%d\n" i; 
-    upon (after (Time.Span.of_sec 1.)) (fun _ -> loop (i - 1))
+    upon (after (sec 1.)) (fun _ -> loop (i - 1))
   end
 
 let () = loop 10

base/async/examples/finalizer.ml

   if i = 0 then
     shutdown 0
   else
-    upon (Clock.after (Time.Span.of_sec 1.)) (fun () ->
+    upon (Clock.after (sec 1.)) (fun () ->
       compact ();
       loop (i - 1))
 ;;

base/async/examples/monitors.ml

   Stream.iter (Monitor.errors m2) ~f:(fun _ -> printf "caught error\n");
   schedule ~monitor:m2 (fun () ->
     let m1 = Monitor.create ~name:"test monitor 1" () in
-    Stream.iter (Clock.at_intervals (Time.Span.of_sec 1.0))
+    Stream.iter (Clock.at_intervals (sec 1.0))
       ~f:(fun _ ->
         try failwith "error!"
         with _ -> Monitor.send_exn m1 (Failure "error!") ~backtrace:`Get));

base/async/examples/priority.ml

 
 let () = one "low" low
 let () = one "normal" normal
-let () = upon (Clock.after (Time.Span.of_sec 1.)) (fun () -> shutdown 0)
+let () = upon (Clock.after (sec 1.)) (fun () -> shutdown 0)
 
 let () = never_returns (Scheduler.go ())

base/async/examples/process.ml

     upon (In_process.run (fun () -> 1 + 1)) (fun x ->
       printf "the answer is %d\n" x)
   in
-  Stream.iter (Clock.at_intervals (Time.Span.of_sec 1.0)) ~f:(fun _ -> f ());
+  Stream.iter (Clock.at_intervals (sec 1.0)) ~f:(fun _ -> f ());
   never_returns (Scheduler.go ())

base/async/examples/process_stream.ml

   let (_, out, _) = 
     In_process.full 
       (fun ~input:_ ~output ->
-         Stream.iter (Clock.at_intervals (Time.Span.of_sec 1.0)) ~f:(fun _ ->
+         Stream.iter (Clock.at_intervals (sec 1.0)) ~f:(fun _ ->
            Tail.extend output (Time.now ()));
          Deferred.never ())
   in

base/async/examples/socket.ml

               loop bytes_read)
       in
       loop 0));
-  upon (Clock.after (Time.Span.of_sec 2.)) (fun () ->
+  upon (Clock.after (sec 2.)) (fun () ->
     let s = Socket.create Socket.Type.tcp in
     upon (Socket.connect s (Socket.Address.inet
                                (Unix.Inet_addr.of_string "127.0.0.1") ~port))

base/async/examples/write_forever.ml

       let rec loop i =
         eprintf "about to write %d\n" i;
         Writer.write writer buf;
-        upon (Clock.after (Time.Span.of_sec 1.)) (fun () -> loop (i + 1));
+        upon (Clock.after (sec 1.)) (fun () -> loop (i + 1));
       in
       loop 0
     )

base/async/extra/.hgignore.in

+INSTALL
+Makefile
+_oasis
+_tags
+configure
+lib/META
+lib/async_extra.mlpack
+myocamlbuild.ml
+setup.data
+setup.ml
+_build
+

base/async/extra/lib/file_tail.ml

         for i = 0 to Buffer.length buffer - 1 do
           assert (Buffer.nth buffer i <> newline)
         done;
-    with exn ->
-      let module E = Core.Std.Error in
-      E.fail "invariant failed" (exn, t) <:sexp_of< exn * t >>
+    with exn -> failwiths "invariant failed" (exn, t) <:sexp_of< exn * t >>
   ;;
 
   let create ~break_on_lines =
     (* It is possible to have [t.file_pos > t.file_len], since we read as much as
        we can into [t.read_buf] and the file may have grown in between the time we stat
        and the time we read. *)
-  with exn ->
-    let module E = Core.Std.Error in
-    E.fail "invariant failed" (exn, t) <:sexp_of< exn * t >>
+  with exn -> failwiths "invariant failed" (exn, t) <:sexp_of< exn * t >>
 ;;
 
 let need_to_read t = t.file_pos < t.file_len

base/async/extra/lib/rpc.ml

   let v1 = [ 1 ]
 
   let negotiate_version t1 t2 =
-    Set.max_elt (Set.inter (Set.of_list t1) (Set.of_list t2))
+    Set.max_elt (Set.inter (Int.Set.of_list t1) (Int.Set.of_list t2))
 end
 
 module Query = struct
   let create ~implementations:i's ~on_unknown_rpc =
     let module I = Implementation in
     (* Make sure the tags are unique. *)
-    let _, dups = List.fold i's ~init:(Set.empty, Set.empty) ~f:(fun (s, dups) i ->
+    let _, dups = List.fold i's ~init:(Set.Poly.empty, Set.Poly.empty) ~f:(fun (s, dups) i ->
       let tag =
         { Implementation.Description.name = Rpc_tag.to_string i.I.tag;
           version = i.I.version;
     Deferred.whenever (Writer.close t.writer >>= fun () -> Reader.close t.reader)
 
   let closed t = Ivar.read t.eof
+  let already_closed t = Ivar.is_full t.eof
 
   let create ?server ~connection_state ?max_message_size:max_len reader writer =
     let server = match server with None -> Server.null () | Some s -> s in
           in
           let last_heartbeat = ref (Time.now ()) in
           let heartbeat () =
-            if Time.diff (Time.now ()) !last_heartbeat > sec 30. then begin
-              close t;
-              Error.raise Error.Connection_closed
-            end;
-            Writer.write_bin_prot t.writer Message.bin_writer_t Message.Heartbeat
+            if not (already_closed t) then begin
+              if Time.diff (Time.now ()) !last_heartbeat > sec 30. then begin
+                close t;
+                Error.raise Error.Connection_closed
+              end;
+              Writer.write_bin_prot t.writer Message.bin_writer_t Message.Heartbeat
+            end
           in
           let rec loop current_read =
             Deferred.enabled [
                         })));
               Error.raise error);
           Scheduler.within ~monitor (fun () ->
-               Clock.every ~stop:(Ivar.read t.eof) (Span.of_sec 10.0) heartbeat;
+               Clock.every ~stop:(Ivar.read t.eof) (sec 10.) heartbeat;
                loop (read_message ()))
              end;
           Ok t
             | Ok initial ->
               let pipe_r, pipe_w = Pipe.create () in
               Ivar.fill ivar (Ok (Ok (query_id, initial, pipe_r)));
+              Connection.closed conn >>> (fun () -> Pipe.close pipe_w);
               `Replace (read_updates pipe_w)
           end
         end

base/async/extra/lib/tcp.ml

 module Host = Unix.Host
 module Socket = Unix.Socket
 
-let create_socket () =
-  let s = Socket.create Socket.Type.tcp in
+let create_socket sock_type =
+  let s = Socket.create sock_type in
   Unix.set_close_on_exec (Unix.Socket.fd s);
   s
 ;;
     Unix.Inet_addr.of_string_or_getbyname host
     >>> fun inet_addr ->
     let addr = Socket.Address.inet inet_addr ~port in
-    let s = create_socket () in
+    let s = create_socket Socket.Type.tcp in
     close_sock_on_error s (fun () ->
       Socket.connect_interruptible s addr ~interrupt)
     >>> function
 
 exception Tcp_server_negative_max_connections of int with sexp
 
-let serve ?(max_connections=10_000) ?max_pending_connections ?max_buffer_age ~port
+let serve_gen ?(max_connections=10_000) ?max_pending_connections ?max_buffer_age 
+    ~sock_type
+    ~sock_addr
     ~on_handler_error handler =
   Deferred.create (fun ready ->
     if max_connections <= 0 then
       raise (Tcp_server_negative_max_connections max_connections);
-    let s = create_socket () in
+    let s = create_socket sock_type in
     close_sock_on_error s (fun () ->
       Socket.setopt s Socket.Opt.reuseaddr true;
-      Socket.bind s (Socket.Address.inet_addr_any ~port)
+      Socket.bind s sock_addr
       >>| Socket.listen ?max_pending_connections)
     >>> fun s ->
     Ivar.fill ready ();
     accept_loop ())
 ;;
 
+let serve ?max_connections ?max_pending_connections ?max_buffer_age ~port
+    ~on_handler_error handler = 
+  serve_gen ?max_connections ?max_pending_connections ?max_buffer_age
+    ~sock_type:Socket.Type.tcp
+    ~sock_addr:(Socket.Address.inet_addr_any ~port)
+    ~on_handler_error
+    handler
+
+let serve_unix ?max_connections ?max_pending_connections ?max_buffer_age ~file
+    ~on_handler_error handler = 
+  serve_gen ?max_connections ?max_pending_connections ?max_buffer_age
+    ~sock_type:Socket.Type.unix
+    ~sock_addr:(Socket.Address.unix file)
+    ~on_handler_error
+    handler
+
 let connect_sock ~host ~port = connect_sock ~host ~port ()
+;;

base/async/extra/lib/tcp.mli

 open Core.Std
 open Import
 
+
+(** [with_connection ~host ~port f] looks up host from a string (using DNS as needed),
+    connects, then calls [f] passing in a reader and a writer for the connected socket.
+    When the deferred returned by [f] is determined, or any exception is thrown, the
+    socket (and reader and writer) are closed.  The return deferred is fulfilled after f
+    has finished processing and the file descriptor for the socket is closed. *)
+val with_connection
+  :  ?interrupt: unit Deferred.t
+  -> ?max_buffer_age:Time.Span.t
+  -> host:string
+  -> port:int
+  -> (Reader.t -> Writer.t -> 'a Deferred.t)
+  -> 'a Deferred.t
+
 (** [connect_sock ~host ~port] opens a TCP connection to the specified hostname
     and port, returning the socket.
 
     Any errors in the connection will be reported to the monitor that was current
     when connect was called. *)
-val connect_sock :
-  host:string -> port:int -> ([ `Active ], Socket.inet) Socket.t Deferred.t
+val connect_sock
+  : host:string -> port:int -> ([ `Active ], Socket.inet) Socket.t Deferred.t
 
-(** [connect ~host ~port] is a convenience wrapper around [connect_sock] that
-    returns a reader and writer on the socket. *)
+(** [connect ~host ~port] is a convenience wrapper around [connect_sock] that returns a
+    reader and writer on the socket.  The reader and writer share a file descriptor, and
+    so closing one will affect the other.  In particular, closing the reader before
+    closing the writer will cause the writer to subsequently raise an exception when it
+    attempts to flush internally-buffered bytes to the OS, due to a closed fd.  You should
+    close the [Writer] first to avoid this problem.
+
+    If possible, use [with_conenection], which automatically handles closing. *)
 val connect
   :  ?max_buffer_age:Time.Span.t
   -> ?interrupt:unit Deferred.t
   -> unit
   -> (Reader.t * Writer.t) Deferred.t
 
-(** [with_connection ~host ~port f] looks up host from a string (using DNS as
-    needed), connects, then calls [f] passing in a reader and a writer for the
-    connected socket.  When the deferred returned by [f] is determined, or any
-    exception is thrown, the socket (and reader and writer) are closed.  The
-    return deferred is fulfilled after f has finished processing and the file
-    descriptor for the socket is closed. *)
-val with_connection :
-  ?interrupt: unit Deferred.t
-  -> ?max_buffer_age:Time.Span.t
-  -> host:string
-  -> port:int
-  -> (Reader.t -> Writer.t -> 'a Deferred.t)
-  -> 'a Deferred.t
-
 (** [serve ~port handler] starts a server on the specified port.  The return
     value becomes determined once the socket is ready to accept connections.
     [serve] calls [handler (address, reader, writer)] for each client that
 
     [on_handler_error] determines what happens if the handler throws an
     exception. *)
-val serve :
-  ?max_connections:int
+val serve
+  :  ?max_connections:int
   -> ?max_pending_connections:int
   -> ?max_buffer_age:Time.Span.t
   -> port:int
                       ]
   -> (Socket.inet -> Reader.t -> Writer.t -> unit Deferred.t)
   -> unit Deferred.t
+
+(** [serve_unix ~file handler] starts a server on the specified
+    file (unix domain socket).  Otherwise it behaves like [serve].
+*)
+val serve_unix
+  :  ?max_connections:int
+  -> ?max_pending_connections:int
+  -> ?max_buffer_age:Time.Span.t
+  -> file:string
+  -> on_handler_error:[ `Raise
+                      | `Ignore
+                      | `Call of (Socket.unix -> exn -> unit)
+                      ]
+  -> (Socket.unix -> Reader.t -> Writer.t -> unit Deferred.t)
+  -> unit Deferred.t

base/async/extra/lib/tcp_file.ml

     | `No      -> Deferred.return 0
     | `Unknown -> failwithf "unable to open file: %s" filename ()
     | `Yes ->
-      (* There is no strong case for using [exclusive:true] since locks are advisory, but
-         it expresses something in the code that we want to be true, and shouldn't hurt.
-      *)
+      (* There is no strong case for using [exclusive:true] here, since locks are
+         advisory, but it expresses something in the code that we want to be true, and
+         shouldn't hurt. *)
       Reader.with_file ~exclusive:true filename
         ~f:(fun r -> Pipe.drain_and_count (Reader.lines r))
   ;;
   let stop_serving = stop_serving_internal
 
   let close ?(stop_serving=true) t =
-    if t.File.closed then
-      Deferred.unit
+    if t.File.closed
+    then Deferred.unit
     else begin
       t.File.closed <- true;
       if stop_serving then stop_serving_internal t;
     gen_message t (fun writer -> Atomic_operations.schedule_message t msg writer)
 
   let write_sexp =
+    (* We use strings for Sexps whose string representations can fit on the minor heap and
+       Bigstring.t's for those that can't.  As of late 2011, the max size for something on
+       the minor heap is 128 words.  I think there may be a header, so we reduce the 1024
+       chars by a few to account for the possibility of a header. *)
+    let use_bigstring_if_longer_than = 1024 - 64 in
     let buf = Bigbuffer.create 1024 in
-    (fun t sexp ->
+    fun t sexp ->
       Bigbuffer.clear buf;
       Sexp.to_buffer_gen sexp ~buf ~add_char:Bigbuffer.add_char
         ~add_string:Bigbuffer.add_string;
-      schedule_message t (Bigbuffer.big_contents buf))
+      if Bigbuffer.length buf <= use_bigstring_if_longer_than then
+        write_message t (Bigbuffer.contents buf)
+      else
+        schedule_message t (Bigbuffer.big_contents buf);
+  ;;
 
   let with_file ?append filename ~f =
     open_file ?append filename >>= fun t ->

base/async/extra/lib/typed_tcp.ml

       >>> function
         | Error exn ->
           Monitor.send_exn (Monitor.current ()) exn;
-          Clock.after (Time.Span.of_sec 0.5) >>> loop
+          Clock.after (sec 0.5) >>> loop
         | Ok (sock, `Inet (addr, port)) ->
             (* Go ahead and accept more connections. *)
             loop ();

base/async/extra/lib/versioned_typed_tcp.ml

       raise (Bigsubstring_allocator_got_invalid_requested_size requested_size);
     if requested_size > Bigstring.length !buf then
       buf := (Bigstring.create
-                 (Int.max requested_size (2 * Bigstring.length !buf)));
+                (Int.max requested_size (2 * Bigstring.length !buf)));
     Bigsubstring.create !buf ~pos:0 ~len:requested_size
   in
   alloc
   val of_int : int -> t
   val to_int : t -> int
   val add : t -> int -> t
+
+  include Hashable with type t := t
 end = struct
   include Int
   let add = (+)
     client to the server.  *)
 module type Datumable = sig
   type datum
-
   include Versions
 
   (** [lookup_marshal_fun v] This function takes a version [v], and returns a
   include Arg
 
   type logfun =
-       [ `Recv of Recv.t | `Send of Send.t ]
+  [ `Recv of Recv.t | `Send of Send.t ]
     -> Remote_name.t
     -> time_sent_received:Time.t
     -> unit
 
     (** create a new server, and start listening *)
     val create :
-         ?logfun:logfun
+      ?logfun:logfun
       -> ?now:(unit -> Time.t) (** defualt: Scheduler.cycle_start *)
       -> ?enforce_unique_remote_name:bool (** remote names must be unique, default true *)
       -> ?is_client_ip_authorized:(string -> bool)
     val send_to_all : t
       -> Send.t
       -> [ `Sent (** sent successfuly to all clients *)
-         | `Dropped (** not sent successfully to any client *)
-         | `Partial_success (** sent to some clients *)] Deferred.t
+           | `Dropped (** not sent successfully to any client *)
+           | `Partial_success (** sent to some clients *)] Deferred.t
 
     (** [send_to_all_ignore_errors t msg] Just like [send_to_all] but with no error
         reporting. *)
 
     (** create a new (initially disconnected) client *)
     val create :
-         ?logfun:logfun
+      ?logfun:logfun
       -> ?now:(unit -> Time.t) (** defualt: Scheduler.cycle_start *)
       -> ?check_remote_name:bool (** remote name must match expected remote name. default true *)
       -> ip:string
   end
 end
 
-module Make (Z : Arg) : S
-  with module Send = Z.Send and
-       module Recv = Z.Recv and
-       module My_name = Z.My_name and
-       module Remote_name = Z.Remote_name =
-struct
+module Make (Z : Arg) :
+  S with
+    module Send = Z.Send and
+    module Recv = Z.Recv and
+    module My_name = Z.My_name and
+    module Remote_name = Z.Remote_name = struct
   include Z
 
   module Constants = struct
   open Constants
 
   type logfun =
-    [ `Recv of Recv.t | `Send of Send.t ]
+  [ `Recv of Recv.t | `Send of Send.t ]
     -> Remote_name.t
     -> time_sent_received:Time.t
     -> unit
 
   module Connection = struct
     type t = {
-      writer: Writer.t;
-      reader: Reader.t;
-      marshal_fun: Send.t marshal_fun;
-      unmarshal_fun: Recv.t unmarshal_fun;
+      writer : Writer.t;
+      reader : Reader.t;
+      marshal_fun : Send.t marshal_fun;
+      unmarshal_fun : Recv.t unmarshal_fun;
+      send_version : Version.t;
+      name : Remote_name.t;
       kill: unit -> unit;
     }
 
     choose
       [ choice (Clock.after span) (fun () -> `Timeout);
         choice (try_with f) (function
-          | Ok x -> `Ok x
-          | Error x -> `Error x)
+        | Ok x -> `Ok x
+        | Error x -> `Error x)
       ]
   ;;
 
                  })
   ;;
 
+  let send_raw ~writer ~hdr ~msg =
+    let module H = Message_header in
+    wrap_write_bin_prot
+      ~sexp:H.sexp_of_t ~tc:H.bin_t.Bin_prot.Type_class.writer ~writer
+      ~name:"send" hdr;
+    Writer.write_bigsubstring writer msg
+  ;;
+
   let send_no_flush =
     let module C = Connection in
     let module H = Message_header in
-    let send ~now ~con msg =
-      let hdr = {H.time_stamp = now; body_length = Bigsubstring.length msg} in
-      wrap_write_bin_prot
-        ~sexp:H.sexp_of_t ~tc:H.bin_t.Bin_prot.Type_class.writer ~writer:con.C.writer
-        ~name:"send" hdr;
-      Writer.write_bigsubstring con.C.writer msg
-    in
     let maybe_log ~logfun ~name ~now d =
       match logfun with
       | None -> ()
       | Some f -> f (`Send d) name ~time_sent_received:now
     in
     fun ~logfun ~name ~now con d ->
-      let now = now () in
       match con.C.marshal_fun d with
       | None -> `Not_sent
       | Some msg ->
-          send ~now ~con msg;
-          maybe_log ~logfun ~name ~now d;
-          `Sent
+        let now = now () in
+        let hdr = {H.time_stamp = now; body_length = Bigsubstring.length msg} in
+        send_raw ~writer:con.C.writer ~hdr ~msg;
+        maybe_log ~logfun ~name ~now d;
+        `Sent
+  ;;
 
   let send ~logfun ~name ~now con d =
     match send_no_flush ~logfun:None ~name ~now con d with
     | `Sent ->
-        Writer.flushed_time con.Connection.writer >>| fun tm ->
-          begin match logfun with
-          | None -> ()
-          | Some f -> f (`Send d) name ~time_sent_received:(now ())
-          end;
-          `Sent tm
-    | `Not_sent -> return (`Sent (now ()))
+      Writer.flushed_time con.Connection.writer >>| fun tm ->
+      begin match logfun with
+      | None -> ()
+      | Some f -> f (`Send d) name ~time_sent_received:(now ())
+      end;
+      `Sent tm
+    | `Not_sent -> return `Dropped
+  ;;
 
   let negotiate ~reader ~writer ~my_name ~auth_error =
     let (recv_version, send_version) =
     wrap_write_bin_prot ~sexp:Hello.sexp_of_t ~tc:Hello.bin_writer_t
       ~writer ~name:"negotiate" h;
     Reader.read_bin_prot reader Hello.bin_reader_t >>| (function
-      | `Eof -> `Eof
-      | `Ok h ->
-          match auth_error h with
-          | Some e -> `Auth_error e
-          | None ->
-              let recv_version = Version.min recv_version h.Hello.send_version in
-              let send_version = Version.min send_version h.Hello.recv_version in
-              match Recv.lookup_unmarshal_fun recv_version with
-              | Error _ -> `Version_error
-              | Ok unmarshal_fun ->
-                  match Send.lookup_marshal_fun send_version with
-                  | Error _ -> `Version_error
-                  | Ok marshal_fun -> `Ok (h, marshal_fun, unmarshal_fun))
+    | `Eof -> `Eof
+    | `Ok h ->
+      match auth_error h with
+      | Some e -> `Auth_error e
+      | None ->
+        let recv_version = Version.min recv_version h.Hello.send_version in
+        let send_version = Version.min send_version h.Hello.recv_version in
+        match Recv.lookup_unmarshal_fun recv_version with
+        | Error _ -> `Version_error
+        | Ok unmarshal_fun ->
+          match Send.lookup_marshal_fun send_version with
+          | Error _ -> `Version_error
+          | Ok marshal_fun -> `Ok (h, send_version, marshal_fun, unmarshal_fun))
   ;;
 
   exception Eof with sexp
       extend_disconnect remote_name e);
     let upon_ok a f =
       upon a (function
-        | Error e ->
-          con.C.kill ();
-          extend_disconnect remote_name e
-        | Ok `Eof ->
-          con.C.kill ();
-          extend_disconnect remote_name Eof
-        | Ok (`Ok msg) -> f msg)
+      | Error e ->
+        con.C.kill ();
+        extend_disconnect remote_name e
+      | Ok `Eof ->
+        con.C.kill ();
+        extend_disconnect remote_name Eof
+      | Ok (`Ok msg) -> f msg)
     in
     let alloc = bigsubstring_allocator () in
     let read_header () = Reader.read_bin_prot con.C.reader H.bin_reader_t in
     let get_substring hdr = alloc hdr.H.body_length in
     let read_ss ss =
       Reader.really_read_bigsubstring con.C.reader ss >>| function
-        | `Ok -> `Ok ()
-        | `Eof _ -> `Eof
+      | `Ok -> `Ok ()
+      | `Eof _ -> `Eof
     in
     let extend ~time_sent ~time_received data =
       begin match logfun with
   ;;
 
   module Server = struct
+
+    module Connections : sig
+      type t
+      val create : unit -> t
+      val mem : t -> Remote_name.t -> bool
+      val find : t -> Remote_name.t -> Connection.t option
+      val add : t -> name:Remote_name.t -> conn:Connection.t -> unit
+      val remove : t -> Remote_name.t -> unit
+
+      val fold :
+        t
+        -> init:'a
+        -> f:(name:Remote_name.t -> conn:Connection.t -> 'a-> 'a)
+        -> 'a
+
+      val send_to_all : t
+      -> logfun:logfun option
+      -> now:(unit -> Time.t)
+      -> Send.t
+      -> [ `Sent (** sent successfuly to all clients *)
+         | `Dropped (** not sent successfully to any client *)
+         | `Partial_success (** sent to some clients *)] Deferred.t
+
+      val send_to_all_ignore_errors : t
+        -> logfun:logfun option
+        -> now:(unit -> Time.t)
+        -> Send.t
+        -> unit
+
+    end = struct
+
+      module C = Connection
+
+      type t = {
+        by_name : (C.t Bag.t * C.t Bag.Elt.t) Remote_name.Table.t;
+        by_send_version : (C.t Bag.t * Send.t marshal_fun) Version.Table.t;
+      }
+
+      let create () =
+        { by_name = Remote_name.Table.create ();
+          by_send_version =
+            Version.Table.create
+              ~size:(Version.to_int Send.test_version) ();
+        }
+      ;;
+
+      let fold t ~init ~f =
+        Hashtbl.fold t.by_name ~init ~f:(fun ~key ~data:(_, bag_elt) acc ->
+          f ~name:key ~conn:(Bag.Elt.value bag_elt) acc)
+      ;;
+
+      let mem t name = Hashtbl.mem t.by_name name
+
+      let add t ~name ~conn =
+        let bag, _marshal_fun =
+          Hashtbl.find_or_add t.by_send_version conn.C.send_version
+            ~default:(fun () -> Bag.create (), conn.C.marshal_fun)
+        in
+        let bag_elt = Bag.add bag conn in
+        Hashtbl.replace t.by_name ~key:name ~data:(bag, bag_elt);
+      ;;
+
+      let remove t name =
+        match Hashtbl.find t.by_name name with
+        | None -> ()
+        | Some (bag, bag_elt) ->
+          Bag.remove bag bag_elt;
+          Hashtbl.remove t.by_name name;
+      ;;
+
+      let find t name =
+        match Hashtbl.find t.by_name name with
+        | None -> None
+        | Some (_, bag_elt) -> Some (Bag.Elt.value bag_elt)
+      ;;
+
+      let maybe_log ~logfun ~name ~now d =
+        match logfun with
+        | None -> ()
+        | Some f -> f (`Send d) name ~time_sent_received:now
+      ;;
+
+      let send_to_all' t d ~logfun ~now =
+        let module C = Connection in
+        let module H = Message_header in
+        let now = now () in
+        let res =
+          Hashtbl.fold t.by_send_version ~init:`Init
+            ~f:(fun ~key:_ ~data:(bag, marshal_fun) acc ->
+              if not (Bag.is_empty bag) then begin
+                let res =
+                  match marshal_fun d with
+                  | None -> `Dropped
+                  | Some msg ->
+                    let hdr =
+                      {H.time_stamp = now; body_length = Bigsubstring.length msg}
+                    in
+                    Bag.iter bag ~f:(fun conn ->
+                      send_raw ~writer:conn.C.writer ~hdr ~msg;
+                      maybe_log ~logfun ~now ~name:conn.C.name d);
+                    `Sent
+                in
+                match acc, res with
+                | `Partial_success, _
+                | `Sent           , `Dropped
+                | `Dropped        , `Sent
+                  -> `Partial_success
+                | `Sent           , `Sent
+                  -> `Sent
+                | `Dropped        , `Dropped
+                  -> `Dropped
+                | `Init           , _
+                  -> res
+              end
+              else
+                acc)
+        in
+        match res with
+        | `Init -> `Sent
+        | `Partial_success | `Dropped | `Sent as x -> x
+      ;;
+
+      let send_to_all t ~logfun ~now d =
+        let res = send_to_all' t d ~logfun ~now in
+        Deferred.all_unit (fold t ~init:[] ~f:(fun ~name:_ ~conn acc ->
+          Writer.flushed conn.C.writer :: acc))
+        >>| fun () -> res
+      ;;
+
+      let send_to_all_ignore_errors t ~logfun ~now d =
+        ignore (send_to_all' t d ~logfun ~now : [`Partial_success | `Dropped | `Sent])
+      ;;
+
+    end
+
     type t = {
       tail : (Remote_name.t, Recv.t) Server_msg.t Tail.t;
       logfun : logfun option;
-      connections : Connection.t Remote_name.Table.t;
+      connections : Connections.t;
       mutable am_listening : bool;
       socket : ([ `Bound ], Socket.inet) Socket.t;
       warn_free_connections_pct: float;
 
     let flushed t ~cutoff =
       let flushes =
-        Hashtbl.fold t.connections ~init:[]
-          ~f:(fun ~key:client ~data:conn acc ->
+        Connections.fold t.connections ~init:[]
+          ~f:(fun ~name:client ~conn acc ->
             (choose [ choice (Writer.flushed conn.Connection.writer)
                         (fun _ -> `Flushed client);
                       choice cutoff (fun () -> `Not_flushed client);
                     ]) :: acc)
       in
       Deferred.all flushes >>| fun results ->
-        let (flushed, not_flushed) =
-          List.partition_map results
-            ~f:(function `Flushed c -> `Fst c | `Not_flushed c -> `Snd c)
-        in
-        `Flushed flushed, `Not_flushed not_flushed
+      let (flushed, not_flushed) =
+        List.partition_map results
+          ~f:(function `Flushed c -> `Fst c | `Not_flushed c -> `Snd c)
+      in
+      `Flushed flushed, `Not_flushed not_flushed
     ;;
 
     let send t name d =
-      match Hashtbl.find t.connections name with
+      match Connections.find t.connections name with
       | None -> return `Dropped
       | Some c -> send ~logfun:t.logfun ~name ~now:t.now c d
+    ;;
 
     let send_ignore_errors t name d =
-      match Hashtbl.find t.connections name with
+      match Connections.find t.connections name with
       | None -> ()
       | Some c -> ignore (send_no_flush ~logfun:t.logfun ~name ~now:t.now c d)
+    ;;
 
     let send_to_all t d =
-      let res =
-        Hashtbl.fold t.connections ~init:[]
-          ~f:(fun ~key ~data:_ acc -> (send t key d) :: acc)
-      in
-      Deferred.all res >>| (fun l ->
-        let f = function `Dropped -> true | `Sent _ -> false in
-        match List.partition_tf l ~f with
-        | ([], _) -> `Sent
-        | (_, []) -> `Dropped
-        | (_, _) -> `Partial_success)
+      Connections.send_to_all t.connections d ~now:t.now ~logfun:t.logfun
+    ;;
 
     let send_to_all_ignore_errors t d =
-      Hashtbl.iter t.connections
-        ~f:(fun ~key ~data:_ -> send_ignore_errors t key d)
+      Connections.send_to_all_ignore_errors t.connections d ~now:t.now ~logfun:t.logfun
+    ;;
 
     let client_is_authorized t ip =
       t.is_client_ip_authorized (Unix.Inet_addr.to_string ip)
     ;;
 
     let close t name =
-      Option.iter (Hashtbl.find t.connections name)
-        ~f:Connection.kill
+      Option.iter (Connections.find t.connections name) ~f:Connection.kill
+    ;;
 
     let handle_client t addr port fd =
       let module S = Server_msg in
                   None))
         in
         upon res (function
-          | `Error _
-          | `Timeout
-          | `Ok `Eof
-          | `Ok `Version_error -> Lazy.force close
-          | `Ok (`Auth_error e) -> die_error e
-          | `Ok (`Ok (h, marshal_fun, unmarshal_fun)) ->
-              let name =
-                if t.enforce_unique_remote_name then
-                  h.Hello.name
-                else
-                  sprintf "%s:%s:%d:%s"
-                    h.Hello.name
-                    (Unix.Inet_addr.to_string addr)
-                    port
-                    (Int63.to_string t.num_accepts)
+        | `Error _
+        | `Timeout
+        | `Ok `Eof
+        | `Ok `Version_error -> Lazy.force close
+        | `Ok (`Auth_error e) -> die_error e
+        | `Ok (`Ok (h, send_version, marshal_fun, unmarshal_fun)) ->
+          let name =
+            if t.enforce_unique_remote_name then
+              h.Hello.name
+            else
+              sprintf "%s:%s:%d:%s"
+                h.Hello.name
+                (Unix.Inet_addr.to_string addr)
+                port
+                (Int63.to_string t.num_accepts)
+          in
+          match Result.try_with (fun () -> Remote_name.of_string name) with
+          | Error exn ->
+            die_error
+              (C.Protocol_error
+                 (sprintf "error constructing name: %s, error: %s"
+                    name (Exn.to_string exn)))
+          | Ok remote_name ->
+            let module T = Remote_name.Table in
+            if Connections.mem t.connections remote_name then
+              die_error (C.Duplicate remote_name)
+            else begin
+              let close =
+                lazy
+                  (Lazy.force close;
+                   Connections.remove t.connections remote_name)
               in
-              match Result.try_with (fun () -> Remote_name.of_string name) with
-              | Error exn ->
-                  die_error
-                    (C.Protocol_error
-                        (sprintf "error constructing name: %s, error: %s"
-                            name (Exn.to_string exn)))
-              | Ok remote_name ->
-                  let module T = Remote_name.Table in
-                  if T.mem t.connections remote_name then
-                    die_error (C.Duplicate remote_name)
-                  else begin
-                    let close =
-                      lazy
-                        (Lazy.force close;
-                         Hashtbl.remove t.connections remote_name)
-                    in
-                    kill := (fun () -> Lazy.force close);
-                    let con =
-                      { Connection.
-                        writer = w;
-                        reader = r;
-                        unmarshal_fun;
-                        marshal_fun;
-                        kill = !kill }
-                    in
-                    Tail.extend t.tail (S.Control (C.Connect remote_name));
-                    Hashtbl.replace t.connections ~key:remote_name ~data:con;
-                    handle_incoming
-                      ~logfun:t.logfun ~remote_name
-                      ~ip:(Unix.Inet_addr.to_string addr) ~con
-                      ~extend_disconnect:(fun n e ->
-                        Tail.extend t.tail (S.Control (C.Disconnect (n, Exn.sexp_of_t e))))
-                      ~extend_parse_error:(fun n e ->
-                        Tail.extend t.tail (S.Control (C.Parse_error (n, e))))
-                      ~extend_data:(fun x -> Tail.extend t.tail (S.Data x))
-                  end)
+              kill := (fun () -> Lazy.force close);
+              let conn =
+                { Connection.
+                  writer = w;
+                  reader = r;
+                  unmarshal_fun;
+                  marshal_fun;
+                  send_version;
+                  name = remote_name;
+                  kill = !kill }
+              in
+              Tail.extend t.tail (S.Control (C.Connect remote_name));
+              Connections.add t.connections ~name:remote_name ~conn;
+              handle_incoming
+                ~logfun:t.logfun ~remote_name
+                ~ip:(Unix.Inet_addr.to_string addr) ~con:conn
+                ~extend_disconnect:(fun n e ->
+                  Tail.extend t.tail (S.Control (C.Disconnect (n, Exn.sexp_of_t e))))
+                ~extend_parse_error:(fun n e ->
+                  Tail.extend t.tail (S.Control (C.Parse_error (n, e))))
+                ~extend_data:(fun x -> Tail.extend t.tail (S.Data x))
+            end)
       end
 
     let listen t =
           | None ->
             Monitor.try_with (fun () -> Socket.accept socket)
             >>> function
-              | Error exn ->
-                Monitor.send_exn (Monitor.current ()) exn;
-                upon (Clock.after wait_after_exn) loop
-              | Ok (sock, `Inet (addr, port)) ->
-                assert (Socket.getopt sock Socket.Opt.nodelay);
-                assert (t.free_connections > 0);
-                t.num_accepts <- Int63.succ t.num_accepts;
-                t.free_connections <- t.free_connections - 1;
-                if t.free_connections = 0 then begin
-                  t.when_free <- Some (Ivar.create ());
-                  control (C.Too_many_clients "zero free connections")
-                end else if t.free_connections <= warn_thres then begin
-                  control (C.Almost_full t.free_connections)
-                end;
-                let fd = Socket.fd sock in
-                upon (Fd.close_finished fd) incr_free_connections;
-                Core.Std.protect ~f:(fun () -> handle_client t addr port fd)
-                  ~finally:loop
+            | Error exn ->
+              Monitor.send_exn (Monitor.current ()) exn;
+              upon (Clock.after wait_after_exn) loop
+            | Ok (sock, `Inet (addr, port)) ->
+              assert (Socket.getopt sock Socket.Opt.nodelay);
+              assert (t.free_connections > 0);
+              t.num_accepts <- Int63.succ t.num_accepts;
+              t.free_connections <- t.free_connections - 1;
+              if t.free_connections = 0 then begin
+                t.when_free <- Some (Ivar.create ());
+                control (C.Too_many_clients "zero free connections")
+              end else if t.free_connections <= warn_thres then begin
+                control (C.Almost_full t.free_connections)
+              end;
+              let fd = Socket.fd sock in
+              upon (Fd.close_finished fd) incr_free_connections;
+              protect ~f:(fun () -> handle_client t addr port fd) ~finally:loop
         in
         loop ()
       end;
     let listen_ignore_errors ?(stop = Deferred.never ()) t =
       let s = Stream.take_until (listen t) stop in
       Stream.filter_map s ~f:(function
-        | Server_msg.Control _ -> None
-        | Server_msg.Data x -> Some x.Read_result.data)
+      | Server_msg.Control _ -> None
+      | Server_msg.Data x -> Some x.Read_result.data)
     ;;
 
     let create
                 socket;
                 am_listening = false;
                 logfun;
-                connections = Remote_name.Table.create ();
+                connections = Connections.create ();
                 is_client_ip_authorized;
                 my_name;
                 enforce_unique_remote_name;
       queue : (Send.t * [`Sent of Time.t | `Dropped] Ivar.t) Queue.t;
       mutable con :
         [ `Disconnected
-        | `Connected of Connection.t
-        | `Connecting of unit -> unit ];
+          | `Connected of Connection.t
+          | `Connecting of unit -> unit ];
       mutable trying_to_connect : bool;
       mutable connect_complete : unit Ivar.t;
       mutable ok_to_connect : unit Ivar.t;
     let raise_after_timeout span f =
       try_with_timeout span f
       >>| function
-        | `Ok a -> a
-        | `Error e -> raise e
-        | `Timeout -> failwith "timeout"
+      | `Ok a -> a
+      | `Error e -> raise e
+      | `Timeout -> failwith "timeout"
     ;;
 
     exception Hello_name_is_not_expected_remote_name of string * string with sexp
           raise_after_timeout connect_timeout (fun () ->
             Socket.connect s address))
         >>= function
-          | Error e ->
-            close e;
-            return (Error e)
-          | Ok s ->
-            close_this := `Active_socket s;
-            Monitor.try_with (fun () ->
-              assert (Socket.getopt s Socket.Opt.nodelay);
-              let fd = Socket.fd s in
-              let reader = Reader.create fd in
-              let writer = Writer.create ~syscall:`Per_cycle fd in
-              close_this := `Writer writer;
-              Stream.iter (Monitor.errors (Writer.monitor writer))
-                ~f:(fun e -> close (Write_error e));
-              let my_name = My_name.to_string t.my_name in
-              raise_after_timeout negotiate_timeout (fun () ->
-                negotiate ~reader ~writer
-                  ~my_name ~auth_error:(fun _ -> None))
-              >>| (function
-                | `Eof -> failwith "eof"
-                | `Auth_error _ -> assert false
-                | `Version_error ->
-                  failwith "cannot negotiate a common version"
-                | `Ok (h, marshal_fun, unmarshal_fun) ->
-                  let expected_remote_name =
-                    Remote_name.to_string t.expected_remote_name
-                  in
-                  if t.check_remote_name
-                    && h.Hello.name <> expected_remote_name
-                  then
-                    raise (Hello_name_is_not_expected_remote_name
-                             (h.Hello.name, expected_remote_name))
-                  else begin
-                    let con =
-                      { Connection.
-                        writer;
-                        reader;
-                        marshal_fun;