core hello world / broker_server.ml

Yaron Minsky 67d10bf 

Yaron Minsky cb7475d 
Yaron Minsky 67d10bf 


Yaron Minsky a86abec 
Yaron Minsky da7f04f 

Yaron Minsky 1fd795b 
Yaron Minsky 67d10bf 
Yaron Minsky a86abec 
Yaron Minsky 67d10bf 







Yaron Minsky a86abec 
Yaron Minsky 67d10bf 

Yaron Minsky a86abec 
Yaron Minsky 895887c 
Yaron Minsky a86abec 
Yaron Minsky 67d10bf 

Yaron Minsky 26d36ea 


Yaron Minsky 67d10bf 






Yaron Minsky 26d36ea 

Yaron Minsky 67d10bf 

Yaron Minsky 4d862d0 


Yaron Minsky 895887c 
Yaron Minsky 4d862d0 
Yaron Minsky 895887c 
Yaron Minsky 4d862d0 



Yaron Minsky 895887c 





Yaron Minsky 4d862d0 
Yaron Minsky 895887c 

Yaron Minsky a86abec 
Yaron Minsky 1fd795b 
Yaron Minsky a86abec 




Yaron Minsky 895887c 

Yaron Minsky a86abec 

Yaron Minsky c88e3c2 
open Core.Std
open Async.Std
open Broker_protocol

(* First, we build the implementations *)

let publish_impl (dir,_) msg =
  Log.Global.sexp ~level:`Debug msg
    (Message.sexp_of_t);
  return (Directory.publish dir msg)

let subscribe_impl (dir,_) topic ~aborted =
  return (
    match Directory.subscribe dir topic with
    | None -> Error "Unknown topic"
    | Some pipe ->
      don't_wait_for (aborted >>| fun () -> Pipe.close_read pipe);
      Ok pipe
  )

let dump_impl (dir,_) () =
  return (Directory.dump dir)

let shutdown_impl (_,stop) () =
  Log.Global.info "Shutdown request";
  Ivar.fill_if_empty stop ();
  return ()

(* We then create a list of all the implementations we're going to support in
   the server. *)

let implementations =
  [ Rpc.Rpc.     implement publish_rpc   publish_impl
  ; Rpc.Pipe_rpc.implement subscribe_rpc subscribe_impl
  ; Rpc.Rpc.     implement dump_rpc      dump_impl
  ; Rpc.Rpc.     implement shutdown_rpc  shutdown_impl
  ]

(* Finally we create a command for starting the broker server *)

let command = Command.async_basic
  ~summary:"Start the message broker server"
  Command.Spec.(
    empty
    +> Common.port_arg ()
    +> flag "-fg" no_arg ~doc:" Run in the foreground (daemonize is the default)"
  )
  (fun port fg () ->
    (* We use a blocking call to get the working directory, because the Async
       scheduler isn't running yet.
    *)
    let basedir = Core.Std.Unix.getcwd () in
    let logfile = basedir ^/ "broker.log" in
    if not fg then
      Daemon.daemonize ()
        ~redirect_stdout:(`File_append logfile)
        ~redirect_stderr:(`File_append logfile)
        ~cd:basedir;
    Log.Global.set_output
      [ Log.Output.file `Text ~filename:logfile ];
    Log.Global.info "Starting up";
    let stop = Ivar.create () in
    let directory = Directory.create () in
    Common.start_server ()
      ~stop:(Ivar.read stop)
      ~port
      ~implementations
      ~env:(directory,stop)
    >>| fun () ->
    Log.Global.info "Shutting down"
  )

let () = Command.run command
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.