Commits

Yaron Minsky  committed 46ea945

Added robust connection

  • Participants
  • Parent commits 7114cb2

Comments (0)

Files changed (5)

   Command.async_basic
     ~summary:"Monitor your time spent"
     (shared_flags ())
-    (fun user () -> setup_conn user (fun user conn ->
-       force clear_string
-       >>= fun cstring ->
-       let stop = force on_term_signal in
-       Clock.every' (sec 1.) ~stop (fun () ->
-         Rpc.Rpc.dispatch_exn Protocol.status conn user
-         >>| fun { Protocol.Status. state; elapsed } ->
-         let total_elapsed =
-           Map.data elapsed
-           |> List.fold ~init:Time.Span.zero ~f:Time.Span.(+)
-         in
-         printf "%s%s %s %s\n"
-           cstring
-           (spinner ())
-           (span_string total_elapsed)
-           (match state with
-            | `Active cat -> sprintf "{%s}" (Category.to_string cat)
-            | `Not_running -> "(paused)"
-           )
-       );
-       stop
-     ))
+    (fun user () -> setup_robust_conn ~reconnect_after:(sec 10.) user (fun user rconn ->
+         force clear_string
+         >>= fun cstring ->
+         let stop = force on_term_signal in
+         Clock.every' (sec 1.) ~stop (fun () ->
+             match Robust_connection.get rconn with
+             | None -> printf "%sWaiting for reconnection\n" cstring; return ()
+             | Some conn ->
+               try_with (fun () -> Rpc.Rpc.dispatch_exn Protocol.status conn user)
+               >>| function
+               | Error _ ->
+                 printf "%s%s Unable to reach server\n" cstring (spinner ())
+               | Ok { Protocol.Status. state; elapsed } ->
+                 let total_elapsed =
+                   Map.data elapsed
+                   |> List.fold ~init:Time.Span.zero ~f:Time.Span.(+)
+                 in
+                 printf "%s%s %s %s\n"
+                   cstring
+                   (spinner ())
+                   (span_string total_elapsed)
+                   (match state with
+                    | `Active cat -> sprintf "{%s}" (Category.to_string cat)
+                    | `Not_running -> "(paused)"
+                   )
+           );
+         stop
+       ))
 
 module Ascii_table = Textutils.Ascii_table
 

File client_common.ml

   | Some pwd -> Username.of_string pwd.Unix.Passwd.name
 ;;
 
-let setup_conn username k =
-  load_config ()
-  >>= fun config ->
-  (match username with
-   | Some x -> return x
-   | None ->
-     match config.Config.user with
-     | Some x -> return x
-     | None -> get_username ())
-  >>= fun username ->
-  Common.with_rpc_conn
-    ~host:config.Config.server
-    ~port:Common.port
-    (fun conn -> k username conn)
-
 let username_spec = Command.Spec.Arg_type.create Username.of_string
 let category_spec = Command.Spec.Arg_type.create Category.of_string
 
   force clear_string
   >>| fun clear_string ->
   print_endline clear_string
+
+
+let with_rpc_conn f ~host ~port =
+  Tcp.with_connection
+    (Tcp.to_host_and_port host port)
+    ~timeout:(sec 1.)
+    (fun _ r w ->
+       Rpc.Connection.create r w ~connection_state:()
+       >>= function
+         | Error exn -> raise exn
+         | Ok conn -> f conn
+    )
+
+let resolve_username username config =
+  match username with
+  | Some x -> return x
+  | None ->
+    match config.Config.user with
+    | Some x -> return x
+    | None -> get_username ()
+
+
+let setup_conn username k =
+  load_config ()
+  >>= fun config ->
+  resolve_username username config
+  >>= fun username ->
+  with_rpc_conn
+    ~host:config.Config.server
+    ~port:Common.port
+    (fun conn -> k username conn)
+
+module Robust_connection : sig
+  type t
+  val get : t -> Rpc.Connection.t option
+
+  val with_connect
+    :  reconnect_after:Time.Span.t
+    -> 'addr Tcp.where_to_connect
+    -> (t -> 'a Deferred.t)
+    -> 'a Deferred.t
+    
+end = struct
+  type t = Rpc.Connection.t option ref
+
+  let get t = !t
+
+  let create ~reconnect_after ~stop where_to_connect =
+    let t = ref None in
+    let rec connect_loop () =
+      if Deferred.is_determined stop then Deferred.unit
+      else (
+        try_with (fun () -> 
+            Tcp.connect ~interrupt:stop where_to_connect
+            >>= fun (_,r,w) ->
+            Rpc.Connection.create r w ~connection_state:()
+            >>= function
+            | Error exn -> raise exn
+            | Ok conn -> return conn            
+          )
+        >>= function
+        | Error _ ->
+          after reconnect_after
+          >>= fun () ->
+          connect_loop ()
+        | Ok conn ->
+          t := Some conn;
+          Rpc.Connection.close_finished conn
+          >>= fun () ->
+          t := None;
+          connect_loop ()
+      )
+    in
+    don't_wait_for (connect_loop ());
+    t
+
+  let with_connect ~reconnect_after where_to_connect k =
+    let stop = Ivar.create () in
+    let t = create where_to_connect
+        ~reconnect_after ~stop:(Ivar.read stop)
+    in
+    Monitor.protect (fun () -> k t)
+      ~finally:(fun () -> Ivar.fill stop (); Ivar.read stop)
+end
+
+let setup_robust_conn ~reconnect_after username k =
+  load_config ()
+  >>= fun config ->
+  resolve_username username config
+  >>= fun username ->
+  let host = config.Config.server in
+  let port = Common.port in
+  Robust_connection.with_connect
+    ~reconnect_after
+    (Tcp.to_host_and_port host port)
+    (fun conn -> k username conn)
+  
+    

File client_common.mli

 
 val load_config : unit -> Config.t Deferred.t
 val get_username : unit -> Username.t Deferred.t
-val setup_conn
-  :  Username.t option
-  -> (Username.t -> Rpc.Connection.t -> 'a Deferred.t)
-  -> 'a Deferred.t
 
 val username_spec : Username.t Command.Spec.Arg_type.t
 val category_spec : Category.t Command.Spec.Arg_type.t
 val shell_run : string -> string list -> string Deferred.t
 val clear_string : string Deferred.t lazy_t
 val clear_screen : unit -> unit Deferred.t
+
+val with_rpc_conn
+  :  (Rpc.Connection.t -> 'a Deferred.t)
+  -> host:string -> port:int
+  -> 'a Deferred.t
+
+val setup_conn
+  :  Username.t option
+  -> (Username.t -> Rpc.Connection.t -> 'a Deferred.t)
+  -> 'a Deferred.t
+
+module Robust_connection : sig
+  type t
+  val get : t -> Rpc.Connection.t option
+
+  val with_connect
+    :  reconnect_after:Time.Span.t
+    -> 'addr Tcp.where_to_connect
+    -> (t -> 'a Deferred.t)
+    -> 'a Deferred.t
+    
+end
+
+val setup_robust_conn
+  :  reconnect_after:Core.Span.t
+  -> Username.t option
+  -> (Username.t -> Robust_connection.t -> 'a Deferred.t)
+  -> 'a Deferred.t
+
 
 let port = 13874
 
-let with_rpc_conn f ~host ~port =
-  Tcp.with_connection
-    (Tcp.to_host_and_port host port)
-    ~timeout:(sec 1.)
-    (fun _ r w ->
-       Rpc.Connection.create r w ~connection_state:()
-       >>= function
-         | Error exn -> raise exn
-         | Ok conn -> f conn
-    )
-
 let validate_dir ~dir =
   let err s = return (error s dir <:sexp_of<string>>) in
   Sys.file_exists_exn dir
 
 val port : Int.t
 
-val with_rpc_conn
-  :  (Rpc.Connection.t -> 'a Deferred.t)
-  -> host:string -> port:int
-  -> 'a Deferred.t
-
 val validate_dir
   :  dir:string
   -> unit Or_error.t Deferred.t