Commits

Yaron Minsky committed 26d36ea

added some comments, support for ascii_table dump

Comments (0)

Files changed (3)

 open Core.Std
 open Async.Std
 open Broker_protocol
+module Ascii_table = Core_extended.Std.Ascii_table
 
 let shell cmd args =
   In_thread.run (fun () ->
   (fun host port topic () -> subscribe ~host ~port ~topic)
 
 
-let dump =
+let sexp_print_dump dump =
+  printf "%s\n"
+    (Dump.sexp_of_t dump |! Sexp.to_string_hum)
+
+let col = Ascii_table.Column.create
+let columns =
+  [ col "topic" (fun d -> Topic.to_string d.Dump.topic)
+  ; col "text"  (fun d -> d.Dump.message.Message.text) ~max_width:25
+  ; col "#sub"  (fun d-> Int.to_string d.Dump.num_subscribers)
+  ; col "time"  (fun d -> Time.to_sec_string d.Dump.message.Message.time)
+  ]
+
+let table_print_dump dump =
+  printf "%s\n%!"
+    (Ascii_table.to_string
+       ~display:Ascii_table.Display.line
+       ~limit_width_to:72
+       columns dump)
+
+let dump ~sexp =
   Common.with_rpc_conn (fun conn ->
     Rpc.Rpc.dispatch_exn dump_rpc conn ()
     >>= fun dump ->
-    printf "%s\n"
-      (Dump.sexp_of_t dump |! Sexp.to_string_hum);
+    (if sexp then sexp_print_dump dump else table_print_dump dump);
     return ()
   )
 
-let dump_cmd = Command.async_basic
-  ~summary:"Get a full dump of the broker's state"
-  (host_and_port ())
-  (fun host port () -> dump ~host ~port)
+let dump_cmd =
+  Command.async_basic
+    ~summary:"Get a full dump of the broker's state"
+    Command.Spec.(
+      host_and_port ()
+      +> flag "-sexp" no_arg ~doc:" Show as raw s-expression"
+    )
+    (fun host port sexp () -> dump ~host ~port ~sexp)
 
 let () =
   Exn.handle_uncaught ~exit:true (fun () ->
   Ivar.fill_if_empty stop ();
   return ()
 
+(* We then create a list of all the implementations we're going to support in
+   the server. *)
+
 let implementations =
   [ Rpc.Rpc.     implement publish_rpc   publish_impl
   ; Rpc.Pipe_rpc.implement subscribe_rpc subscribe_impl
   ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
   ]
 
+(* Finally we create a command for starting the broker server *)
+
 let command = Command.async_basic
   ~summary:"Start the message broker server"
   Command.Spec.(empty +> Common.port_arg ())
 (** Creates an empty directory *)
 val create : unit -> t
 
-val publish   : t -> Message.t -> unit
+(** Publish a new message *)
+val publish : t -> Message.t -> unit
 
 (** If the topic is unknown, then None is returned *)
 val subscribe : t -> Topic.t -> Message.t Pipe.Reader.t option