Commits

ysulsky committed 5c5f881

auto export

Comments (0)

Files changed (715)

base/async/_oasis

+#AUTOGENERATED FILE; EDIT oasis.sh INSTEAD
+
+OASISFormat:  0.2
+OCamlVersion: >= 3.12
+Name:         async
+Version:      107.01
+Synopsis:     Jane Street Capital's asynchronous execution library
+Authors:      Jane street capital
+Copyrights:   (C) 2008-2011 Jane Street Capital LLC
+License:      LGPL-2.1 with OCaml linking exception
+LicenseFile:  LICENSE
+Plugins:      StdFiles (0.2),
+              DevFiles (0.2),
+              META (0.2)
+BuildTools:   ocamlbuild
+Description:  Jane Street Capital's asynchronous execution library
+FindlibVersion: >= 1.2.7
+XStdFilesAUTHORS: false
+XStdFilesINSTALLFilename: INSTALL
+XStdFilesREADME: false
+
+
+Library async
+  Path:               lib
+  FindlibName:        async
+  Pack:               true
+  Modules:Std
+  BuildDepends:       sexplib.syntax,
+                      sexplib,
+                      fieldslib.syntax,
+                      fieldslib,
+                      bin_prot,
+                      bin_prot.syntax,
+                      bigarray,
+                      res,
+                      unix,
+                      threads
+                      core
+
+
+Document "async"
+  Title:                Jane street's async library
+  Type:                 ocamlbuild (0.2)
+  BuildTools+:          ocamldoc
+  XOCamlbuildPath:      lib
+  XOCamlbuildLibraries: async
+

base/async/examples/bin_prot_test.ml

+open Jane.Std
+open Async.Std
+open Print
+
+module Fd = Unix.Fd
+
+(* The type of values we want to transmit for testing *)
+type test =
+  {
+    a : int;
+    b : float;
+    c : bool array;
+    d : (string * int64) list;
+    e : nativeint array;
+    f : [`Foo | `Bar] array;
+    g : float array;
+  }
+with bin_io
+
+(* This value is very likely larger than e.g. an order, fill, etc. *)
+let test =
+  {
+    a = 42;
+    b = 3.1;
+    c = Array.init 10 ~f:(fun i -> i mod 2 = 0);
+    d = [ ("asdf", 421311L); ("foo.bar", -123412L); ("asdfkasdfkjasdf", Int64.zero) ];
+    e = Array.init 64 ~f:(fun i -> Nativeint.shift_right Nativeint.one i);
+    f = Array.init 20 ~f:(fun i -> if i mod 2 = 0 then `Foo else `Bar);
+    g = Array.init 20 ~f:float;
+  }
+
+(* Number of messages to send for testing *)
+let n_msgs = 100_000
+
+(* Reader reads binary protocol messages from a pipe *)
+let start_reader fd =
+  let reader = Reader.create fd in
+  let rec loop n =
+    (* All messages received *)
+    if n = 0 then
+      (* Make sure we now get an EOF *)
+      upon (Reader.read_bin_prot reader bin_reader_test) (function
+        | `Eof ->
+            print_endline "Reader success";
+            shutdown 0
+        | _ -> assert false)
+    else begin
+      (* Read another binary protocol message *)
+      upon (Reader.read_bin_prot reader bin_reader_test) (function
+        | `Ok v when v = test -> loop (n - 1)
+        | _ -> assert false)
+    end
+  in
+  loop n_msgs;
+  never_returns (Scheduler.go ())
+
+(* Writer writes binary protocol messages to a pipe and waits for its
+   reader child process to successfully finish reading *)
+let start_writer pid fd =
+  let writer = Writer.create fd in
+  let rec loop n =
+    if n = 0 then begin
+      (* All messages sent; make sure to flush writer buffers (easy to
+         forget!) and close the underlying file descriptor *)
+      upon (Writer.close writer) (fun () ->
+        let bytes_written = Writer.bytes_written writer in
+        printf "Writer success: %s bytes written\n%!"
+          (Int63.to_string bytes_written);
+        (* Wait for reader to terminate successfully *)
+        upon (Unix.wait (`Pid pid)) (function
+          | wpid, Ok () ->
+              assert (wpid = pid);
+              printf "All successful: transmitted %d messages\n" n_msgs;
+              shutdown 0
+          | _ -> assert false))
+    end else begin
+      (* Write a value to the writer using the binary protocol *)
+      Writer.write_bin_prot writer bin_writer_test test;
+      (* It's usually a good idea performance-wise to flush if the buffer
+         is close to full before putting in more messages, because
+         otherwise the buffer has to be grown (assuming default size of
+         131_072) - possibly without bounds. *)
+      if Writer.bytes_to_write writer > 100_000 then
+        upon (Writer.flushed writer) (fun _ -> loop (n - 1))
+      else loop (n - 1)
+    end
+  in
+  loop n_msgs;
+  never_returns (Scheduler.go ())
+
+let () =
+  let module Unix = Core.Std.Unix in
+  (* Create pipe for communicating between reader and writer process *)
+  let ifd, ofd = Unix.pipe () in
+  (* Fork off reader process *)
+  match Unix.fork () with
+  | `In_the_child ->
+    (* We are the reader *)
+    Unix.close ofd;
+    start_reader (Fd.create Fd.Kind.Fifo ifd ~name:"<parent reader>")
+  | `In_the_parent pid -> 
+    (* We are the writer *)
+    Unix.close ifd;
+    start_writer pid (Fd.create Fd.Kind.Fifo ofd ~name:"<child writer>")

base/async/examples/bind_loop.ml

+open Core.Std
+open Async.Std
+
+let bind_loop () =
+  let rec loop n =
+    if n = 0 then return 123
+    else
+      Deferred.unit >>= fun () -> loop (n - 1)
+  in
+  loop 15_000_000
+;;
+
+let main () =
+  bind_loop ()
+  >>>
+    fun x ->
+      Print.printf "done %d\n" x;
+      Shutdown.shutdown 0
+;;
+
+let () =
+  main ();
+  Exn.handle_uncaught ~exit:true (never_returns (Scheduler.go ()))

base/async/examples/cat.ml

+open Jane.Std
+open Async.Std
+
+module Fd = Unix.Fd
+
+let cat ~input ~output =
+  let reader = Reader.create input in
+  let writer = Writer.create ~raise_epipe:false output in
+  let buf = String.create 4096 in
+  let rec loop () =
+    upon
+      (choose
+        [
+          choice (Reader.read reader buf) (fun r -> `Reader r);
+          choice (Writer.got_epipe writer) (fun () -> `Epipe);
+        ])
+    (function
+      | `Reader r ->
+        begin
+          match r with
+          | `Eof -> upon (Writer.flushed writer) (fun _ ->
+            never_returns (shutdown_dont_return 0))
+          | `Ok len ->
+              Writer.write_substring writer (Substring.create buf ~pos:0 ~len);
+              loop ()
+        end
+      | `Epipe -> never_returns (shutdown_dont_return 0))
+  in
+  loop ()
+;;
+
+let () = cat ~input:(Fd.stdin ()) ~output:(Fd.stdout ())
+
+let () = never_returns (Scheduler.go ())

base/async/examples/countdown.ml

+open Jane.Std
+open Async.Std
+
+let printf = Print.printf
+
+let rec loop i =
+  if i < 0 then
+    shutdown 0
+  else begin
+    printf "%d\n" i; 
+    upon (after (Time.Span.of_sec 1.)) (fun _ -> loop (i - 1))
+  end
+
+let () = loop 10
+  
+let () = never_returns (Scheduler.go ())

base/async/examples/echo_server_with_typed_tcp.ml

+open Core.Std
+open Async.Std
+
+module Protocol = struct
+  module Client_message = String
+
+  module Server_message = String
+
+  module Transport = struct
+    type t = Reader.t * Writer.t
+
+    let create r w = return (r, w)
+
+    let close (r, w) = Writer.close w >>= fun () -> Reader.close r
+
+    let read (r, _) = Reader.read_line r
+
+    let write (_, w) msg = Writer.write w msg; Writer.newline w
+
+    let flushed_time (_, w) = Writer.flushed_time w
+  end
+end
+
+module Server = Typed_tcp.Make(Protocol)
+
+let main () =
+  Server.create ~port:12321 ~auth:(fun _ _ -> return `Allow) ()
+  >>> fun svr ->
+  let echo (clt, msg) = Server.send_ignore_errors svr clt msg in
+  let strm = Server.listen_ignore_errors svr in
+  whenever (Pipe.iter_without_pushback ~f:echo strm)
+
+let () =
+  main ();
+  never_returns (Scheduler.go ())

base/async/examples/file_tail.ml

+open Core.Std
+open Async.Std
+
+module File_tail = Async.Std.File_tail
+
+let () =
+  let files = List.tl_exn (Array.to_list Sys.argv) in
+  Deferred.List.iter ~how:`Parallel files ~f:(fun file ->
+    Pipe.iter_without_pushback (File_tail.create file) ~f:(fun update ->
+      printf "%s %s\n%!" file (File_tail.Update.to_string_hum update)))
+  >>> fun () ->
+  shutdown 0
+;;
+
+let () = never_returns (Scheduler.go ())

base/async/examples/finalizer.ml

+open Jane.Std
+open Async.Std
+
+let eprintf = Printf.eprintf
+
+let resurrect = ref []
+
+let rec finished x =
+  eprintf "finished\n%!";
+  resurrect := x :: !resurrect;
+  Scheduler.finalize finished x;
+;;
+
+let () =
+  let s = String.create 10 in
+  Scheduler.finalize finished s;
+;;
+
+let compact () =
+  resurrect := [];
+  eprintf "compacting\n%!";
+  Gc.compact ();
+  eprintf "done compacting\n%!";
+;;
+
+let rec loop i =
+  if i = 0 then
+    shutdown 0
+  else
+    upon (Clock.after (Time.Span.of_sec 1.)) (fun () ->
+      compact ();
+      loop (i - 1))
+;;
+
+let () = loop 5
+
+let () = never_returns (Scheduler.go ())
+

base/async/examples/hello.ml

+open Core.Std
+open Async.Std
+
+let handler _ reader writer =
+  Deferred.create (fun i ->
+    let write () = Writer.write writer
+      "HTTP/1.1 200 OK\nContent-length: 12\nContent-type: text/plain\n\nHello World!" in
+    let rec read () = Reader.read_line reader >>>
+      function
+        (* Blows up horribly if it's a POST
+         * (or anything with Content-length /= 0 *)
+        | `Ok "" -> write (); read ()
+        | `Eof   -> write (); Ivar.fill i ()
+        | _      -> read ()
+    in read () )
+let () = ignore (Tcp.serve ~port:55555 ~on_handler_error:`Ignore handler)
+let () = never_returns (Scheduler.go ())

base/async/examples/jobs_speed_test.ml

+open Core.Std
+open Async.Std
+
+let num_jobs = 4096
+let num_iters = 4096
+
+let run_test () =
+  Deferred.create (fun i ->
+    let rec loop n =
+      if n = 0 then
+        Ivar.fill i ()
+      else begin
+        Deferred.create (fun i ->
+          let finished = ref num_jobs in
+          let rec loop n =
+            if n > 0 then begin
+              upon Deferred.unit (fun () ->
+                decr finished;
+                if !finished = 0 then Ivar.fill i ());
+              loop (n - 1)
+            end
+          in
+          loop num_jobs)
+        >>> fun () -> loop (n - 1)
+      end
+    in
+    loop num_iters)
+;;
+
+let () =
+  let start = Time.now () in
+  upon (run_test ()) (fun () ->
+    let stop = Time.now () in
+    Printf.printf "elapsed time: %s\n" (Time.Span.to_string (Time.diff stop start));
+    Shutdown.shutdown 0);
+  never_returns (Scheduler.go ());
+;;
+
+(* jobs_per_cycle
+   = 2000, time = 9.3s
+   = 1200, time = 10.24s
+   = 1100, time = 10.25s
+   = 1000, time = 10.30s
+   = 900, time  = 10.3s
+   = 800, time = 9.7s
+   = 700, time = 9.7s
+   = 600, time = 10.3s
+   = 500, time = 10.5s
+   = 400, time = 10.61
+   = 100, time = 11.1s
+   = 10, time = 16.5s
+*)

base/async/examples/key_value_store.ml

+open Core.Std
+open Async.Std
+
+module Protocol = struct
+  module Client_message = struct
+    type t = Req_help
+           | Req_add    of string * string
+           | Req_find   of string
+           | Req_remove of string
+           | Req_invalid
+    with sexp
+
+
+    let from_raw msg = match String.split msg ~on:' ' with
+      | ("help" :: _          ) -> Req_help
+      | ("add"  :: k :: v :: _) -> Req_add (k, v)
+      | ("find" :: k :: _     ) -> Req_find k
+      | ("remove" :: k :: _   ) -> Req_remove k
+      | _                       -> Req_invalid
+  end
+
+  module Server_message = struct
+    type t = Rep_fail of string
+           | Rep_succ of string
+           | Rep_help
+    with sexp
+
+
+    let to_raw t = match t with
+      | Rep_fail msg -> "[ERROR] " ^ msg
+      | Rep_succ msg -> msg
+      | Rep_help     -> "Valid commands: help, add, find, remove"
+  end
+
+  module Transport = struct
+    type t = Reader.t * Writer.t
+
+    let create r w = return (r, w)
+    let close   (r, _) = Reader.close r
+    let read    (r, _) =
+      Reader.read_line r
+      >>| function
+        | `Eof -> `Eof
+        | `Ok rawmsg -> `Ok (Client_message.from_raw rawmsg)
+
+    let write (_, w) msg =
+      Writer.write w (Server_message.to_raw msg);
+      Writer.newline w
+    let flushed_time (_, w) = Writer.flushed_time w
+  end
+end
+
+module Server = Typed_tcp.Make(Protocol)
+
+module Key_value_store = struct
+
+  type t = {
+    store  : (string, string) Hashtbl.t;
+  }
+
+  let create () = {
+    store  = Hashtbl.Poly.create ();
+  }
+
+  let process t req =
+    let module R = Protocol.Client_message in
+    let module P = Protocol.Server_message in
+    match req with
+    |(R.Req_help |
+      R.Req_invalid)   -> P.Rep_help
+    | R.Req_add (k, v) ->
+        Hashtbl.replace t.store ~key:k ~data:v;
+        P.Rep_succ ("New pair stored: "^k^" -> "^v)
+    | R.Req_find k     ->
+        begin match Hashtbl.find t.store k with
+        | None   -> P.Rep_fail ("No value found for: "^k)
+        | Some v -> P.Rep_succ v
+        end
+    | R.Req_remove k   ->
+        begin match Hashtbl.find t.store k with
+        | None   -> P.Rep_fail ("No value found for: "^k)
+        | Some _ ->
+          Hashtbl.remove t.store k;
+          P.Rep_succ ("Key removed: "^k)
+        end
+end
+
+let main () =
+  let kvstore = Key_value_store.create () in
+  Server.create ~port:12321 ~auth:(fun _ _ -> return `Allow) ()
+  >>> fun svr ->
+  let echo (clt, msg) = Server.send_ignore_errors
+                          svr
+                          clt
+                          (Key_value_store.process kvstore msg) in
+  whenever (Pipe.iter_without_pushback ~f:echo (Server.listen_ignore_errors svr))
+
+let () =
+  main ();
+  never_returns (Scheduler.go ())

base/async/examples/load_file.ml

+open Core.Std
+open Async.Std
+
+let lines file =
+  Reader.with_file file
+    ~f:(fun reader ->
+      Deferred.create (fun i ->
+        let rec loop ac =
+          upon (Reader.read_line reader) (function
+            | `Eof -> Ivar.fill i (Array.of_list (List.rev ac))
+            | `Ok line -> loop (line :: ac))
+        in
+        loop []))
+(*     (fun reader ->
+      Reader.contents reader >>| fun contents ->
+        Array.of_list (String.split ~on:'\n' contents)) *)
+(*
+      let lines_stream = Reader.lines reader in
+      Stream.to_list lines_stream >>| Array.of_list) *)
+;;
+
+let main () =
+  let file = Sys.argv.(1) in
+(*  Gc.set
+      { (Gc.get ()) with Gc.Control.
+        minor_heap_size = 8_388_608;
+        space_overhead = 150;
+        max_overhead = 1_000_000;
+        major_heap_increment = 1_048_576 }; *)
+  upon (lines file) (fun _lines -> Shutdown.shutdown 0);
+  never_returns (Scheduler.go ())
+
+let () = Exn.handle_uncaught ~exit:true main

base/async/examples/monitors.ml

+open Jane.Std
+open Async.Std
+
+let printf = Print.printf
+
+let () =
+  let m2 = Monitor.create ~name:"test monitor 2" () in
+  Stream.iter (Monitor.errors m2) ~f:(fun _ -> printf "caught error\n");
+  schedule ~monitor:m2 (fun () ->
+    let m1 = Monitor.create ~name:"test monitor 1" () in
+    Stream.iter (Clock.at_intervals (Time.Span.of_sec 1.0))
+      ~f:(fun _ ->
+        try failwith "error!"
+        with _ -> Monitor.send_exn m1 (Failure "error!") ~backtrace:`Get));
+  never_returns (Scheduler.go ())

base/async/examples/priority.ml

+open Jane.Std
+open Async.Std
+
+let printf = Print.printf
+
+module Priority = Async.Std.Priority (* to avoid omake confusion *)
+
+let normal = Priority.normal
+let low = Priority.low
+
+let one name priority =
+  Scheduler.schedule ~priority (fun () ->
+    upon Deferred.unit (fun () ->
+      let rec loop i =
+        if i > 0 then begin
+          printf "%s %d\n" name i;
+          upon Deferred.unit (fun () -> loop (i -1));
+        end
+      in
+      loop 10))
+
+let () = one "low" low
+let () = one "normal" normal
+let () = upon (Clock.after (Time.Span.of_sec 1.)) (fun () -> shutdown 0)
+
+let () = never_returns (Scheduler.go ())

base/async/examples/process.ml

+open Jane.Std
+open Async.Std
+
+let printf = Print.printf
+
+let () = 
+  let f () = 
+    upon (In_process.run (fun () -> 1 + 1)) (fun x ->
+      printf "the answer is %d\n" x)
+  in
+  Stream.iter (Clock.at_intervals (Time.Span.of_sec 1.0)) ~f:(fun _ -> f ());
+  never_returns (Scheduler.go ())

base/async/examples/process_stream.ml

+open Jane.Std
+open Async.Std
+
+let printf = Print.printf
+
+let () = 
+  let (_, out, _) = 
+    In_process.full 
+      (fun ~input:_ ~output ->
+         Stream.iter (Clock.at_intervals (Time.Span.of_sec 1.0)) ~f:(fun _ ->
+           Tail.extend output (Time.now ()));
+         Deferred.never ())
+  in
+  let _died = 
+    In_thread.run (fun () ->
+      let (pid, status) = Std_unix.wait () in
+      printf "worker died! %s\n"
+        (match status with
+        | Std_unix.WEXITED s -> (string_of_int pid) ^ ": " ^ (string_of_int s)
+        | Std_unix.WSIGNALED s -> (string_of_int pid) ^ ": " ^ (string_of_int s)
+        | _ -> ""))
+  in
+  Stream.iter out ~f:(fun x ->
+    printf "the time is: %s\n" (Time.to_string x));
+  never_returns (Scheduler.go ())

base/async/examples/rpc/rpc_client.ml

+open Core.Std
+open Core_extended.Std
+open Async.Std
+
+
+let with_rpc_connection f =
+  Tcp.with_connection ~host:"127.0.0.1" ~port:8080
+    (fun reader writer ->
+      Rpc.Connection.create reader writer ~connection_state:()
+      >>= function
+        | Error exn -> raise exn
+        | Ok conn ->
+          f conn
+          >>= fun () ->
+          shutdown 0;
+          return ()
+    )
+
+let set_id_counter new_id =
+  with_rpc_connection (fun conn ->
+    Rpc.Rpc.dispatch_exn Rpc_intf.set_id_counter conn new_id
+  )
+
+let set_id_counter_v0 new_id_pair =
+  with_rpc_connection (fun conn ->
+    Rpc.Rpc.dispatch_exn Rpc_intf.set_id_counter_v0 conn new_id_pair
+  )
+
+let get_unique_id () =
+  with_rpc_connection (fun conn ->
+    Rpc.Rpc.dispatch_exn Rpc_intf.get_unique_id conn ()
+    >>= fun id ->
+    printf "UNIQUE ID: %d\n" id;
+    return ()
+  )
+
+let counter_values () =
+  with_rpc_connection (fun conn ->
+    Rpc.Pipe_rpc.dispatch_exn Rpc_intf.counter_values conn ()
+    >>= fun (reader,_id) ->
+    Pipe.iter_without_pushback reader ~f:(fun i ->
+      printf "COUNTER: %d\n%!" i)
+  )
+
+
+(* Setting up the command-line interface *)
+
+let in_async f = whenever (f ()); never_returns (Scheduler.go ())
+
+let get_unique_id_cmd =
+  Fcommand.(
+    cmd ~summary:"get unique id from server"
+      (const ())
+      (fun () -> in_async get_unique_id)
+  )
+
+let set_id_counter_cmd =
+  Fcommand.(
+    cmd ~summary:"forcibly set the unique id counter.  DANGEROUS"
+      (anon ("counter" %: int))
+      (fun i -> in_async (fun () -> set_id_counter i))
+  )
+
+(* This one is actually unsupported by the server, so using it will trigger an error. *)
+let set_id_counter_cmd_v0 =
+  Fcommand.(
+    cmd ~summary:"forcibly set the unique id counter.  DANGEROUS"
+      (anon ("counter1" %: int) ++ anon ("counter2" %: int))
+      (fun id1 id2 -> in_async (fun () -> set_id_counter_v0 (id1,id2)))
+  )
+
+
+let counter_values_cmd =
+  Fcommand.(
+    cmd ~summary:"subscribe to changes to counter id"
+      (const ())
+      (fun () -> in_async counter_values)
+  )
+
+
+let () =
+  Version_util_extended.Command.run
+    (Command.group
+       ~summary:"Client for trivial Async-RPC server"
+       [ "get-unique-id"    , get_unique_id_cmd
+       ; "set-id-counter"   , set_id_counter_cmd
+       ; "set-id-counter-v0", set_id_counter_cmd_v0
+       ; "counter-values"   , counter_values_cmd
+       ]
+    )

base/async/examples/rpc/rpc_intf.ml

+open Core.Std
+open Async.Std
+
+let get_unique_id = Rpc.Rpc.create
+  ~name:"get-unique-id"
+  ~version:0
+  ~bin_query:Unit.bin_t
+  ~bin_response:Int.bin_t
+
+let set_id_counter = Rpc.Rpc.create
+  ~name:"set-id-counter"
+  (* Note that the version number is 1, because there is an older v0 query defined below
+     around. *)
+  ~version:1
+  ~bin_query:Int.bin_t
+  ~bin_response:Unit.bin_t
+
+
+(* This type is here only for the purpose of getting the ability to bin-prot an int
+   pair. *)
+module Int_pair = struct
+  type t = int * int with bin_io
+end
+
+let set_id_counter_v0 = Rpc.Rpc.create
+  ~name:"set-id-counter"
+  ~version:0
+  ~bin_query:Int_pair.bin_t
+  ~bin_response:Unit.bin_t
+
+
+let counter_values = Rpc.Pipe_rpc.create
+  ~name:"counter-values"
+  ~version:0
+  ~bin_query:Unit.bin_t
+  ~bin_response:Int.bin_t
+  ~bin_error:Unit.bin_t

base/async/examples/rpc/rpc_intf.mli

+open Core.Std
+open Async.Std
+
+(** Query for grabbing a unique ID *)
+val get_unique_id  : (unit,int) Rpc.Rpc.t
+
+(** Query for setting the counter used to generate unique IDs *)
+val set_id_counter : (int,unit) Rpc.Rpc.t
+
+(** This is a deprecated query, no longer supported by the server *)
+val set_id_counter_v0 : (int * int,unit) Rpc.Rpc.t
+
+(** For getting a stream updating the counter values *)
+val counter_values : (unit,int,unit) Rpc.Pipe_rpc.t

base/async/examples/rpc/rpc_server.ml

+open Core.Std
+open Core_extended.Std
+open Async.Std
+
+
+(* The list of implementations supported by the server.  The server state is simply a
+   counter used for allocating unique ids. *)
+let implementations =
+  [ Rpc.Rpc.implement Rpc_intf.get_unique_id
+      (fun ctr () ->
+        printf ".%!";
+        incr ctr;
+        return !ctr
+      )
+  ; Rpc.Rpc.implement Rpc_intf.set_id_counter
+    (fun ctr i  ->
+      printf "!%!";
+      if i = 0 then failwith "Can't set counter back to zero";
+      return (ctr := i)
+    )
+
+  ; Rpc.Pipe_rpc.implement Rpc_intf.counter_values
+    (fun ctr () ~aborted ->
+      let (r,w) = Pipe.create () in
+      let last_value = ref !ctr in
+      let send () =
+        last_value := !ctr;
+        Pipe.write w !ctr
+      in
+      whenever (send ());
+      Clock.every' ~stop:aborted (sec 0.1) (fun () ->
+        if !last_value <> !ctr
+        then send () else return ()
+      );
+      return (Ok r)
+    )
+  ]
+
+let main () =
+  let counter = ref 0 in
+  let server = Rpc.Server.create ~implementations ~on_unknown_rpc:`Ignore in
+  match server with
+  | Error (`Duplicate_implementations _descrs) -> assert false
+  | Ok server ->
+    whenever
+      (Tcp.serve ~port:8080 ~on_handler_error:`Ignore
+         (fun _addr reader writer ->
+           Rpc.Connection.server_with_close reader writer ~server
+             ~connection_state:counter
+             ~on_handshake_error:`Ignore));
+    never_returns (Scheduler.go ())
+
+let () =
+  Version_util_extended.Command.run
+    (Fcommand.cmd
+       ~summary:"A trivial Async-RPC server"
+       (Fcommand.const ())
+       main)

base/async/examples/server.ml

+open Jane.Std
+open Async.Std
+
+module Fd = Unix.Fd
+module Inet_addr = Unix.Inet_addr
+module Socket = Unix.Socket
+
+let stdout_writer = Lazy.force Writer.stdout
+let message s = Writer.write stdout_writer s
+
+let finished () = shutdown 0
+
+let port = 61111
+
+let result =
+    Tcp.serve ~port ~on_handler_error:`Raise
+      (fun _ reader writer ->
+        Deferred.create (fun finished ->
+          let rec loop () =
+            upon (Reader.read_line reader) (function
+              | `Ok query ->
+                  message (sprintf "Server got query: %s\n" query);
+                  Writer.write writer (sprintf "Response to %s\n" query);
+                  loop ()
+              | `Eof ->
+                  Ivar.fill finished ();
+                  message "Server got EOF\n")
+          in
+          loop ()))
+;;
+
+let () =
+  Core.Std.eprintf "TOP\n%!";
+;;
+
+let () =
+  let queries = ["Hello"; "Goodbye"] in
+  upon result (fun _ ->
+    Core.Std.eprintf "IN SERVER\n%!";
+    upon (Tcp.connect ~host:"localhost" ~port ()) (fun (reader, writer) ->
+      let rec loop queries =
+        match queries with
+        | [] -> upon (Writer.close writer) (fun _ -> finished ())
+        | query :: queries ->
+            Writer.write writer query;
+            Writer.write_char writer '\n';
+            upon (Reader.read_line reader) (function
+              | `Eof ->
+                  message "reader got unexpected Eof"
+              | `Ok response ->
+                  message (sprintf "Client got response: %s\n" response);
+                  loop queries)
+      in
+      loop queries))
+;;
+
+let () = never_returns (Scheduler.go ())

base/async/examples/signals.ml

+open Jane.Std
+open Async.Std
+
+let printf = Print.printf
+
+let () = printf "my pid is %s\n" (Pid.to_string (Unix.getpid ()))
+
+let () =
+  Signal.handle Signal.standard ~f:(fun signal ->
+    printf "%s\n" (Signal.to_string signal))
+;;
+
+let () = never_returns (Scheduler.go ())

base/async/examples/sigpipe.ml

+open Jane.Std
+open Async.Std
+
+let (in_fd, out_fd) = Unix.socketpair ()
+ 
+let () =
+  upon (Unix.close in_fd) (fun () ->
+    let w = Writer.create out_fd in
+    Writer.write w "hello\n")
+
+let () = never_returns (Scheduler.go ())

base/async/examples/socket.ml

+open Jane.Std
+open Async.Std
+module Socket = Unix.Socket
+
+let printf = Print.printf
+
+let port = 10000
+
+let doit () =
+  upon (Socket.bind (Socket.create Socket.Type.tcp)
+           (Socket.Address.inet_addr_any ~port)) (fun s ->
+    upon (Socket.accept (Socket.listen s)) (fun (s, _) ->
+      let buf = String.create 1000 in
+      let reader = Reader.create (Socket.fd s) in
+      let rec loop bytes_read =
+        upon (Reader.read reader buf) (function
+          | `Eof -> printf "EOF\n"
+          | `Ok n ->
+              let bytes_read = bytes_read + n in
+              printf "read %d bytes in total.\n" bytes_read;
+              loop bytes_read)
+      in
+      loop 0));
+  upon (Clock.after (Time.Span.of_sec 2.)) (fun () ->
+    let s = Socket.create Socket.Type.tcp in
+    upon (Socket.connect s (Socket.Address.inet
+                               (Unix.Inet_addr.of_string "127.0.0.1") ~port))
+      (fun s ->
+        let w = Writer.create (Socket.fd s) in
+        let buf = String.create 4096 in
+        let rec loop bytes_written =
+          Writer.write w buf;
+          upon (Writer.flushed w) (fun _ ->
+            let bytes_written = bytes_written + String.length buf in
+            printf "wrote %d bytes in total.\n" bytes_written;
+            loop bytes_written);
+        in
+        loop 0))
+;;
+
+let () = doit ()
+
+let () = never_returns (Scheduler.go ())

base/async/examples/sound.ml

+open Jane.Std
+open Async.Std
+
+module Sound = Async_extended.Std.Sound (* so omake isn't confused about a circular dependency *)
+
+let () = upon (Sound.play Sound.default) (fun () -> shutdown 0)
+
+let () = never_returns (Scheduler.go ())

base/async/examples/speed_test.ml

+open Core.Std
+open Async.Std
+
+let run_test ~fill_before_upon ~no_ivars ~spawn_factor =
+  let spawn =
+    let spawns = List.range 0 spawn_factor in
+    fun i ->  List.iter spawns ~f:(fun _ -> upon (Ivar.read i) Fn.ignore)
+  in
+  let finished = Ivar.create () in
+  let rec loop n =
+    if n > 0 then begin
+      let i = Ivar.create () in
+      if fill_before_upon then begin
+        Ivar.fill i ();
+        spawn i;
+      end else begin
+        spawn i;
+        Ivar.fill i ();
+      end;
+      loop (n - 1)
+    end else
+      Ivar.fill finished ()
+  in
+  loop no_ivars;
+  Ivar.read finished
+;;
+
+let () =
+  let no_ivars = int_of_string (Sys.argv.(1)) in
+  let spawn_factor = int_of_string (Sys.argv.(2)) in
+  if spawn_factor >= 20 then failwith "spawn_factor must be less than 20";
+  let fill_before_upon =
+    match (Sys.argv.(3)) with
+    | "fill" -> true
+    | "upon" -> false
+    | _ -> failwith "must specify either 'fill' or 'upon'"
+  in
+  let start = Time.now () in
+  upon (run_test ~fill_before_upon ~no_ivars ~spawn_factor) (fun () ->
+    let stop = Time.now () in
+    Printf.printf "elapsed time: %s\n" (Time.Span.to_string (Time.diff stop start));
+    Shutdown.shutdown 0);
+  never_returns (Scheduler.go ());
+;;

base/async/examples/write_forever.ml

+open Jane.Std
+open Async.Std
+
+let () =
+  upon (Unix.openfile "/tmp/z.foo" ~mode:[`Creat; `Wronly] ~perm:0o0666)
+    (fun fd ->
+      let writer = Writer.create fd in
+      let buf = String.create 1_048_576 in
+      let rec loop i =
+        eprintf "about to write %d\n" i;
+        Writer.write writer buf;
+        upon (Clock.after (Time.Span.of_sec 1.)) (fun () -> loop (i + 1));
+      in
+      loop 0
+    )
+
+let () = never_returns (Scheduler.go ());

base/async/lib/META

+# OASIS_START
+# DO NOT EDIT (digest: d84545bc9c7a98fd8fd88b734c8401ed)
+version = "107.01"
+description = "Jane Street Capital's asynchronous execution library"
+requires
+ =
+ "sexplib.syntax sexplib fieldslib.syntax fieldslib bin_prot bin_prot.syntax bigarray res unix threads\ncore"
+archive(byte) = "async.cma"
+archive(native) = "async.cmxa"
+exists_if = "async.cma"
+# OASIS_STOP
+

base/async/lib/async.mlpack

+# OASIS_START
+# DO NOT EDIT (digest: dd6297cf0a9348c7ddec66f11eff1ea7)
+Std
+# OASIS_STOP

base/async/lib/async.odocl

+# OASIS_START
+# DO NOT EDIT (digest: dd6297cf0a9348c7ddec66f11eff1ea7)
+Std
+# OASIS_STOP

base/async/lib/core/async_stream.ml

+open Core.Std
+open Import
+open Deferred_std
+
+include Basic.Stream
+
+let next d = d
+
+let first_exn t =
+  next t >>| function
+    | Nil -> failwith "Stream.first of empty stream"
+    | Cons (x, _) -> x
+;;
+
+let fold' t ~init ~f =
+  Deferred.create
+    (fun result ->
+      let rec loop t b =
+        upon (next t) (function
+          | Nil -> Ivar.fill result b
+          | Cons (v, t) -> upon (f b v) (loop t))
+      in
+      loop t init)
+;;
+
+(* [fold] is implemented to avoid per-stream-element deferred overhead in the case when
+   multiple stream elements are available simultaneously. *)
+let fold t ~init ~f =
+  Deferred.create
+    (fun result ->
+      let rec loop t b =