Yaron Minsky avatar Yaron Minsky committed 1fd795b

further simplifications

Comments (0)

Files changed (5)

 (* First, we build the implementations *)
 
 let publish_impl dir msg =
-  Directory.publish dir msg;
-  return ()
+  return (Directory.publish dir msg)
 
 let subscribe_impl dir topic ~aborted =
   return (
       don't_wait_for (aborted >>| fun () -> Pipe.close_read pipe);
       Ok pipe
   )
-;;
 
 let dump_impl dir () =
   return (Directory.dump dir)
   ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
   ]
 
-let start_server port =
-  let implementations =
-    match
-      Rpc.Implementations.create
-        ~implementations
-        ~on_unknown_rpc:`Ignore
-    with
-    | Ok x -> x
-    | Error (`Duplicate_implementations _) -> assert false
-  in
-  let directory = Directory.create () in
-  Tcp.Server.create  ~on_handler_error:`Ignore
-    (Tcp.on_port port)
-    (fun _addr r w ->
-      Rpc.Connection.server_with_close r w
-        ~connection_state:directory
-        ~on_handshake_error:`Ignore
-        ~implementations
-    )
-  >>= fun server ->
-  (* deferred that becomes determined when the close of the socket is
-     finished *)
-  Tcp.Server.close_finished server
-
 let command = Command.async_basic
   ~summary:"Start the message broker server"
   Command.Spec.(empty +> Common.port_arg ())
-  (fun port () -> start_server port)
+  (fun port () ->
+    let directory = Directory.create () in
+    Common.start_server ~port ~implementations ~env:directory)
 
 let () =
   Exn.handle_uncaught ~exit:true (fun () -> Command.run command)
 open Core.Std
 open Async.Std
 
+let port_arg () =
+  Command.Spec.(
+    flag "-port" (optional_with_default 8080 int)
+      ~doc:" Broker's port"
+  )
+
+let host_arg () =
+  Command.Spec.(
+    flag "-hostname" (optional_with_default "127.0.0.1" string)
+      ~doc:" Broker's hostname"
+  )
+
 let with_rpc_conn f ~host ~port =
   Tcp.with_connection
     (Tcp.to_host_and_port host port)
       | Ok conn -> f conn
     )
 
-let port_arg () =
-  Command.Spec.(
-    flag "-port" (optional_with_default 8080 int)
-      ~doc:" Broker's port"
-  )
+let start_server ~env ~implementations ~port =
+  let implementations =
+    Rpc.Implementations.create ~on_unknown_rpc:`Ignore ~implementations
+  in
+  match implementations with
+  | Error (`Duplicate_implementations _) -> assert false
+  | Ok implementations ->
+    Tcp.Server.create
+      ~on_handler_error:`Ignore
+      (Tcp.on_port port)
+      (fun _addr r w ->
+        Rpc.Connection.server_with_close r w
+          ~connection_state:env
+          ~on_handshake_error:`Ignore
+          ~implementations
+      )
+    >>= fun server ->
+    Tcp.Server.close_finished server
 
-let host_arg () =
-  Command.Spec.(
-    flag "-hostname" (optional_with_default "127.0.0.1" string)
-      ~doc:" Broker's hostname"
-  )
 open Async.Std
 
+val port_arg : unit -> int    Command.Spec.param
+val host_arg : unit -> string Command.Spec.param
+
 val with_rpc_conn
   :  (Rpc.Connection.t -> 'a Deferred.t)
   -> host:string -> port:int
   -> 'a Deferred.t
 
-val port_arg : unit -> int    Command.Spec.param
-val host_arg : unit -> string Command.Spec.param
+val start_server
+  :  env:'a
+  -> implementations:'a Rpc.Implementation.t list
+  -> port:int
+  -> unit Deferred.t

hello_protocol.ml

 open Core.Std
 open Async.Std
 
-(** A simple RPC for saying hello *)
+(** The protocol for communicating between the hello client and server.
+    There's a single RPC call exposed, which lets you send and receive a
+    string.
+
+    The [bin_query] and [bin_response] arguments are values that contain logic
+    for binary serialization of the query and response types, in this case,
+    both strings.
+
+    The version number is used when you want to mint new versions of an RPC
+    without disturbing older versions.
+*)
+
 let hello_rpc = Rpc.Rpc.create
   ~name:"hello-world"
   ~version:0
 open Core.Std
 open Async.Std
 
+(* The implementation of the "hello" RPC.  The first argument is the environment
+   the query executes against, which in this case is trivial.
+
+   The RPC waits a 10th of a second before responding just to show how you do a
+   query whose implementation blocks.
+*)
 let hello_impl () hello =
   Clock.after (sec 0.1)
   >>= fun () -> return (hello ^ " World!")
 
+(* The list of RPC implementations supported by this server  *)
 let implementations =
   [ Rpc.Rpc.implement Hello_protocol.hello_rpc hello_impl ]
 
-let start_server ~port =
-  let implementations =
-    match Rpc.Implementations.create ~implementations ~on_unknown_rpc:`Ignore with
-    | Ok x -> x
-    | Error (`Duplicate_implementations _) -> assert false
-  in
-  Tcp.Server.create
-    ~on_handler_error:`Ignore
-    (Tcp.on_port port)
-    (fun _addr r w ->
-      Rpc.Connection.server_with_close r w
-        ~connection_state:()
-        ~on_handshake_error:`Ignore
-        ~implementations
-    )
-  >>= fun server ->
-  Tcp.Server.close_finished server
-
 let command =
   Command.async_basic
     ~summary:"Hello World server"
       +> flag "-port" (optional_with_default 8012 int)
         ~doc:" server port"
     )
-    (fun port () -> start_server ~port)
+    (fun port () -> Common.start_server ~env:() ~port ~implementations)
 
 let () = Command.run command
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.