Commits

Oliver Gu committed 7784b79

Moved client code into Tws module

Comments (0)

Files changed (12)

 # OASIS_START
-# DO NOT EDIT (digest: cf5964847d2881cdd46dbb69f0de93ba)
+# DO NOT EDIT (digest: f867f286f6d43507e7e0a5cea9db4ce8)
 lib/Config
 lib/Std_internal
 lib/Send_tag
 lib/Raw_order
 lib/Order
 lib/Ib
-lib/Client
 lib/Tws_reqs
 lib/Tws_result
 lib/Tws
 lib/Client_id
-lib/Client_intf
 lib/Std
 # OASIS_STOP
                   Raw_order,
                   Order,
                   Ib,
-                  Client,
                   Tws_reqs,
                   Tws_result,
                   Tws,
                   Client_id,
-                  Client_intf,
                   Std
   BuildDepends:   core  (>= 109.10.00),
                   async (>= 109.35.00),
   BuildDepends:   ibx
   CompiledObject: best
 
-Executable "print-option-data"
-  Build$:         flag(examples)
-  Path:           examples
-  MainIs:         print_option_data.ml
-  Install:        false
-  BuildDepends:   ibx
-  CompiledObject: best
-
 Executable "quote-table"
   Build$:         flag(examples)
   Path:           examples
 # OASIS_START
-# DO NOT EDIT (digest: 47a84d34708367cf9b8c29a8764d48f4)
+# DO NOT EDIT (digest: 47edbb26a464e27295966367d7ab00d2)
 # Ignore VCS directories, you can use the same kind of rule outside 
 # OASIS_START/STOP if you want to exclude directories that contains 
 # useless stuff for the build process
 "lib/raw_order.cmx": for-pack(Ibx)
 "lib/order.cmx": for-pack(Ibx)
 "lib/ib.cmx": for-pack(Ibx)
-"lib/client.cmx": for-pack(Ibx)
 "lib/tws_reqs.cmx": for-pack(Ibx)
 "lib/tws_result.cmx": for-pack(Ibx)
 "lib/tws.cmx": for-pack(Ibx)
 "lib/client_id.cmx": for-pack(Ibx)
-"lib/client_intf.cmx": for-pack(Ibx)
 "lib/std.cmx": for-pack(Ibx)
 <lib/*.ml{,i}>: pkg_core
 <lib/*.ml{,i}>: pkg_async
 <examples/print_market_depth.{native,byte}>: pkg_textutils
 <examples/print_market_depth.{native,byte}>: pkg_threads
 <examples/print_market_depth.{native,byte}>: pkg_str
-# Executable print-option-data
-<examples/print_option_data.{native,byte}>: use_ibx
-<examples/print_option_data.{native,byte}>: pkg_core
-<examples/print_option_data.{native,byte}>: pkg_async
-<examples/print_option_data.{native,byte}>: pkg_sexplib.syntax
-<examples/print_option_data.{native,byte}>: pkg_fieldslib.syntax
-<examples/print_option_data.{native,byte}>: pkg_textutils
-<examples/print_option_data.{native,byte}>: pkg_threads
-<examples/print_option_data.{native,byte}>: pkg_str
 # Executable quote-table
 <examples/quote_table.{native,byte}>: use_ibx
 <examples/quote_table.{native,byte}>: pkg_core

examples/print_option_data.ml

-open Core.Std
-open Async.Std
-open Ibx.Std
-
-(* This example demonstrates how to customize a client. *)
-
-module Tws_req : sig
-  val req_option_data : (Query.Market_data.t, Tick_option.t) Ib.Streaming_request.t
-end = struct
-  module S = Send_tag
-  module R = Recv_tag
-
-  let req_option_data = Ib.Streaming_request.create
-    ~send_header:(Ib.Header.create ~tag:S.Market_data        ~version:9)
-    ~canc_header:(Ib.Header.create ~tag:S.Cancel_market_data ~version:1)
-    ~recv_header:[
-      Ib.Header.create ~tag:R.Tick_option  ~version:6;
-    ]
-    ~skip_header:[
-      Ib.Header.create ~tag:R.Tick_price   ~version:6;
-      Ib.Header.create ~tag:R.Tick_size    ~version:6;
-      Ib.Header.create ~tag:R.Tick_string  ~version:6;
-      Ib.Header.create ~tag:R.Tick_generic ~version:6;
-    ]
-    ~tws_query:Query.Market_data.pickler
-    ~tws_response:[Response.Tick_option.unpickler]
-    ()
-end
-
-module Option_client : sig
-  type t
-  include Client_intf.S with type t := t
-
-  val option_data
-    :  t
-    -> option:[ `Option ] Contract.t
-    -> (Tick_option.t Tws_result.t Pipe.Reader.t * Query_id.t) Or_error.t Deferred.t
-
-  val option_data_exn
-    :  t
-    -> option:[ `Option ] Contract.t
-    -> (Tick_option.t Pipe.Reader.t * Query_id.t) Deferred.t
-
-  val cancel_option_data : t -> Query_id.t -> unit
-end = struct
-  include Ibx.Client
-
-  let option_data t ~option =
-    let q =
-      Query.Market_data.create
-        ~contract:option
-        ~tick_generics:[]
-        ~snapshot:false
-    in
-    dispatch_streaming_request t Tws_req.req_option_data q
-
-  let option_data_exn t ~option =
-    option_data t ~option >>| function
-    | Error e -> raise (Error.to_exn e)
-    | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
-
-  let cancel_option_data t id =
-    cancel_streaming_request t Tws_req.req_option_data id
-end
-
-let print_option_data ~duration =
-  Option_client.with_client
-    ~on_handler_error:(`Call (fun e ->
-      prerr_endline (Error.to_string_hum e);
-      shutdown 1
-    ))
-    (fun clt ->
-      (* For current contracts check: http://finance.yahoo.com/q/op?s=GOOG *)
-      let goog_call =
-        Contract.option
-          ~id:(Contract_id.of_string "116034093")
-          ~exchange:`CBOE
-          ~currency:`USD
-          ~option_right:`Call
-          ~expiry:(Date.create_exn ~y:2013 ~m:Month.Jun ~d:21)
-          ~strike:(Price.of_float 850.)
-          (Symbol.of_string "GOOG")
-      in
-      Option_client.option_data_exn clt ~option:goog_call
-      >>= fun (option_ticks, id) ->
-      upon (after duration) (fun () -> Option_client.cancel_option_data clt id);
-      Pipe.iter_without_pushback option_ticks ~f:(fun option_tick ->
-        print_endline (Sexp.to_string_hum (Tick_option.sexp_of_t option_tick)))
-    )
-
-let command =
-  Command.async_basic ~summary:"print option data"
-    Command.Spec.(
-      empty
-      +> Common.host_arg ()
-      +> Common.port_arg ()
-      +> Common.duration_arg ()
-    )
-    (fun host port duration () ->
-      print_option_data ~host ~port ~duration
-    )
-
-let () = Command.run command

lib/client.ml

-(* File: client.ml
-
-   IBX - OCaml implementation of the Interactive Brokers TWS API
-
-   Copyright (C) 2013-  Oliver Gu
-   email: gu.oliver@yahoo.com
-
-   This library is free software; you can redistribute it and/or
-   modify it under the terms of the GNU Lesser General Public
-   License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
-
-   This library is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   Lesser General Public License for more details.
-
-   You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-*)
-
-open Core.Std
-open Async.Std
-
-include struct
-  open Response
-  module Execution_report  = Execution_report
-  module Commission_report = Commission_report
-end
-
-module Client_msg = struct
-  module Control = struct
-    type t =
-    | Connecting of [ `Client_version of int ] * Host_and_port.t
-    | Connected  of [ `Server_version of int ] * Time.t
-    | Disconnected
-    with sexp
-  end
-
-  type t =
-  | Control of Control.t
-  | Status  of string
-  | Error   of Error.t
-  with sexp
-end
-
-module Query_id = Ib.Streaming_request.Id
-
-type t =
-  { remote_host    : string;
-    remote_port    : int;
-    client_id      : Client_id.t;
-    client_version : int;
-    enable_logging : bool;
-    mutable con :
-      [ `Disconnected
-      | `Connecting of unit -> unit
-      | `Connected of Ib.Connection.t ];
-    mutable server_version  : int option;
-    mutable connection_time : Time.t option;
-    mutable account_code    : Account_code.t option;
-    messages           : Client_msg.t Tail.t;
-    execution_reports  : Execution_report.t Tail.t;
-    commission_reports : Commission_report.t Tail.t;
-  }
-
-let create
-    ?(enable_logging = false)
-    ?(client_id      = Client_id.of_int_exn 0)
-    ~host
-    ~port
-    () =
-  return
-    { client_id;
-      enable_logging;
-      remote_host        = host;
-      remote_port        = port;
-      client_version     = Config.client_version;
-      con                = `Disconnected;
-      server_version     = None;
-      connection_time    = None;
-      account_code       = None;
-      messages           = Tail.create ();
-      execution_reports  = Tail.create ();
-      commission_reports = Tail.create ();
-    }
-
-exception Not_connected_yet with sexp
-
-exception Eof_from_client with sexp
-
-exception Server_version_too_small of int * [ `Min of int ] with sexp
-
-let ignore_errors f = don't_wait_for (Monitor.try_with f >>| ignore)
-
-let connect t =
-  let module C = Client_msg in
-  let module E = Client_msg.Control in
-  match Result.try_with (fun () -> Socket.create Socket.Type.tcp) with
-  | Error exn ->
-    Tail.extend t.messages (C.Error (Error.of_exn exn));
-    t.con <- `Disconnected;
-    return ()
-  | Ok s ->
-    let close_socket exn =
-      t.con <- `Disconnected;
-      Tail.extend t.messages (C.Error (Error.of_exn exn));
-      ignore_errors (fun () -> Unix.close (Socket.fd s));
-    in
-    Tail.extend t.messages (C.Control (E.Connecting (
-      `Client_version t.client_version,
-      Host_and_port.create ~host:t.remote_host ~port:t.remote_port
-    )));
-    t.con <- `Connecting (fun () -> close_socket Not_connected_yet);
-    Monitor.try_with ~name:"connect socket" (fun () ->
-      Unix.Inet_addr.of_string_or_getbyname t.remote_host
-      >>= fun inet_addr ->
-      let address = Socket.Address.Inet.create inet_addr ~port:t.remote_port in
-      Socket.connect s address)
-    >>= function
-    | Error exn ->
-      close_socket (Monitor.extract_exn exn);
-      return ()
-    | Ok s ->
-      let fd = Socket.fd s in
-      Ib.Connection.create
-        ~enable_logging:t.enable_logging
-        ~extend_error:(fun e ->
-          Tail.extend t.messages (C.Error e))
-        ~extend_status:(fun s ->
-          Tail.extend t.messages (C.Status s))
-        ~extend_execution_report:(fun x ->
-          Tail.extend t.execution_reports x)
-        ~extend_commission_report:(fun x ->
-          Tail.extend t.commission_reports x)
-        (Reader.create fd)
-        (Writer.create fd)
-      >>= fun con ->
-      let close_connection exn =
-        t.con <- `Disconnected;
-        Tail.extend t.messages (C.Error (Error.of_exn exn));
-        Ib.Connection.close con
-      in
-      Monitor.try_with ~name:"try connect" (fun () ->
-        let module H = Ib.Connection.Handshake_result in
-        Ib.Connection.try_connect con
-          ~client_version:t.client_version
-          ~client_id:t.client_id
-        >>| function
-        | Error e -> Error.raise e
-        | Ok handshake_result ->
-          begin match handshake_result with
-          | H.Eof ->
-            raise Eof_from_client
-          | H.Version_failure version ->
-            raise (Server_version_too_small (version, `Min Config.server_version))
-          | H.Server_header (`Version version, conn_time, account_code) ->
-            t.con <- `Connected con;
-            t.server_version  <- Some version;
-            t.connection_time <- Some conn_time;
-            t.account_code    <- Some account_code;
-            Tail.extend t.messages (C.Control (
-              E.Connected (`Server_version version, conn_time)));
-          end)
-      >>= function
-      | Error exn -> close_connection (Monitor.extract_exn exn)
-      | Ok () -> return ()
-
-let messages t = Tail.collect t.messages
-let execution_reports  t = Tail.collect t.execution_reports
-let commission_reports t = Tail.collect t.commission_reports
-
-let client_id       t = t.client_id
-let server_version  t = t.server_version
-let connection_time t = t.connection_time
-let account_code    t = t.account_code
-
-let is_connected t =
-  match t.con with
-  | `Disconnected
-  | `Connecting _ -> false
-  | `Connected _  -> true
-
-let state t =
-  match t.con with
-  | `Disconnected -> `Disconnected
-  | `Connecting _ -> `Connecting
-  | `Connected  _ -> `Connected
-
-let set_server_log_level t ~level =
-  match t.con with
-  | `Disconnected
-  | `Connecting _  -> ()
-  | `Connected con -> Ib.Connection.set_server_log_level con ~level
-
-let disconnect t =
-  let module C = Client_msg in
-  let module E = Client_msg.Control in
-  match t.con with
-  | `Disconnected -> return ()
-  | `Connecting close -> return (close ())
-  | `Connected con ->
-    t.con <- `Disconnected;
-    Tail.extend t.messages (C.Control E.Disconnected);
-    Ib.Connection.close con
-
-let with_client
-    ?enable_logging
-    ?client_id
-    ~host
-    ~port
-    ~on_handler_error
-    handler =
-  let module C = Client_msg in
-  let handle_error e =
-    match on_handler_error with
-    | `Ignore -> ()
-    | `Raise  -> Error.raise e
-    | `Call f -> f e
-  in
-  create ?enable_logging ?client_id ~host ~port ()
-  >>= fun t ->
-  Stream.iter (messages t) ~f:(fun clt_msg ->
-    begin
-      match clt_msg, t.enable_logging with
-      | C.Control x, true ->
-        Log.Global.sexp ~level:`Info x <:sexp_of< C.Control.t >>
-      | C.Status x, true ->
-        Log.Global.sexp ~level:`Info x <:sexp_of< string >>
-      | C.Error e, true ->
-        Log.Global.sexp ~level:`Error e <:sexp_of< Error.t >>;
-        handle_error e
-      | C.Error e, false ->
-        handle_error e
-      | _ -> ()
-    end);
-  connect t >>= fun () ->
-  match state t with
-  | `Connected ->
-    Monitor.try_with (fun () -> handler t) >>= (function
-    | Error exn ->
-      disconnect t
-      >>| fun () ->
-      handle_error (Error.of_exn (Monitor.extract_exn exn));
-    | Ok () ->
-      disconnect t)
-  | _ -> return ()
-
-let dispatch_request t req query = match t.con with
-  | `Disconnected
-  | `Connecting _  -> return (Or_error.of_exn Not_connected_yet)
-  | `Connected con -> Ib.Request.dispatch req con query
-
-let dispatch_streaming_request t req query = match t.con with
-  | `Disconnected
-  | `Connecting _  -> return (Or_error.of_exn Not_connected_yet)
-  | `Connected con -> Ib.Streaming_request.dispatch req con query
-
-let cancel_streaming_request t req id = match t.con with
-  | `Disconnected
-  | `Connecting _  -> ()
-  | `Connected con -> Ib.Streaming_request.cancel req con id
-
-let dispatch_and_cancel t req query =
-  dispatch_streaming_request t req query
-  >>= function
-  | Error _ as x -> return x
-  | Ok (reader, id) ->
-    Pipe.read_at_most reader ~num_values:1
-    >>| fun read_result ->
-    Exn.protectx read_result ~f:(function
-    | `Eof -> Or_error.of_exn Eof_from_client
-    | `Ok result -> Ok (Queue.dequeue_exn result)
-    ) ~finally:(fun _ -> cancel_streaming_request t req id)
-
-let dispatch_streaming_request' t req query = match t.con with
-  | `Disconnected
-  | `Connecting _  -> return (Or_error.of_exn Not_connected_yet)
-  | `Connected con -> Ib.Streaming_request_without_id.dispatch req con query
-
-let cancel_streaming_request' t req = match t.con with
-  | `Disconnected
-  | `Connecting _  -> ()
-  | `Connected con -> Ib.Streaming_request_without_id.cancel req con

lib/client.mli

-(* File: client.mli
-
-   IBX - OCaml implementation of the Interactive Brokers TWS API
-
-   Copyright (C) 2013-  Oliver Gu
-   email: gu.oliver@yahoo.com
-
-   This library is free software; you can redistribute it and/or
-   modify it under the terms of the GNU Lesser General Public
-   License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
-
-   This library is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   Lesser General Public License for more details.
-
-   You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-*)
-
-(** A module for building TWS clients *)
-
-open Core.Std
-open Async.Std
-
-type t
-include Client_intf.S with type t := t
-
-(** Creates a new (initially disconnected) client. *)
-val create
-  :  ?enable_logging:bool
-  -> ?client_id:Client_id.t
-  -> host:string
-  -> port:int
-  -> unit
-  -> t Deferred.t
-
-(** [connect t] initiates a connection and returns a deferred that becomes
-    determined when the connection is established. *)
-val connect : t -> unit Deferred.t
-
-val disconnect : t -> unit Deferred.t
-
-val dispatch_request
-  :  t
-  -> ('query, 'response) Ib.Request.t
-  -> 'query
-  -> 'response Or_error.t Deferred.t
-
-val dispatch_streaming_request
-  :  t
-  -> ('query, 'response) Ib.Streaming_request.t
-  -> 'query
-  -> ('response Tws_result.t Pipe.Reader.t * Query_id.t) Or_error.t Deferred.t
-
-val cancel_streaming_request
-  :  t
-  -> (_, _) Ib.Streaming_request.t
-  -> Query_id.t
-  -> unit
-
-val dispatch_and_cancel
-  :  t
-  -> ('query, 'response) Ib.Streaming_request.t
-  -> 'query
-  -> 'response Tws_result.t Or_error.t Deferred.t
-
-val dispatch_streaming_request'
-  :  t
-  -> ('query, 'response) Ib.Streaming_request_without_id.t
-  -> 'query
-  -> ('response Pipe.Reader.t) Or_error.t Deferred.t
-
-val cancel_streaming_request'
-  :  t
-  -> (_, _) Ib.Streaming_request_without_id.t
-  -> unit

lib/client_intf.ml

-(* File: client_intf.ml
-
-   IBX - OCaml implementation of the Interactive Brokers TWS API
-
-   Copyright (C) 2013-  Oliver Gu
-   email: gu.oliver@yahoo.com
-
-   This library is free software; you can redistribute it and/or
-   modify it under the terms of the GNU Lesser General Public
-   License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
-
-   This library is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   Lesser General Public License for more details.
-
-   You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-*)
-
-(** A signature for customized TWS clients *)
-
-open Core.Std
-open Async.Std
-open Response
-
-module type S = sig
-  module Query_id : Unique_id
-
-  type t
-
-  (** [with_client ~host ~port ~on_handler_error handler] connects to the
-      IB connectivity software on ([host], [port]) and runs the [handler]
-      until an exception is thrown or the returned Deferred is determined.
-
-      [on_handler_error] determines what happens if the [handler] throws an
-      exception.
-
-      The standard port for TWS is 7496 and for the IB Gateway it is 4001.
-  *)
-  val with_client
-    :  ?enable_logging:bool
-    -> ?client_id:Client_id.t
-    -> host:string
-    -> port:int
-    -> on_handler_error:[
-    | `Raise
-    | `Ignore
-    | `Call of (Error.t -> unit)
-    ]
-    -> (t -> unit Deferred.t)
-    -> unit Deferred.t
-
-  val is_connected : t -> bool
-
-  (** [state t] returns the state of the connection. *)
-  val state : t -> [ `Disconnected | `Connecting | `Connected ]
-
-  (** [set_server_log_level level] sets the log entry detail [level] of TWS
-      when processing API requests. *)
-  val set_server_log_level
-    :  t
-    -> level:[
-    | `System
-    | `Error
-    | `Warning
-    | `Information
-    | `Detail
-    ]
-    -> unit
-
-  val execution_reports : t -> Execution_report.t Stream.t
-
-  val commission_reports : t -> Commission_report.t Stream.t
-
-  val client_id : t -> Client_id.t
-
-  val server_version : t -> int option
-
-  val connection_time : t -> Time.t option
-
-  val account_code : t -> Account_code.t option
-
-end
 # OASIS_START
-# DO NOT EDIT (digest: de1094d85c42871344047a22f26fdf6b)
+# DO NOT EDIT (digest: 0f0249607659760c819d4fcc9d589eb4)
 Config
 Std_internal
 Send_tag
 Raw_order
 Order
 Ib
-Client
 Tws_reqs
 Tws_result
 Tws
 Client_id
-Client_intf
 Std
 # OASIS_STOP
 module Trade             = Tws.Trade
 module TAQ               = Tws.TAQ
 module Client_id         = Client_id
-module Client_intf       = Client_intf
 open Std_internal
 open Response
 
-include Client
+module Client_msg = struct
+  module Control = struct
+    type t =
+    | Connecting of [ `Client_version of int ] * Host_and_port.t
+    | Connected  of [ `Server_version of int ] * Time.t
+    | Disconnected
+    with sexp
+  end
 
-(* Connection and server *)
+  type t =
+  | Control of Control.t
+  | Status  of string
+  | Error   of Error.t
+  with sexp
+end
+
+module Query_id = Ib.Streaming_request.Id
+
+type t =
+  { remote_host    : string;
+    remote_port    : int;
+    client_id      : Client_id.t;
+    client_version : int;
+    enable_logging : bool;
+    mutable con :
+      [ `Disconnected
+      | `Connecting of unit -> unit
+      | `Connected of Ib.Connection.t ];
+    mutable server_version  : int option;
+    mutable connection_time : Time.t option;
+    mutable account_code    : Account_code.t option;
+    messages           : Client_msg.t Tail.t;
+    execution_reports  : Execution_report.t Tail.t;
+    commission_reports : Commission_report.t Tail.t;
+  }
+
+let create
+    ?(enable_logging = false)
+    ?(client_id      = Client_id.of_int_exn 0)
+    ~host
+    ~port
+    () =
+  return
+    { client_id;
+      enable_logging;
+      remote_host        = host;
+      remote_port        = port;
+      client_version     = Config.client_version;
+      con                = `Disconnected;
+      server_version     = None;
+      connection_time    = None;
+      account_code       = None;
+      messages           = Tail.create ();
+      execution_reports  = Tail.create ();
+      commission_reports = Tail.create ();
+    }
+
+exception Not_connected_yet with sexp
+
+exception Eof_from_client with sexp
+
+exception Server_version_too_small of int * [ `Min of int ] with sexp
+
+let ignore_errors f = don't_wait_for (Monitor.try_with f >>| ignore)
+
+let connect t =
+  let module C = Client_msg in
+  let module E = Client_msg.Control in
+  match Result.try_with (fun () -> Socket.create Socket.Type.tcp) with
+  | Error exn ->
+    Tail.extend t.messages (C.Error (Error.of_exn exn));
+    t.con <- `Disconnected;
+    return ()
+  | Ok s ->
+    let close_socket exn =
+      t.con <- `Disconnected;
+      Tail.extend t.messages (C.Error (Error.of_exn exn));
+      ignore_errors (fun () -> Unix.close (Socket.fd s));
+    in
+    Tail.extend t.messages (C.Control (E.Connecting (
+      `Client_version t.client_version,
+      Host_and_port.create ~host:t.remote_host ~port:t.remote_port
+    )));
+    t.con <- `Connecting (fun () -> close_socket Not_connected_yet);
+    Monitor.try_with ~name:"connect socket" (fun () ->
+      Unix.Inet_addr.of_string_or_getbyname t.remote_host
+      >>= fun inet_addr ->
+      let address = Socket.Address.Inet.create inet_addr ~port:t.remote_port in
+      Socket.connect s address)
+    >>= function
+    | Error exn ->
+      close_socket (Monitor.extract_exn exn);
+      return ()
+    | Ok s ->
+      let fd = Socket.fd s in
+      Ib.Connection.create
+        ~enable_logging:t.enable_logging
+        ~extend_error:(fun e ->
+          Tail.extend t.messages (C.Error e))
+        ~extend_status:(fun s ->
+          Tail.extend t.messages (C.Status s))
+        ~extend_execution_report:(fun x ->
+          Tail.extend t.execution_reports x)
+        ~extend_commission_report:(fun x ->
+          Tail.extend t.commission_reports x)
+        (Reader.create fd)
+        (Writer.create fd)
+      >>= fun con ->
+      let close_connection exn =
+        t.con <- `Disconnected;
+        Tail.extend t.messages (C.Error (Error.of_exn exn));
+        Ib.Connection.close con
+      in
+      Monitor.try_with ~name:"try connect" (fun () ->
+        let module H = Ib.Connection.Handshake_result in
+        Ib.Connection.try_connect con
+          ~client_version:t.client_version
+          ~client_id:t.client_id
+        >>| function
+        | Error e -> Error.raise e
+        | Ok handshake_result ->
+          begin match handshake_result with
+          | H.Eof ->
+            raise Eof_from_client
+          | H.Version_failure version ->
+            raise (Server_version_too_small (version, `Min Config.server_version))
+          | H.Server_header (`Version version, conn_time, account_code) ->
+            t.con <- `Connected con;
+            t.server_version  <- Some version;
+            t.connection_time <- Some conn_time;
+            t.account_code    <- Some account_code;
+            Tail.extend t.messages (C.Control (
+              E.Connected (`Server_version version, conn_time)));
+          end)
+      >>= function
+      | Error exn -> close_connection (Monitor.extract_exn exn)
+      | Ok () -> return ()
+
+let messages t = Tail.collect t.messages
+let execution_reports  t = Tail.collect t.execution_reports
+let commission_reports t = Tail.collect t.commission_reports
+
+let client_id       t = t.client_id
+let server_version  t = t.server_version
+let connection_time t = t.connection_time
+let account_code    t = t.account_code
+
+let is_connected t = match t.con with
+  | `Disconnected
+  | `Connecting _ -> false
+  | `Connected _  -> true
+
+let state t = match t.con with
+  | `Disconnected -> `Disconnected
+  | `Connecting _ -> `Connecting
+  | `Connected  _ -> `Connected
+
+let disconnect t =
+  let module C = Client_msg in
+  let module E = Client_msg.Control in
+  match t.con with
+  | `Disconnected -> return ()
+  | `Connecting close -> return (close ())
+  | `Connected con ->
+    t.con <- `Disconnected;
+    Tail.extend t.messages (C.Control E.Disconnected);
+    Ib.Connection.close con
+
+let with_client
+    ?enable_logging
+    ?client_id
+    ~host
+    ~port
+    ~on_handler_error
+    handler =
+  let module C = Client_msg in
+  let handle_error e =
+    match on_handler_error with
+    | `Ignore -> ()
+    | `Raise  -> Error.raise e
+    | `Call f -> f e
+  in
+  create ?enable_logging ?client_id ~host ~port ()
+  >>= fun t ->
+  Stream.iter (messages t) ~f:(fun clt_msg ->
+    begin
+      match clt_msg, t.enable_logging with
+      | C.Control x, true ->
+        Log.Global.sexp ~level:`Info x <:sexp_of< C.Control.t >>
+      | C.Status x, true ->
+        Log.Global.sexp ~level:`Info x <:sexp_of< string >>
+      | C.Error e, true ->
+        Log.Global.sexp ~level:`Error e <:sexp_of< Error.t >>;
+        handle_error e
+      | C.Error e, false ->
+        handle_error e
+      | _ -> ()
+    end);
+  connect t >>= fun () ->
+  match state t with
+  | `Connected ->
+    Monitor.try_with (fun () -> handler t) >>= (function
+    | Error exn ->
+      disconnect t
+      >>| fun () ->
+      handle_error (Error.of_exn (Monitor.extract_exn exn));
+    | Ok () ->
+      disconnect t)
+  | _ -> return ()
+
+(* +-----------------------------------------------------------------------+
+   | Helper functions                                                      |
+   +-----------------------------------------------------------------------+ *)
+
+let with_connection t ~f = match t.con with
+  | `Disconnected
+  | `Connecting _  -> return (Or_error.of_exn Not_connected_yet)
+  | `Connected con -> f con
+
+let with_connection_unit t ~f = match t.con with
+  | `Disconnected
+  | `Connecting _  -> ()
+  | `Connected con -> f con
+
+let dispatch_and_cancel req con query =
+  Ib.Streaming_request.dispatch req con query
+  >>= function
+  | Error _ as e -> return e
+  | Ok (reader, id) ->
+    Pipe.read_at_most reader ~num_values:1
+    >>| fun read_result ->
+    Exn.protectx read_result ~f:(function
+    | `Eof -> Or_error.of_exn Eof_from_client
+    | `Ok result -> Ok (Queue.dequeue_exn result)
+    ) ~finally:(fun _ -> Ib.Streaming_request.cancel req con id)
+
+(* +-----------------------------------------------------------------------+
+   | Connection and server                                                 |
+   +-----------------------------------------------------------------------+ *)
+
+let set_server_log_level t ~level = match t.con with
+  | `Disconnected
+  | `Connecting _  -> ()
+  | `Connected con -> Ib.Connection.set_server_log_level con ~level
 
 let server_time t =
-  let q = Query.Server_time.create () in
-  dispatch_request t Tws_reqs.req_server_time q
+  with_connection t ~f:(fun con ->
+    let q = Query.Server_time.create () in
+    Ib.Request.dispatch Tws_reqs.req_server_time con q
+  )
 
 let server_time_exn t = server_time t >>| Or_error.ok_exn
 
-(* Market data *)
+(* +-----------------------------------------------------------------------+
+   | Market data                                                           |
+   +-----------------------------------------------------------------------+ *)
 
 module Market_data = struct
   type t =
 end
 
 let market_data ?(snapshot = false) ?(tick_generics = []) t ~contract =
-  let q = Query.Market_data.create ~contract ~tick_generics ~snapshot in
-  dispatch_streaming_request t Tws_reqs.req_market_data q
+  with_connection t ~f:(fun con ->
+    let q = Query.Market_data.create ~contract ~tick_generics ~snapshot in
+    Ib.Streaming_request.dispatch Tws_reqs.req_market_data con q
+  )
 
 let market_data_exn ?snapshot ?tick_generics t ~contract =
   market_data ?snapshot ?tick_generics t ~contract >>| function
   | Error e -> raise (Error.to_exn e)
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
-let cancel_market_data t id = cancel_streaming_request t Tws_reqs.req_market_data id
+let cancel_market_data t id =
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_market_data con id
+  )
 
 let option_price t ~contract ~volatility ~underlying_price =
-  let q = Query.Option_price.create ~contract ~volatility ~underlying_price in
-  dispatch_and_cancel t Tws_reqs.req_option_price q >>| function
-  | Error _ as x -> x
-  | Ok result ->
-    match result with
-    | Error tws_error -> Error (Tws_error.to_error tws_error)
-    | Ok None -> Error (Error.of_string "missing option price")
-    | Ok (Some opt_price) -> Ok opt_price
+  with_connection t ~f:(fun con ->
+    let q = Query.Option_price.create
+      ~contract
+      ~volatility
+      ~underlying_price
+    in
+    dispatch_and_cancel Tws_reqs.req_option_price con q >>| function
+    | Error _ as x -> x
+    | Ok result ->
+      match result with
+      | Error tws_error -> Error (Tws_error.to_error tws_error)
+      | Ok None -> Error (Error.of_string "missing option price")
+      | Ok (Some opt_price) -> Ok opt_price
+  )
 
 let option_price_exn t ~contract ~volatility ~underlying_price =
   option_price t ~contract ~volatility ~underlying_price >>| Or_error.ok_exn
 
 let implied_volatility t ~contract ~option_price ~underlying_price =
-  let q = Query.Implied_volatility.create ~contract ~option_price ~underlying_price in
-  dispatch_and_cancel t Tws_reqs.req_implied_volatility q >>| function
-  | Error _ as x -> x
-  | Ok result ->
-    match result with
-    | Error tws_error -> Error (Tws_error.to_error tws_error)
-    | Ok None -> Error (Error.of_string "missing implied volatility")
-    | Ok (Some implied_vol) -> Ok implied_vol
+  with_connection t ~f:(fun con ->
+    let q = Query.Implied_volatility.create
+      ~contract
+      ~option_price
+      ~underlying_price
+    in
+    dispatch_and_cancel Tws_reqs.req_implied_volatility con q >>| function
+    | Error _ as x -> x
+    | Ok result ->
+      match result with
+      | Error tws_error -> Error (Tws_error.to_error tws_error)
+      | Ok None -> Error (Error.of_string "missing implied volatility")
+      | Ok (Some implied_vol) -> Ok implied_vol
+  )
 
 let implied_volatility_exn t ~contract ~option_price ~underlying_price =
   implied_volatility t ~contract ~option_price ~underlying_price
   >>| Or_error.ok_exn
 
-(* Orders *)
+(* +-----------------------------------------------------------------------+
+   | Orders                                                                |
+   +-----------------------------------------------------------------------+ *)
 
 let dedup_adjacents ~equal pipe_r =
   let last = ref None in
     | Some y -> if equal x y then None else begin last := Some x; !last end)
 
 let submit_order t ~contract ~order =
-  let q = Query.Submit_order.create
-    ~contract
-    ~order
+  with_connection t ~f:(fun con ->
+    let q = Query.Submit_order.create
+      ~contract
+      ~order
     (* Note: [account_code t] won't return [None] since we
        can only call it in a handler of [Tws.with_client]. *)
-    ~account_code:(Option.value_exn (account_code t))
-  in
-  dispatch_streaming_request t Tws_reqs.req_submit_order q >>| function
-  | Error _ as e -> e
-  | Ok (pipe_r, id) ->
-    let equal t1 t2 = match t1, t2 with
-      | Error e1, Error e2 -> Tws_error.(=) e1 e2
-      | Ok x1, Ok x2 -> Order_status.(=) x1 x2
-      | _, _ -> false
+      ~account_code:(Option.value_exn (account_code t))
     in
-    let oid = Order_id.of_int_exn (Query_id.to_int_exn id) in
-    Ok (dedup_adjacents ~equal pipe_r, oid)
+    Ib.Streaming_request.dispatch Tws_reqs.req_submit_order con q >>| function
+    | Error _ as e -> e
+    | Ok (pipe_r, id) ->
+      let equal t1 t2 = match t1, t2 with
+        | Error e1, Error e2 -> Tws_error.(=) e1 e2
+        | Ok x1, Ok x2 -> Order_status.(=) x1 x2
+        | _, _ -> false
+      in
+      let oid = Order_id.of_int_exn (Query_id.to_int_exn id) in
+      Ok (dedup_adjacents ~equal pipe_r, oid)
+  )
 
 let submit_order_exn t ~contract ~order =
   submit_order t ~contract ~order >>| function
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
 let cancel_order_status t oid =
-  let id = Query_id.of_int_exn (Order_id.to_int_exn oid) in
-  cancel_streaming_request t Tws_reqs.req_submit_order id
+  with_connection_unit t ~f:(fun con ->
+    let id = Query_id.of_int_exn (Order_id.to_int_exn oid) in
+    Ib.Streaming_request.cancel Tws_reqs.req_submit_order con id
+  )
 
-(* Account and portfolio *)
+(* +-----------------------------------------------------------------------+
+   | Account and portfolio                                                 |
+   +-----------------------------------------------------------------------+ *)
 
 let account_and_portfolio_updates t =
-  let account_code = Option.value_exn (account_code t) in
-  let create_query = Query.Account_and_portfolio_updates.create ~account_code in
-  let subscribe = create_query ~subscribe:true in
-  dispatch_streaming_request' t
-    Tws_reqs.req_account_and_portfolio_updates
-    subscribe >>| function
-  | Error _ as e -> e
-  | Ok pipe_r ->
-    let pipe_r = Pipe.filter_map pipe_r ~f:(function
-      | `Account_update _
-      | `Portfolio_update _ as x -> Some x
-      | `Account_update_end code ->
-        if Account_code.(=) account_code code then begin
-          cancel_streaming_request' t
-            Tws_reqs.req_account_and_portfolio_updates
-        end;
-        None)
-    in
-    Ok pipe_r
+  with_connection t ~f:(fun con ->
+    let account_code = Option.value_exn (account_code t) in
+    let create_query = Query.Account_and_portfolio_updates.create ~account_code in
+    let subscribe = create_query ~subscribe:true in
+    Ib.Streaming_request_without_id.dispatch
+      Tws_reqs.req_account_and_portfolio_updates con subscribe
+    >>| function
+    | Error _ as e -> e
+    | Ok pipe_r ->
+      let pipe_r = Pipe.filter_map pipe_r ~f:(function
+        | `Account_update _
+        | `Portfolio_update _ as x -> Some x
+        | `Account_update_end code ->
+          if Account_code.(=) account_code code then begin
+            Ib.Streaming_request_without_id.cancel
+              Tws_reqs.req_account_and_portfolio_updates
+              con
+          end;
+          None)
+      in
+      Ok pipe_r
+  )
 
 let account_and_portfolio_updates_exn t =
   account_and_portfolio_updates t
   >>| Or_error.ok_exn
 
-(* Executions *)
+(* +-----------------------------------------------------------------------+
+   | Executions                                                            |
+   +-----------------------------------------------------------------------+ *)
 
 let filter_executions ?time t ~contract ~order_action =
-  let q = Query.Execution_reports.create
-    ~contract
-    ~client_id:(client_id t)
-    (* Note: [account_code t] won't return [None] since we
-       can only call it in a handler of [Tws.with_client]. *)
-    ~account_code:(Option.value_exn (account_code t))
-    ~time:(Option.value time ~default:(Time.sub (Time.now ()) Time.Span.day))
-    ~order_action
-  in
-  dispatch_streaming_request t Tws_reqs.req_execution_reports q >>| function
-  | Error _ as e -> e
-  | Ok (pipe_r, id) ->
-    let pipe_r = Pipe.filter_map pipe_r ~f:(function
-      | Error _ as x -> Some x
-      | Ok `Execution_report data -> Some (Ok data)
-      | Ok `Execution_report_end  ->
-        cancel_streaming_request t Tws_reqs.req_execution_reports id; None)
+  with_connection t ~f:(fun con ->
+    let q = Query.Execution_reports.create
+      ~contract
+      ~client_id:(client_id t)
+      (* Note: [account_code t] won't return [None] since we
+         can only call it in a handler of [Tws.with_client]. *)
+      ~account_code:(Option.value_exn (account_code t))
+      ~time:(Option.value time ~default:(Time.sub (Time.now ()) Time.Span.day))
+      ~order_action
     in
-    Ok pipe_r
+    Ib.Streaming_request.dispatch Tws_reqs.req_execution_reports con q
+    >>| function
+    | Error _ as e -> e
+    | Ok (pipe_r, id) ->
+      let pipe_r = Pipe.filter_map pipe_r ~f:(function
+        | Error _ as x ->
+          Some x
+        | Ok `Execution_report data ->
+          Some (Ok data)
+        | Ok `Execution_report_end  ->
+          Ib.Streaming_request.cancel Tws_reqs.req_execution_reports con id;
+          None)
+      in
+      Ok pipe_r
+  )
 
 let filter_executions_exn ?time t ~contract ~order_action =
   filter_executions ?time t ~contract ~order_action >>| function
   | Error e -> raise (Error.to_exn e)
   | Ok pipe_r -> Pipe.map pipe_r ~f:Tws_result.ok_exn
 
-(* Contract specs *)
+(* +-----------------------------------------------------------------------+
+   | Contract specs                                                        |
+   +-----------------------------------------------------------------------+ *)
 
 let contract_specs t ~contract =
-  let q = Query.Contract_specs.create ~contract in
-  dispatch_and_cancel t Tws_reqs.req_contract_specs q
+  with_connection t ~f:(fun con ->
+    let q = Query.Contract_specs.create ~contract in
+    dispatch_and_cancel Tws_reqs.req_contract_specs con q
+  )
 
 let contract_specs_exn t ~contract =
   contract_specs t ~contract >>| function
   | Error e -> raise (Error.to_exn e)
   | Ok result -> Tws_result.ok_exn result
 
-(* Market depth *)
+(* +-----------------------------------------------------------------------+
+   | Market depth                                                          |
+   +-----------------------------------------------------------------------+ *)
 
 let market_depth ?(num_rows = 10) t ~contract =
-  let q = Query.Market_depth.create ~contract ~num_rows in
-  dispatch_streaming_request t Tws_reqs.req_market_depth q
+  with_connection t ~f:(fun con ->
+    let q = Query.Market_depth.create ~contract ~num_rows in
+    Ib.Streaming_request.dispatch Tws_reqs.req_market_depth con q
+  )
 
 let market_depth_exn ?num_rows t ~contract =
   market_depth ?num_rows t ~contract >>| function
   | Error e -> raise (Error.to_exn e)
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
-let cancel_market_depth t id = cancel_streaming_request t Tws_reqs.req_market_depth id
+let cancel_market_depth t id =
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_market_depth con id
+  )
 
-(* Historical data *)
+(* +-----------------------------------------------------------------------+
+   | Historical data                                                       |
+   +-----------------------------------------------------------------------+ *)
 
 let historical_data
     ?(bar_size = `One_hour)
     ?(show = `Trades)
     ?(until = Time.now ())
     t ~contract =
-  let q = Query.Historical_data.create ~contract ~end_date_time:until
-    ~bar_size ~duration ~use_rth ~show in
-  dispatch_streaming_request t Tws_reqs.req_historical_data q
+  with_connection t ~f:(fun con ->
+    let q = Query.Historical_data.create ~contract ~end_date_time:until
+      ~bar_size ~duration ~use_rth ~show in
+    Ib.Streaming_request.dispatch Tws_reqs.req_historical_data con q
+  )
 
 let cancel_historical_data t id =
-  cancel_streaming_request t Tws_reqs.req_historical_data id
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_historical_data con id
+  )
 
-(* Realtime bars *)
+(* +-----------------------------------------------------------------------+
+   | Realtime bars                                                         |
+   +-----------------------------------------------------------------------+ *)
 
 let realtime_bars
     ?(bar_size = `Five_secs)
     ?(show = `Trades)
     ?(use_rth = true)
     t ~contract =
-  let q = Query.Realtime_bars.create ~contract ~bar_size ~show ~use_rth in
-  dispatch_streaming_request t Tws_reqs.req_realtime_bars q
+  with_connection t ~f:(fun con ->
+    let q = Query.Realtime_bars.create ~contract ~bar_size ~show ~use_rth in
+    Ib.Streaming_request.dispatch Tws_reqs.req_realtime_bars con q
+  )
 
 let realtime_bars_exn ?bar_size ?show ?use_rth t ~contract =
   realtime_bars ?bar_size ?show ?use_rth t ~contract >>| function
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
 let cancel_realtime_bars t id =
-  cancel_streaming_request t Tws_reqs.req_realtime_bars id
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_realtime_bars con id
+  )
 
-(* Trades *)
+(* +-----------------------------------------------------------------------+
+   | Trades                                                                |
+   +-----------------------------------------------------------------------+ *)
 
 module Trade = struct
   type t =
 end
 
 let trades t ~contract =
-  let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:false in
-  dispatch_streaming_request t Tws_reqs.req_taq_data q >>| function
-  | Error _ as x -> x
-  | Ok (ticks, id) ->
-    let filter_trade = unstage (Trade.make_filter ()) in
-    let trades = Pipe.filter_map ticks ~f:(function
-      | Error _ as x -> Some x
-      | Ok tick ->
-        match filter_trade tick with
-        | None -> None
-        | Some trade -> Some (Ok trade))
-    in
-    Ok (trades, id)
+  with_connection t ~f:(fun con ->
+    let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:false in
+    Ib.Streaming_request.dispatch Tws_reqs.req_taq_data con q >>| function
+    | Error _ as x -> x
+    | Ok (ticks, id) ->
+      let filter_trade = unstage (Trade.make_filter ()) in
+      let trades = Pipe.filter_map ticks ~f:(function
+        | Error _ as x -> Some x
+        | Ok tick ->
+          match filter_trade tick with
+          | None -> None
+          | Some trade -> Some (Ok trade))
+      in
+      Ok (trades, id)
+  )
 
 let trades_exn t ~contract =
   trades t ~contract >>| function
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
 let cancel_trades t id =
-  cancel_streaming_request t Tws_reqs.req_taq_data id
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_taq_data con id
+  )
 
-(* Quotes *)
+(* +-----------------------------------------------------------------------+
+   | Quotes                                                                |
+   +-----------------------------------------------------------------------+ *)
 
 module Quote = struct
   type t =
 end
 
 let quotes t ~contract =
-  let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:false in
-  dispatch_streaming_request t Tws_reqs.req_taq_data q >>| function
-  | Error _ as x -> x
-  | Ok (ticks, id) ->
-    let filter_quote = unstage (Quote.make_filter ()) in
-    let quotes = Pipe.filter_map ticks ~f:(function
-      | Error _ as x -> Some x
-      | Ok tick ->
-        match filter_quote tick with
-        | None -> None
-        | Some quote -> Some (Ok quote))
-    in
-    Ok (quotes, id)
+  with_connection t ~f:(fun con ->
+    let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:false in
+    Ib.Streaming_request.dispatch Tws_reqs.req_taq_data con q >>| function
+    | Error _ as x -> x
+    | Ok (ticks, id) ->
+      let filter_quote = unstage (Quote.make_filter ()) in
+      let quotes = Pipe.filter_map ticks ~f:(function
+        | Error _ as x -> Some x
+        | Ok tick ->
+          match filter_quote tick with
+          | None -> None
+          | Some quote -> Some (Ok quote))
+      in
+      Ok (quotes, id)
+  )
 
 let quotes_exn t ~contract =
   quotes t ~contract >>| function
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
 let cancel_quotes t id =
-  cancel_streaming_request t Tws_reqs.req_taq_data id
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_taq_data con id
+  )
 
-(* TAQ data *)
+(* +-----------------------------------------------------------------------+
+   | TAQ data                                                              |
+   +-----------------------------------------------------------------------+ *)
 
 module TAQ = struct
   type t =
 end
 
 let taq_data t ~contract =
-  let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:false in
-  dispatch_streaming_request t Tws_reqs.req_taq_data q >>| function
-  | Error _ as x -> x
-  | Ok (ticks, id) ->
-    let filter_quote = unstage (Quote.make_filter ()) in
-    let filter_trade = unstage (Trade.make_filter ()) in
-    let taq_records = Pipe.filter_map ticks ~f:(function
-      | Error _ as x -> Some x
-      | Ok tick ->
-        match filter_quote tick with
-        | Some quote -> Some (Ok (TAQ.quote quote))
-        | None ->
-          begin
-            match filter_trade tick with
-            | Some trade -> Some (Ok (TAQ.trade trade))
-            | None -> None
-          end)
+  with_connection t ~f:(fun con ->
+    let q = Query.Market_data.create
+      ~contract
+      ~tick_generics:[]
+      ~snapshot:false
     in
-    Ok (taq_records, id)
+    Ib.Streaming_request.dispatch Tws_reqs.req_taq_data con q >>| function
+    | Error _ as x -> x
+    | Ok (ticks, id) ->
+      let filter_quote = unstage (Quote.make_filter ()) in
+      let filter_trade = unstage (Trade.make_filter ()) in
+      let taq_records = Pipe.filter_map ticks ~f:(function
+        | Error _ as x -> Some x
+        | Ok tick ->
+          match filter_quote tick with
+          | Some quote -> Some (Ok (TAQ.quote quote))
+          | None ->
+            begin
+              match filter_trade tick with
+              | Some trade -> Some (Ok (TAQ.trade trade))
+              | None -> None
+            end)
+      in
+      Ok (taq_records, id)
+  )
 
 let taq_data_exn t ~contract =
   taq_data t ~contract >>| function
   | Ok (pipe_r, id) -> Pipe.map pipe_r ~f:Tws_result.ok_exn, id
 
 let cancel_taq_data t id =
-  cancel_streaming_request t Tws_reqs.req_taq_data id
+  with_connection_unit t ~f:(fun con ->
+    Ib.Streaming_request.cancel Tws_reqs.req_taq_data con id
+  )
 
-(* TAQ snapshots *)
+(* +-----------------------------------------------------------------------+
+   | TAQ snapshots                                                         |
+   +-----------------------------------------------------------------------+ *)
 
 module Quote_snapshot = struct
   type t =
       bid_price = Price.zero;
     }
   in
-  let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:true in
-  dispatch_streaming_request t Tws_reqs.req_taq_snapshot q
-  >>= function
-  | Error _ as x -> return x
-  | Ok (ticks, id) ->
-    let module Type = Tick_price.Type in
-    let num_ticks = 2 in
-    Monitor.try_with (fun () ->
-      Pipe.fold ticks ~init:(0, quote) ~f:(fun (counter, quote) result ->
-        match result with
-        | Error tws_error ->
-          cancel_streaming_request t Tws_reqs.req_taq_snapshot id;
-          Pipe.close_read ticks;
-          raise (Tws_error.to_exn tws_error)
-        | Ok tick ->
-          let counter =
-            match Tick_price.tick_type tick with
-            | Type.Ask ->
-              quote.Quote_snapshot.ask_size  <- Tick_price.size tick;
-              quote.Quote_snapshot.ask_price <- Tick_price.price tick;
-              counter + 1
-            | Type.Bid ->
-              quote.Quote_snapshot.bid_size  <- Tick_price.size tick;
-              quote.Quote_snapshot.bid_price <- Tick_price.price tick;
-              counter + 1
-            | Type.Last | Type.Low | Type.High | Type.Close -> counter
-          in
-          if counter = num_ticks then begin
-            cancel_streaming_request t Tws_reqs.req_taq_snapshot id;
-            Pipe.close_read ticks
-          end;
-          return (counter, quote)))
+  with_connection t ~f:(fun con ->
+    let q = Query.Market_data.create
+      ~contract
+      ~tick_generics:[]
+      ~snapshot:true
+    in
+    Ib.Streaming_request.dispatch Tws_reqs.req_taq_snapshot con q
+    >>= function
+    | Error _ as x -> return x
+    | Ok (ticks, id) ->
+      let module Type = Tick_price.Type in
+      let num_ticks = 2 in
+      Monitor.try_with (fun () ->
+        Pipe.fold ticks ~init:(0, quote) ~f:(fun (counter, quote) result ->
+          match result with
+          | Error tws_error ->
+            Ib.Streaming_request.cancel Tws_reqs.req_taq_snapshot con id;
+            Pipe.close_read ticks;
+            raise (Tws_error.to_exn tws_error)
+          | Ok tick ->
+            let counter =
+              match Tick_price.tick_type tick with
+              | Type.Ask ->
+                quote.Quote_snapshot.ask_size  <- Tick_price.size tick;
+                quote.Quote_snapshot.ask_price <- Tick_price.price tick;
+                counter + 1
+              | Type.Bid ->
+                quote.Quote_snapshot.bid_size  <- Tick_price.size tick;
+                quote.Quote_snapshot.bid_price <- Tick_price.price tick;
+                counter + 1
+              | Type.Last | Type.Low | Type.High | Type.Close -> counter
+            in
+            if counter = num_ticks then begin
+              Ib.Streaming_request.cancel Tws_reqs.req_taq_snapshot con id;
+              Pipe.close_read ticks
+            end;
+            return (counter, quote)))
       >>| function
       | Error exn -> Or_error.of_exn (Monitor.extract_exn exn)
       | Ok (_counter, quote) -> Ok quote
+  )
 
 let quote_snapshot_exn t ~contract =
   quote_snapshot t ~contract >>| Or_error.ok_exn
 end
 
 let trade_snapshot t ~contract =
-  let trade =
-    { Trade_snapshot.
-      symbol = Contract.symbol contract;
-      last_size  = 0;
-      last_price = Price.zero;
-    }
-  in
-  let q = Query.Market_data.create ~contract ~tick_generics:[] ~snapshot:true in
-  dispatch_streaming_request t Tws_reqs.req_taq_snapshot q
-  >>= function
-  | Error _ as x -> return x
-  | Ok (ticks, id) ->
-    let module Type = Tick_price.Type in
-    let num_ticks = 1 in
-    Monitor.try_with (fun () ->
-      Pipe.fold ticks ~init:(0, trade) ~f:(fun (counter, trade) result ->
-        match result with
-        | Error tws_error ->
-          cancel_streaming_request t Tws_reqs.req_taq_snapshot id;
-          Pipe.close_read ticks;
-          raise (Tws_error.to_exn tws_error)
-        | Ok tick ->
-          let counter =
-            match Tick_price.tick_type tick with
-            | Type.Last ->
-              trade.Trade_snapshot.last_size  <- Tick_price.size  tick;
-              trade.Trade_snapshot.last_price <- Tick_price.price tick;
-              counter + 1
-            | Type.Ask | Type.Bid | Type.Low | Type.High | Type.Close -> counter
-          in
-          if counter = num_ticks then begin
-            cancel_streaming_request t Tws_reqs.req_taq_snapshot id;
-            Pipe.close_read ticks
-          end;
-          return (counter, trade)))
+  with_connection t ~f:(fun con ->
+    let trade =
+      { Trade_snapshot.
+        symbol = Contract.symbol contract;
+        last_size  = 0;
+        last_price = Price.zero;
+      }
+    in
+    let q = Query.Market_data.create
+      ~contract
+      ~tick_generics:[]
+      ~snapshot:true
+    in
+    Ib.Streaming_request.dispatch Tws_reqs.req_taq_snapshot con q
+    >>= function
+    | Error _ as x -> return x
+    | Ok (ticks, id) ->
+      let module Type = Tick_price.Type in
+      let num_ticks = 1 in
+      Monitor.try_with (fun () ->
+        Pipe.fold ticks ~init:(0, trade) ~f:(fun (counter, trade) result ->
+          match result with
+          | Error tws_error ->
+            Ib.Streaming_request.cancel Tws_reqs.req_taq_snapshot con id;
+            Pipe.close_read ticks;
+            raise (Tws_error.to_exn tws_error)
+          | Ok tick ->
+            let counter =
+              match Tick_price.tick_type tick with
+              | Type.Last ->
+                trade.Trade_snapshot.last_size  <- Tick_price.size  tick;
+                trade.Trade_snapshot.last_price <- Tick_price.price tick;
+                counter + 1
+              | Type.Ask | Type.Bid | Type.Low | Type.High | Type.Close -> counter
+            in
+            if counter = num_ticks then begin
+              Ib.Streaming_request.cancel Tws_reqs.req_taq_snapshot con id;
+              Pipe.close_read ticks
+            end;
+            return (counter, trade)))
       >>| function
       | Error exn -> Or_error.of_exn (Monitor.extract_exn exn)
       | Ok (_counter, trade) -> Ok trade
+  )
 
 let trade_snapshot_exn t ~contract =
   trade_snapshot t ~contract >>| Or_error.ok_exn
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *)
 
-(** A TWS client *)
-
 open Core.Std
 open Async.Std
 open Std_internal
 open Response
 
-type t
-include Client_intf.S with type t := t
+module Query_id : Unique_id
+
+type t (** A TWS client *)
 
 (** {1 Connection and server} *)
 (******************************************************************************)
 
+(** [with_client ~host ~port ~on_handler_error handler] connects to the
+    IB connectivity software on ([host], [port]) and runs the [handler]
+    until an exception is thrown or the returned Deferred is determined.
+
+    [on_handler_error] determines what happens if the [handler] throws an
+    exception.
+
+    The standard port for TWS is 7496 and for the IB Gateway it is 4001.
+*)
+val with_client
+  :  ?enable_logging:bool
+  -> ?client_id:Client_id.t
+  -> host:string
+  -> port:int
+  -> on_handler_error:[
+  | `Raise
+  | `Ignore
+  | `Call of (Error.t -> unit)
+  ]
+  -> (t -> unit Deferred.t)
+  -> unit Deferred.t
+
+val is_connected : t -> bool
+
+(** [state t] returns the state of the connection. *)
+val state : t -> [ `Disconnected | `Connecting | `Connected ]
+
+(** [set_server_log_level level] sets the log entry detail [level] of TWS
+    when processing API requests. *)
+val set_server_log_level
+  :  t
+  -> level:[
+  | `System
+  | `Error
+  | `Warning
+  | `Information
+  | `Detail
+  ]
+  -> unit
+
 val server_time : t -> Time.t Or_error.t Deferred.t
 
 val server_time_exn : t -> Time.t Deferred.t
 
+val server_version : t -> int option
+
+val connection_time : t -> Time.t option
+
 
 (** {1 Market data} *)
 (******************************************************************************)
 
 val cancel_order_status : t -> Order_id.t -> unit
 
+
 (** {1 Account and portfolio} *)
 (******************************************************************************)
 
       | `Portfolio_update of Portfolio_update.t
       ] Pipe.Reader.t) Deferred.t
 
+val commission_reports : t -> Commission_report.t Stream.t
+
+
 (** {1 Execution reports} *)
 (******************************************************************************)
 
+val execution_reports : t -> Execution_report.t Stream.t
+
 val filter_executions
   :  ?time:Time.t
   -> t
 (* setup.ml generated for the first time by OASIS v0.2.0 *)
 
 (* OASIS_START *)
-(* DO NOT EDIT (digest: e091f435c102d84836b9ed5166abda24) *)
+(* DO NOT EDIT (digest: 4cd7fa68e8658faeadfe5421524a5466) *)
 (*
    Regenerated by OASIS v0.3.0
    Visit http://oasis.forge.ocamlcore.org for more information and
                            "Raw_order";
                            "Order";
                            "Ib";
-                           "Client";
                            "Tws_reqs";
                            "Tws_result";
                            "Tws";
                            "Client_id";
-                           "Client_intf";
                            "Std"
                         ];
                       lib_pack = true;
                       });
                Executable
                  ({
-                     cs_name = "print-option-data";
-                     cs_data = PropList.Data.create ();
-                     cs_plugin_data = [];
-                     },
-                   {
-                      bs_build =
-                        [
-                           (OASISExpr.EBool true, false);
-                           (OASISExpr.EFlag "examples", true)
-                        ];
-                      bs_install = [(OASISExpr.EBool true, false)];
-                      bs_path = "examples";
-                      bs_compiled_object = Best;
-                      bs_build_depends = [InternalLibrary "ibx"];
-                      bs_build_tools =
-                        [ExternalTool "ocamlbuild"; ExternalTool "ocamldoc"];
-                      bs_c_sources = [];
-                      bs_data_files = [];
-                      bs_ccopt = [(OASISExpr.EBool true, [])];
-                      bs_cclib = [(OASISExpr.EBool true, [])];
-                      bs_dlllib = [(OASISExpr.EBool true, [])];
-                      bs_dllpath = [(OASISExpr.EBool true, [])];
-                      bs_byteopt = [(OASISExpr.EBool true, [])];
-                      bs_nativeopt = [(OASISExpr.EBool true, [])];
-                      },
-                   {
-                      exec_custom = false;
-                      exec_main_is = "print_option_data.ml";
-                      });
-               Executable
-                 ({
                      cs_name = "quote-table";
                      cs_data = PropList.Data.create ();
                      cs_plugin_data = [];
           };
      oasis_fn = Some "_oasis";
      oasis_version = "0.3.0";
-     oasis_digest = Some ",\211\252\247\204+s\155[\253\229\140\145\157\252U";
+     oasis_digest = Some "\242[b\220p\220\142w1\186\"u\205_15";
      oasis_exec = None;
      oasis_setup_args = [];
      setup_update = false;
 
 let setup () = BaseSetup.setup setup_t;;
 
-# 6350 "setup.ml"
+# 6317 "setup.ml"
 (* OASIS_STOP *)
 let () = setup ();;