Commits

Yaron Minsky committed 4d862d0

daemonization and logging

Comments (0)

Files changed (3)

     (fun host port sexp () -> dump ~host ~port ~sexp)
 
 let () =
-  Exn.handle_uncaught ~exit:true (fun () ->
-    Command.run
-      (Command.group ~summary:"Utilities for interacting with message broker"
-         [ "publish"  , pub_cmd
-         ; "subscribe", sub_cmd
-         ; "dump"     , dump_cmd
-         ; "shutdown" , shutdown_cmd
-         ]))
+  Log.Global.set_output [ Log.Output.file `Text ~filename:("client.log") ];
+  Command.run
+    (Command.group ~summary:"Utilities for interacting with message broker"
+       [ "publish"  , pub_cmd
+       ; "subscribe", sub_cmd
+       ; "dump"     , dump_cmd
+       ; "shutdown" , shutdown_cmd
+       ])
 (* First, we build the implementations *)
 
 let publish_impl (dir,_) msg =
+  Log.Global.info "Message published";
   return (Directory.publish dir msg)
 
 let subscribe_impl (dir,_) topic ~aborted =
+  Log.Global.info "Subscription started";
   return (
     match Directory.subscribe dir topic with
     | None -> Error "Unknown topic"
   )
 
 let dump_impl (dir,_) () =
+  Log.Global.info "Dump requested";
   return (Directory.dump dir)
 
 let shutdown_impl (_,stop) () =
+  Log.Global.info "Shutdown requested";
   Ivar.fill_if_empty stop ();
   return ()
 
 
 let command = Command.async_basic
   ~summary:"Start the message broker server"
-  Command.Spec.(empty +> Common.port_arg ())
+  Command.Spec.(
+    empty
+    +> Common.port_arg ()
+  )
   (fun port () ->
+    (* We use a blocking call to get the working directory, because the Async
+       scheduler isn't running yet.
+    *)
+    let basedir = Core.Std.Unix.getcwd () in
+(*
+    let finish_daemonize =
+      unstage
+        (Daemon.daemonize_wait ()
+           ~redirect_stdout:`Do_not_redirect
+           ~redirect_stderr:`Do_not_redirect
+           ~cd:basedir)
+    in
+*)
+    Log.Global.set_output
+      [ Log.Output.file `Text ~filename:(basedir ^/ "broker.log") ];
+    return () >>= fun () ->
+(*    finish_daemonize ();*)
+    Log.Global.info "Starting broker";
     let stop = Ivar.create () in
     let directory = Directory.create () in
+    Log.Global.info "Starting server";
     Common.start_server ()
       ~stop:(Ivar.read stop)
       ~port
   )
 
 let () = Command.run command
-
   )
 
 let with_rpc_conn f ~host ~port =
+  Log.Global.info "starting RPC connection to %s:%d" host port;
   Tcp.with_connection
     (Tcp.to_host_and_port host port)
     ~timeout:(sec 1.)
     (fun r w ->
+      Log.Global.info "Connection established";
       Rpc.Connection.create r w ~connection_state:()
       >>= function
-      | Error exn -> raise exn
-      | Ok conn -> f conn
+      | Error exn ->
+        Log.Global.info "Received error";
+        raise exn
+      | Ok conn ->
+        Log.Global.info "Calling handling function";
+        f conn
     )
 
 let start_server ~env ?(stop=Deferred.never ()) ~implementations ~port () =
+  Log.Global.info "starting server on %d" port;
   let implementations =
     Rpc.Implementations.create ~on_unknown_rpc:`Ignore ~implementations
   in
   match implementations with
   | Error (`Duplicate_implementations _) -> assert false
   | Ok implementations ->
+    Log.Global.info "About to start TCP server";
     Tcp.Server.create
-      ~on_handler_error:`Ignore
+      ~on_handler_error:(`Call (fun _ exn -> Log.Global.sexp exn Exn.sexp_of_t))
       (Tcp.on_port port)
       (fun _addr r w ->
         Rpc.Connection.server_with_close r w
           ~connection_state:env
-          ~on_handshake_error:`Ignore
+          ~on_handshake_error:(
+            `Call (fun exn -> Log.Global.sexp exn Exn.sexp_of_t; return ()))
           ~implementations
       )
     >>= fun server ->
+    Log.Global.info "TCP server started, waiting for close";
     Deferred.any
       [ (stop >>= fun () -> Tcp.Server.close server)
       ; Tcp.Server.close_finished server ]
-