Commits

Yaron Minsky committed a86abec

changed shutdown

  • Participants
  • Parent commits 1fd795b

Comments (0)

Files changed (4)

 
 (* First, we build the implementations *)
 
-let publish_impl dir msg =
+let publish_impl (dir,_) msg =
   return (Directory.publish dir msg)
 
-let subscribe_impl dir topic ~aborted =
+let subscribe_impl (dir,_) topic ~aborted =
   return (
     match Directory.subscribe dir topic with
     | None -> Error "Unknown topic"
       Ok pipe
   )
 
-let dump_impl dir () =
+let dump_impl (dir,_) () =
   return (Directory.dump dir)
 
-let shutdown_impl _dir () =
-  (after (sec 0.1) >>> fun () -> shutdown 0);
+let shutdown_impl (_,stop) () =
+  Ivar.fill_if_empty stop ();
   return ()
 
 let implementations =
   ~summary:"Start the message broker server"
   Command.Spec.(empty +> Common.port_arg ())
   (fun port () ->
+    let stop = Ivar.create () in
     let directory = Directory.create () in
-    Common.start_server ~port ~implementations ~env:directory)
+    Common.start_server ()
+      ~stop:(Ivar.read stop)
+      ~port
+      ~implementations
+      ~env:(directory,stop)
+  )
+
 
 let () =
   Exn.handle_uncaught ~exit:true (fun () -> Command.run command)
       | Ok conn -> f conn
     )
 
-let start_server ~env ~implementations ~port =
+let start_server ~env ?(stop=Deferred.never ()) ~implementations ~port () =
   let implementations =
     Rpc.Implementations.create ~on_unknown_rpc:`Ignore ~implementations
   in
           ~implementations
       )
     >>= fun server ->
-    Tcp.Server.close_finished server
+    Deferred.any
+      [ (stop >>= fun () -> Tcp.Server.close server)
+      ; Tcp.Server.close_finished server ]
 
 
 val start_server
   :  env:'a
+  -> ?stop : unit Deferred.t
   -> implementations:'a Rpc.Implementation.t list
   -> port:int
+  -> unit
   -> unit Deferred.t
   Clock.after (sec 0.1)
   >>= fun () -> return (hello ^ " World!")
 
-(* The list of RPC implementations supported by this server  *)
+(* The list of RPC implementations supported by this server *)
 let implementations =
   [ Rpc.Rpc.implement Hello_protocol.hello_rpc hello_impl ]
 
+(* The command-line interface.  We use [async_basic] so that the command starts
+   the async scheduler, and exits when the server stops.  *)
 let command =
   Command.async_basic
     ~summary:"Hello World server"
       +> flag "-port" (optional_with_default 8012 int)
         ~doc:" server port"
     )
-    (fun port () -> Common.start_server ~env:() ~port ~implementations)
+    (fun port () -> Common.start_server ~env:() ~port ~implementations ())
 
 let () = Command.run command