Commits

Yaron Minsky committed 5b67326

imported some more examples

Comments (0)

Files changed (11)

 true: syntax(camlp4o)
-true: package(core,sexplib.syntax,bin_prot.syntax,comparelib.syntax,fieldslib.syntax,variantslib.syntax)
+true: package(core,sexplib.syntax,bin_prot.syntax,comparelib.syntax,fieldslib.syntax,variantslib.syntax,async)
 true: thread,debug,annot
+open Core.Std
+open Async.Std
+open Protocol
+
+(* CR yminsky: consider adding an RPC that blocks on the server side. *)
+let publish_impl dir msg =
+  Directory.publish dir msg;
+  return ()
+
+let subscribe_impl dir topic ~aborted =
+  return (
+    match Directory.subscribe dir topic with
+    | None -> Error "Unknown topic"
+    | Some pipe ->
+      don't_wait_for (aborted >>| fun () -> Pipe.close_read pipe);
+      Ok pipe
+  )
+;;
+
+let dump_impl dir () =
+  return (Directory.dump dir)
+
+let shutdown_impl _dir () =
+  (after (sec 0.1) >>> fun () -> shutdown 0);
+  return ()
+
+let implementations =
+  [ Rpc.Rpc.     implement publish_rpc   publish_impl
+  ; Rpc.Pipe_rpc.implement subscribe_rpc subscribe_impl
+  ; Rpc.Rpc.     implement dump_rpc      dump_impl
+  ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
+  ]
+
+let start_server () =
+  let server =
+    match
+      Rpc.Server.create
+        ~implementations
+        ~on_unknown_rpc:`Ignore
+    with
+    | Ok x -> x
+    | Error (`Duplicate_implementations _) -> assert false
+  in
+  let directory = Directory.create () in
+  Tcp.Server.create  ~on_handler_error:`Ignore
+    (Tcp.on_port 8080)
+    (fun _addr r w ->
+      Rpc.Connection.server_with_close r w
+        ~connection_state:directory
+        ~on_handshake_error:`Ignore
+        ~server
+    )
+  >>= fun server ->
+  Tcp.Server.close_finished server
+
+let () =
+  don't_wait_for (start_server ());
+  never_returns (Scheduler.go ())
+
+open Core.Std
+open Async.Std
+open Protocol
+
+(* Defaults are set here, and can be overridden by command-line arguments. *)
+let hostname = ref "127.0.0.1"
+let port = ref 8080
+
+let host_and_port () =
+  Command.Spec.(
+    empty
+    +> flag "-hostname" (optional_with_default "127.0.0.1" string)
+      ~doc:" Broker's hostname"
+    +> flag "-port" (optional_with_default 8080 int)
+      ~doc:" Broker's port"
+  )
+
+(** [start_async f] runs the function f, shutting down when it's done, and also calls
+    [Scheduler.go] to get the async event-loop started.  *)
+let run_under_async f =
+  don't_wait_for (f () >>| fun () -> shutdown 0);
+  never_returns (Scheduler.go ())
+
+let with_rpc_conn f ~host ~port =
+  Tcp.with_connection
+    (Tcp.to_host_and_port host port)
+    ~timeout:(sec 1.)
+    (fun r w ->
+      Rpc.Connection.create r w ~connection_state:()
+      >>= function
+      | Error exn -> raise exn
+      | Ok conn -> f conn
+    )
+
+let shell cmd args =
+  In_thread.run (fun () ->
+    try Ok (Core_extended.Shell.run_full cmd args)
+    with exn -> Error exn)
+
+let shutdown =
+  with_rpc_conn (fun conn ->
+    Rpc.Rpc.dispatch shutdown_rpc conn ()
+    >>= function
+    | Ok  () -> return ()
+    | Error exn -> eprintf "failed!\n"; return ()
+  )
+
+let shutdown_cmd =
+  Command.basic (host_and_port ())
+    ~summary:"Shut the broker down"
+    (fun host port ->
+      run_under_async (fun () -> shutdown ~host ~port))
+
+
+let publish ~topic ~text =
+  with_rpc_conn (fun conn ->
+    shell "whoami" []
+    >>= fun username ->
+    let username = Result.ok_exn username in
+    let from = Username.of_string (String.strip username) in
+    Rpc.Rpc.dispatch_exn publish_rpc conn
+      { Message.
+        text; topic; from; time = Time.now () }
+  )
+
+let pub_cmd = Command.basic
+  ~summary:"publish a single value"
+  Command.Spec.(
+    (host_and_port ())
+    +> anon ("<topic>" %: Arg_type.create Topic.of_string)
+                          +> anon ("<text>" %: string)
+  )
+  (fun host port topic text ->
+    run_under_async (fun () -> publish ~host ~port ~topic ~text))
+
+let subscribe ~topic =
+  with_rpc_conn (fun conn ->
+    shell "clear" []
+    >>= fun clear_string ->
+    let clear_string =
+      (* if we're not on a terminal, just use the empty string *)
+      match clear_string with
+      | Ok s -> s | Error _ -> ""
+    in
+    Rpc.Pipe_rpc.dispatch subscribe_rpc conn topic
+    >>= function
+    | Error err -> Error.raise err
+    | Ok (Error s) -> eprintf "subscribe failed: %s\n" s; return ()
+    | Ok (Ok (pipe,_id)) ->
+      Pipe.iter pipe ~f:(fun msg ->
+        printf "%s%s\n%!" clear_string msg.Message.text;
+        return ()
+      ))
+
+let sub_cmd = Command.basic
+  ~summary:"subscribe to a topic"
+  Command.Spec.(
+    host_and_port ()
+    +> anon ("<topic>" %: Arg_type.create Topic.of_string)
+  )
+  (fun host port topic -> run_under_async (fun () -> subscribe ~host ~port ~topic))
+
+
+let dump =
+  with_rpc_conn (fun conn ->
+    Rpc.Rpc.dispatch_exn dump_rpc conn ()
+    >>= fun dump ->
+    printf "%s\n"
+      (Dump.sexp_of_t dump |! Sexp.to_string_hum);
+    return ()
+  )
+
+let dump_cmd = Command.basic
+  ~summary:"Get a full dump of the broker's state"
+  (host_and_port ())
+  (fun host port -> run_under_async (fun () -> dump ~host ~port))
+
+let () =
+  Exn.handle_uncaught ~exit:true (fun () ->
+    Command.run
+      (Command.group ~summary:"Utilities for interacting with message broker"
+         [ "publish"  , pub_cmd
+         ; "subscribe", sub_cmd
+         ; "dump"     , dump_cmd
+         ; "shutdown" , shutdown_cmd
+         ]))
+
+
+open Core.Std
+open Async.Std
+
+let with_rpc_conn f ~host ~port =
+  Tcp.with_connection
+    (Tcp.to_host_and_port host port)
+    ~timeout:(sec 1.)
+    (fun r w ->
+      Rpc.Connection.create r w ~connection_state:()
+      >>= function
+      | Error exn -> raise exn
+      | Ok conn -> f conn
+    )
+open Async.Std
+
+val with_rpc_conn
+  :  (Rpc.Connection.t -> 'a Deferred.t)
+  -> host:string -> port:int
+  -> 'a Deferred.t
+open Core.Std
+open Async.Std
+open Protocol
+
+(* A publisher for a single topic *)
+module Topic_pub : sig
+  type t
+  val create : Message.t -> t
+  val publish : t -> Message.t -> unit
+  val subscribe : t -> Message.t Pipe.Reader.t
+  val num_subscribers : t -> int
+  val last_message : t -> Message.t
+end = struct
+  type t = { mutable last_message: Message.t;
+             mutable subscribers: Message.t Pipe.Writer.t list;
+           }
+  with fields
+
+  let create last_message =
+    { last_message; subscribers = [] }
+
+  let clear_closed t =
+    t.subscribers <-
+      List.filter t.subscribers ~f:(fun pipe ->
+        not (Pipe.is_closed pipe))
+
+  let publish t msg =
+    clear_closed t;
+    t.last_message <- msg;
+    List.iter t.subscribers ~f:(fun pipe ->
+      don't_wait_for (Pipe.write pipe msg))
+
+  let subscribe t =
+    let (r,w) = Pipe.create () in
+    don't_wait_for (Pipe.write w t.last_message);
+    t.subscribers <- w :: t.subscribers;
+    r
+
+  let num_subscribers t = List.length t.subscribers
+end
+
+type t = (Topic.t, Topic_pub.t) Hashtbl.t
+
+let create () = Topic.Table.create ()
+
+let publish t message =
+  let s =
+    Hashtbl.find_or_add t message.Message.topic
+      ~default:(fun () -> Topic_pub.create message)
+  in
+  Topic_pub.publish s message
+
+let subscribe t topic =
+  match Hashtbl.find t topic with
+  | None -> None
+  | Some s -> Some (Topic_pub.subscribe s)
+
+let subscribe t topic =
+  Option.map (Hashtbl.find t topic) ~f:Topic_pub.subscribe
+
+let subscribe t topic =
+  let open Option.Monad_infix in
+  Hashtbl.find t topic
+  >>| fun s ->
+  Topic_pub.subscribe s
+
+let dump t =
+  Hashtbl.to_alist t
+  |! List.map ~f:(fun (topic,tpub) ->
+    let num_subscribers = Topic_pub.num_subscribers tpub in
+    let message = Topic_pub.last_message tpub in
+    {Dump. topic; num_subscribers; message })
+open Core.Std
+(** A directory for storing last-values for a basic pub/sub system *)
+
+open Async.Std
+open Protocol
+
+type t
+
+(** Creates an empty directory *)
+val create : unit -> t
+
+val publish   : t -> Message.t -> unit
+
+(** If the topic is unknown, then None is returned *)
+val subscribe : t -> Topic.t -> Message.t Pipe.Reader.t option
+
+(** Creates a dump of the current state of directory *)
+val dump : t -> Dump.t
+open Core.Std
+open Async.Std
+
+let say_hello port host =
+  Common.with_rpc_conn ~port ~host (fun conn ->
+    Rpc.Rpc.dispatch Hello_protocol.hello_rpc conn "Hello"
+    >>= function
+    | Ok response -> printf "%s\n%!" response; return ()
+    | Error err ->
+      eprintf "An error occurred:\n%s\n%!"
+        (Error.to_string_hum err);
+      return ()
+  )
+
+let command =
+  Command.async_basic
+    ~summary:"Hello World client"
+    Command.Spec.(
+      empty
+      +> flag "-port" (optional_with_default 8012 int)
+        ~doc:" server port"
+      +> flag "-host" (optional_with_default "localhost" string)
+        ~doc:" server host"
+    )
+    (fun port host () ->
+      say_hello port host
+    )
+
+let () = Command.run command

hello_protocol.ml

+open Core.Std
+open Async.Std
+
+(** A simple RPC for saying hello *)
+let hello_rpc = Rpc.Rpc.create
+  ~name:"hello-world"
+  ~version:0
+  ~bin_query:String.bin_t
+  ~bin_response:String.bin_t
+
+open Core.Std
+open Async.Std
+
+let hello_impl () hello =
+  Clock.after (sec 0.1)
+  >>= fun () -> return (hello ^ " World!")
+
+let implementations =
+  [ Rpc.Rpc.implement Hello_protocol.hello_rpc hello_impl ]
+
+let start_server ~port =
+  let implementations =
+    match Rpc.Implementations.create ~implementations ~on_unknown_rpc:`Ignore with
+    | Ok x -> x
+    | Error (`Duplicate_implementations _) -> assert false
+  in
+  Tcp.Server.create
+    ~on_handler_error:`Ignore
+    (Tcp.on_port port)
+    (fun _addr r w ->
+      Rpc.Connection.server_with_close r w
+        ~connection_state:()
+        ~on_handshake_error:`Ignore
+        ~implementations
+    )
+  >>= fun server ->
+  Tcp.Server.close_finished server
+
+let command =
+  Command.async_basic
+    ~summary:"Hello World server"
+    Command.Spec.(
+      empty
+      +> flag "-port" (optional_with_default 8012 int)
+        ~doc:" server port"
+    )
+    (fun port () -> start_server ~port)
+
+let () = Command.run command
+open Core.Std
+open Async.Std
+
+module Username : Identifiable = String
+module Topic    : Identifiable = String
+
+module Message = struct
+  type t = { text: string;
+             topic: Topic.t;
+             from: Username.t;
+             time: Time.t;
+           }
+  with sexp, bin_io
+end
+
+let publish_rpc = Rpc.Rpc.create
+  ~name:"publish"
+  ~version:0
+  ~bin_query:Message.bin_t
+  ~bin_response:Unit.bin_t
+
+let subscribe_rpc = Rpc.Pipe_rpc.create
+  ~name:"subscribe"
+  ~version:0
+  ~bin_query:Topic.bin_t
+  ~bin_response:Message.bin_t
+  ~bin_error:String.bin_t
+
+module Dump = struct
+  type single = { topic : Topic.t;
+                  message : Message.t;
+                  num_subscribers: int; }
+  with sexp,bin_io
+  type t = single list with sexp,bin_io
+end
+
+let dump_rpc = Rpc.Rpc.create
+  ~name:"dump"
+  ~version:0
+  ~bin_query:Unit.bin_t
+  ~bin_response:Dump.bin_t
+
+
+let shutdown_rpc = Rpc.Rpc.create
+  ~name:"shutdown"
+  ~version:0
+  ~bin_query:Unit.bin_t
+  ~bin_response:Unit.bin_t
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.