Yaron Minsky avatar Yaron Minsky committed 71a5f7a

Added clear_topic command

Comments (0)

Files changed (6)

     )
     (fun host port sexp () -> dump ~host ~port ~sexp)
 
+let clear topic =
+  Common.with_rpc_conn (fun conn ->
+    Rpc.Rpc.dispatch_exn clear_rpc conn topic)
+
+let clear_cmd =
+  Command.async_basic
+    ~summary:"Clear out a given topic"
+    Command.Spec.(
+      host_and_port ()
+      +> anon ("<topic>" %: Arg_type.create Topic.of_string)
+    )
+    (fun host port topic () -> clear topic ~host ~port)
+
 let () =
   Command.run
     (Command.group ~summary:"Utilities for interacting with message broker"
        ; "subscribe", sub_cmd
        ; "dump"     , dump_cmd
        ; "shutdown" , shutdown_cmd
+       ; "clear"    , clear_cmd
        ])

broker_protocol.ml

   ~version:0
   ~bin_query:Unit.bin_t
   ~bin_response:Unit.bin_t
+
+let clear_rpc = Rpc.Rpc.create
+  ~name:"clear"
+  ~version:0
+  ~bin_query:Topic.bin_t
+  ~bin_response:Unit.bin_t
   Ivar.fill_if_empty stop ();
   return ()
 
+let clear_impl (dir,_) topic =
+  Log.Global.info "Clearing topic %s" (Topic.to_string topic);
+  Directory.clear_topic dir topic;
+  return ()
+
 (* We then create a list of all the implementations we're going to support in
    the server. *)
 
   ; Rpc.Pipe_rpc.implement subscribe_rpc subscribe_impl
   ; Rpc.Rpc.     implement dump_rpc      dump_impl
   ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
+  ; Rpc.Rpc.     implement clear_rpc     clear_impl
   ]
 
 (* Finally we create a command for starting the broker server *)
   val subscribe : t -> Message.t Pipe.Reader.t
   val num_subscribers : t -> int
   val last_message : t -> Message.t
+  val clear : t -> unit
 end = struct
   type t = { mutable last_message: Message.t;
              mutable subscribers: Message.t Pipe.Writer.t list;
       List.filter t.subscribers ~f:(fun pipe ->
         not (Pipe.is_closed pipe))
 
+  let clear t =
+    List.iter t.subscribers ~f:Pipe.close;
+    clear_closed t
+
   let publish t msg =
     clear_closed t;
     t.last_message <- msg;
 
 let create () = Topic.Table.create ()
 
+let clear_topic t topic =
+  match Hashtbl.find t topic with
+  | None -> ()
+  | Some s -> Topic_pub.clear s
+
 let publish t message =
   let s =
     Hashtbl.find_or_add t message.Message.topic
 
 (** Creates a dump of the current state of directory *)
 val dump : t -> Dump.t
+
+val clear_topic : t -> Topic.t -> unit

workshop/part_II.org

 ** Exercise
  - Add a new RPC for requesting the time of day (using Time.now).
 * Code walkthrough: Message Broker
+** Exercise
+ Add a disconnect operation
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.