Yaron Minsky avatar Yaron Minsky committed 67d10bf

cleaned up broker command line handling

Comments (0)

Files changed (7)

broker.ml

-open Core.Std
-open Async.Std
-open Protocol
-
-(* CR yminsky: consider adding an RPC that blocks on the server side. *)
-let publish_impl dir msg =
-  Directory.publish dir msg;
-  return ()
-
-let subscribe_impl dir topic ~aborted =
-  return (
-    match Directory.subscribe dir topic with
-    | None -> Error "Unknown topic"
-    | Some pipe ->
-      don't_wait_for (aborted >>| fun () -> Pipe.close_read pipe);
-      Ok pipe
-  )
-;;
-
-let dump_impl dir () =
-  return (Directory.dump dir)
-
-let shutdown_impl _dir () =
-  (after (sec 0.1) >>> fun () -> shutdown 0);
-  return ()
-
-let implementations =
-  [ Rpc.Rpc.     implement publish_rpc   publish_impl
-  ; Rpc.Pipe_rpc.implement subscribe_rpc subscribe_impl
-  ; Rpc.Rpc.     implement dump_rpc      dump_impl
-  ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
-  ]
-
-let start_server () =
-  let implementations =
-    match
-      Rpc.Implementations.create
-        ~implementations
-        ~on_unknown_rpc:`Ignore
-    with
-    | Ok x -> x
-    | Error (`Duplicate_implementations _) -> assert false
-  in
-  let directory = Directory.create () in
-  Tcp.Server.create  ~on_handler_error:`Ignore
-    (Tcp.on_port 8080)
-    (fun _addr r w ->
-      Rpc.Connection.server_with_close r w
-        ~connection_state:directory
-        ~on_handshake_error:`Ignore
-        ~implementations
-    )
-  >>= fun server ->
-  Tcp.Server.close_finished server
-
-let () =
-  don't_wait_for (start_server ());
-  never_returns (Scheduler.go ())
-
+open Core.Std
+open Async.Std
+open Protocol
+
+let shell cmd args =
+  In_thread.run (fun () ->
+    try Ok (Core_extended.Shell.run_full cmd args)
+    with exn -> Error exn)
+
+let shutdown =
+  Common.with_rpc_conn (fun conn ->
+    Rpc.Rpc.dispatch shutdown_rpc conn ()
+    >>= function
+    | Ok  () -> return ()
+    | Error err ->
+      eprintf "failed!\n%s\n" (Error.to_string_hum err);
+      return ()
+  )
+
+let host_and_port () =
+  Command.Spec.(
+    empty
+    +> Common.host_arg ()
+    +> Common.port_arg ()
+  )
+
+let shutdown_cmd =
+  Command.async_basic (host_and_port ())
+    ~summary:"Shut the broker down"
+    (fun host port () -> shutdown ~host ~port)
+
+
+let publish ~topic ~text =
+  Common.with_rpc_conn (fun conn ->
+    shell "whoami" []
+    >>= fun username ->
+    let username = Result.ok_exn username in
+    let from = Username.of_string (String.strip username) in
+    Rpc.Rpc.dispatch_exn publish_rpc conn
+      { Message.
+        text; topic; from; time = Time.now () }
+  )
+
+let pub_cmd = Command.async_basic
+  ~summary:"publish a single value"
+  Command.Spec.(
+    (host_and_port ())
+    +> anon ("<topic>" %: Arg_type.create Topic.of_string)
+                          +> anon ("<text>" %: string)
+  )
+  (fun host port topic text () -> publish ~host ~port ~topic ~text)
+
+let subscribe ~topic =
+  Common.with_rpc_conn (fun conn ->
+    shell "clear" []
+    >>= fun clear_string ->
+    let clear_string =
+      (* if we're not on a terminal, just use the empty string *)
+      match clear_string with
+      | Ok s -> s | Error _ -> ""
+    in
+    Rpc.Pipe_rpc.dispatch subscribe_rpc conn topic
+    >>= function
+    | Error err -> Error.raise err
+    | Ok (Error s) -> eprintf "subscribe failed: %s\n" s; return ()
+    | Ok (Ok (pipe,_id)) ->
+      Pipe.iter pipe ~f:(fun msg ->
+        printf "%s%s\n%!" clear_string msg.Message.text;
+        return ()
+      ))
+
+let sub_cmd = Command.async_basic
+  ~summary:"subscribe to a topic"
+  Command.Spec.(
+    host_and_port ()
+    +> anon ("<topic>" %: Arg_type.create Topic.of_string)
+  )
+  (fun host port topic () -> subscribe ~host ~port ~topic)
+
+
+let dump =
+  Common.with_rpc_conn (fun conn ->
+    Rpc.Rpc.dispatch_exn dump_rpc conn ()
+    >>= fun dump ->
+    printf "%s\n"
+      (Dump.sexp_of_t dump |! Sexp.to_string_hum);
+    return ()
+  )
+
+let dump_cmd = Command.async_basic
+  ~summary:"Get a full dump of the broker's state"
+  (host_and_port ())
+  (fun host port () -> dump ~host ~port)
+
+let () =
+  Exn.handle_uncaught ~exit:true (fun () ->
+    Command.run
+      (Command.group ~summary:"Utilities for interacting with message broker"
+         [ "publish"  , pub_cmd
+         ; "subscribe", sub_cmd
+         ; "dump"     , dump_cmd
+         ; "shutdown" , shutdown_cmd
+         ]))
+
+
+open Core.Std
+open Async.Std
+open Protocol
+
+(* First, we build the implementations *)
+
+let publish_impl dir msg =
+  Directory.publish dir msg;
+  return ()
+
+let subscribe_impl dir topic ~aborted =
+  return (
+    match Directory.subscribe dir topic with
+    | None -> Error "Unknown topic"
+    | Some pipe ->
+      don't_wait_for (aborted >>| fun () -> Pipe.close_read pipe);
+      Ok pipe
+  )
+;;
+
+let dump_impl dir () =
+  return (Directory.dump dir)
+
+let shutdown_impl _dir () =
+  (after (sec 0.1) >>> fun () -> shutdown 0);
+  shutdown 0;
+  return ()
+
+let implementations =
+  [ Rpc.Rpc.     implement publish_rpc   publish_impl
+  ; Rpc.Pipe_rpc.implement subscribe_rpc subscribe_impl
+  ; Rpc.Rpc.     implement dump_rpc      dump_impl
+  ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
+  ]
+
+let start_server port =
+  let implementations =
+    match
+      Rpc.Implementations.create
+        ~implementations
+        ~on_unknown_rpc:`Ignore
+    with
+    | Ok x -> x
+    | Error (`Duplicate_implementations _) -> assert false
+  in
+  let directory = Directory.create () in
+  Tcp.Server.create  ~on_handler_error:`Ignore
+    (Tcp.on_port port)
+    (fun _addr r w ->
+      Rpc.Connection.server_with_close r w
+        ~connection_state:directory
+        ~on_handshake_error:`Ignore
+        ~implementations
+    )
+  >>= fun server ->
+  (* deferred that becomes determined when the close of the socket is
+     finished *)
+  Tcp.Server.close_finished server
+
+let command = Command.async_basic
+  ~summary:"Start the message broker server"
+  Command.Spec.(empty +> Common.port_arg ())
+  (fun port () -> start_server port)
+
+let () =
+  Exn.handle_uncaught ~exit:true (fun () -> Command.run command)
+
     hello_world.byte \
     hello_client.byte \
     hello_server.byte \
-    broker.byte \
-    client.byte \
+    broker_server.byte \
+    broker_client.byte \
 

client.ml

-open Core.Std
-open Async.Std
-open Protocol
-
-(* Defaults are set here, and can be overridden by command-line arguments. *)
-let hostname = ref "127.0.0.1"
-let port = ref 8080
-
-let host_and_port () =
-  Command.Spec.(
-    empty
-    +> flag "-hostname" (optional_with_default "127.0.0.1" string)
-      ~doc:" Broker's hostname"
-    +> flag "-port" (optional_with_default 8080 int)
-      ~doc:" Broker's port"
-  )
-
-(** [start_async f] runs the function f, shutting down when it's done, and also calls
-    [Scheduler.go] to get the async event-loop started.  *)
-let run_under_async f =
-  don't_wait_for (f () >>| fun () -> shutdown 0);
-  never_returns (Scheduler.go ())
-
-let with_rpc_conn f ~host ~port =
-  Tcp.with_connection
-    (Tcp.to_host_and_port host port)
-    ~timeout:(sec 1.)
-    (fun r w ->
-      Rpc.Connection.create r w ~connection_state:()
-      >>= function
-      | Error exn -> raise exn
-      | Ok conn -> f conn
-    )
-
-let shell cmd args =
-  In_thread.run (fun () ->
-    try Ok (Core_extended.Shell.run_full cmd args)
-    with exn -> Error exn)
-
-let shutdown =
-  with_rpc_conn (fun conn ->
-    Rpc.Rpc.dispatch shutdown_rpc conn ()
-    >>= function
-    | Ok  () -> return ()
-    | Error err ->
-      eprintf "failed!\n%s\n" (Error.to_string_hum err);
-      return ()
-  )
-
-let shutdown_cmd =
-  Command.basic (host_and_port ())
-    ~summary:"Shut the broker down"
-    (fun host port ->
-      run_under_async (fun () -> shutdown ~host ~port))
-
-
-let publish ~topic ~text =
-  with_rpc_conn (fun conn ->
-    shell "whoami" []
-    >>= fun username ->
-    let username = Result.ok_exn username in
-    let from = Username.of_string (String.strip username) in
-    Rpc.Rpc.dispatch_exn publish_rpc conn
-      { Message.
-        text; topic; from; time = Time.now () }
-  )
-
-let pub_cmd = Command.basic
-  ~summary:"publish a single value"
-  Command.Spec.(
-    (host_and_port ())
-    +> anon ("<topic>" %: Arg_type.create Topic.of_string)
-                          +> anon ("<text>" %: string)
-  )
-  (fun host port topic text ->
-    run_under_async (fun () -> publish ~host ~port ~topic ~text))
-
-let subscribe ~topic =
-  with_rpc_conn (fun conn ->
-    shell "clear" []
-    >>= fun clear_string ->
-    let clear_string =
-      (* if we're not on a terminal, just use the empty string *)
-      match clear_string with
-      | Ok s -> s | Error _ -> ""
-    in
-    Rpc.Pipe_rpc.dispatch subscribe_rpc conn topic
-    >>= function
-    | Error err -> Error.raise err
-    | Ok (Error s) -> eprintf "subscribe failed: %s\n" s; return ()
-    | Ok (Ok (pipe,_id)) ->
-      Pipe.iter pipe ~f:(fun msg ->
-        printf "%s%s\n%!" clear_string msg.Message.text;
-        return ()
-      ))
-
-let sub_cmd = Command.basic
-  ~summary:"subscribe to a topic"
-  Command.Spec.(
-    host_and_port ()
-    +> anon ("<topic>" %: Arg_type.create Topic.of_string)
-  )
-  (fun host port topic -> run_under_async (fun () -> subscribe ~host ~port ~topic))
-
-
-let dump =
-  with_rpc_conn (fun conn ->
-    Rpc.Rpc.dispatch_exn dump_rpc conn ()
-    >>= fun dump ->
-    printf "%s\n"
-      (Dump.sexp_of_t dump |! Sexp.to_string_hum);
-    return ()
-  )
-
-let dump_cmd = Command.basic
-  ~summary:"Get a full dump of the broker's state"
-  (host_and_port ())
-  (fun host port -> run_under_async (fun () -> dump ~host ~port))
-
-let () =
-  Exn.handle_uncaught ~exit:true (fun () ->
-    Command.run
-      (Command.group ~summary:"Utilities for interacting with message broker"
-         [ "publish"  , pub_cmd
-         ; "subscribe", sub_cmd
-         ; "dump"     , dump_cmd
-         ; "shutdown" , shutdown_cmd
-         ]))
-
-
       | Error exn -> raise exn
       | Ok conn -> f conn
     )
+
+let port_arg () =
+  Command.Spec.(
+    flag "-port" (optional_with_default 8080 int)
+      ~doc:" Broker's port"
+  )
+
+let host_arg () =
+  Command.Spec.(
+    flag "-hostname" (optional_with_default "127.0.0.1" string)
+      ~doc:" Broker's hostname"
+  )
   :  (Rpc.Connection.t -> 'a Deferred.t)
   -> host:string -> port:int
   -> 'a Deferred.t
+
+val port_arg : unit -> int    Command.Spec.param
+val host_arg : unit -> string Command.Spec.param
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.