Source

ocaml-core / base / async / extra / lib / tcp.ml

Diff from to

base/async/extra/lib/tcp.ml

    Writer.create ?buffer_age_limit fd)
 ;;
 
-exception Connection_attempt_aborted of string * int with sexp
+exception Connection_attempt_aborted of string with sexp
 
-let connect_sock ?(interrupt=Clock.after (sec 10.)) ~host ~port () =
+let connect_sock_gen ?(interrupt=Clock.after (sec 10.)) ~sock_type ~sock_addr () =
   Deferred.create (fun result ->
-    Unix.Inet_addr.of_string_or_getbyname host
-    >>> fun inet_addr ->
-    let addr = Socket.Address.inet inet_addr ~port in
-    let s = create_socket Socket.Type.tcp in
+    let s = create_socket sock_type in
     close_sock_on_error s (fun () ->
-      Socket.connect_interruptible s addr ~interrupt)
+      Socket.connect_interruptible s sock_addr ~interrupt)
     >>> function
-      | `Ok s -> Ivar.fill result s
-      | `Interrupted ->
-        whenever (Unix.close (Socket.fd s));
-        raise (Connection_attempt_aborted (host, port)))
+    | `Ok s -> Ivar.fill result s
+    | `Interrupted ->
+      whenever (Unix.close (Socket.fd s));
+      raise (Connection_attempt_aborted (Socket.Address.to_string sock_addr)))
+;;
+
+let connect_sock ?interrupt ~host ~port () =
+  Unix.Inet_addr.of_string_or_getbyname host >>= fun inet_addr ->
+  let sock_addr = Socket.Address.inet inet_addr ~port in
+  connect_sock_gen
+    ?interrupt
+    ~sock_type:Socket.Type.tcp
+    ~sock_addr ()
+;;
+
+let connect_sock_unix ?interrupt ~file () =
+  connect_sock_gen
+    ?interrupt
+    ~sock_type:Socket.Type.unix
+    ~sock_addr:(Socket.Address.unix file) ()
 ;;
 
 let close_connection r w =
   reader_writer_of_sock ?max_buffer_age ?reader_buffer_size s
 ;;
 
+let connect_unix ?max_buffer_age ?interrupt ?reader_buffer_size ~file () =
+  connect_sock_unix ?interrupt ~file ()
+  >>| fun s ->
+  reader_writer_of_sock ?max_buffer_age ?reader_buffer_size s
+;;
+
 let collect_errors writer f =
   let monitor = Writer.monitor writer in
   ignore (Monitor.errors monitor); (* don't propagate errors up, we handle them here *)
 
 exception Tcp_server_negative_max_connections of int with sexp
 
-let serve_gen ?(max_connections=10_000) ?max_pending_connections ?max_buffer_age 
+let serve_gen ?(max_connections=10_000) ?max_pending_connections ?max_buffer_age
     ~sock_type
     ~sock_addr
     ~on_handler_error handler =
 ;;
 
 let serve ?max_connections ?max_pending_connections ?max_buffer_age ~port
-    ~on_handler_error handler = 
+    ~on_handler_error handler =
   serve_gen ?max_connections ?max_pending_connections ?max_buffer_age
     ~sock_type:Socket.Type.tcp
     ~sock_addr:(Socket.Address.inet_addr_any ~port)
     handler
 
 let serve_unix ?max_connections ?max_pending_connections ?max_buffer_age ~file
-    ~on_handler_error handler = 
+    ~on_handler_error handler =
   serve_gen ?max_connections ?max_pending_connections ?max_buffer_age
     ~sock_type:Socket.Type.unix
     ~sock_addr:(Socket.Address.unix file)
     handler
 
 let connect_sock ~host ~port = connect_sock ~host ~port ()
+let connect_sock_unix ~file = connect_sock_unix ~file ()
 ;;