Commits

Oliver Gu  committed f2a7c94

Execution and commission reports can be accessed via pipes

  • Participants
  • Parent commits 6fc96f8

Comments (0)

Files changed (3)

File examples/buy_market_order.ml

     (Symbol.of_string "AAPL")
   in
   Common.with_tws_client (fun tws ->
-    Stream.iter (Tws.executions tws) ~f:(fun execution ->
-      printf "%s\n\n%!"
-        (Sexp.to_string_hum (Execution.sexp_of_t execution)));
-    Stream.iter (Tws.commissions tws) ~f:(fun commission ->
-      printf "%s\n\n%!"
-        (Sexp.to_string_hum (Commission.sexp_of_t commission)));
+    don't_wait_for (
+      Pipe.iter_without_pushback (Tws.executions tws) ~f:(fun exec ->
+        printf "%s\n\n%!" (Sexp.to_string_hum (Execution.sexp_of_t exec)))
+    );
+    don't_wait_for (
+      Pipe.iter_without_pushback (Tws.commissions tws) ~f:(fun comm ->
+        printf "%s\n\n%!" (Sexp.to_string_hum (Commission.sexp_of_t comm)))
+    );
     let buy_market = Order.buy_market ~quantity:100 in
     Tws.submit_order_exn tws
       ~contract:aapl
     mutable connection_time : Time.t option;
     mutable account_code    : Account_code.t option;
     messages    : Client_msg.t Tail.t;
-    executions  : Execution.t Tail.t;
-    commissions : Commission.t Tail.t;
+    exec_reader : Execution.t  Pipe.Reader.t;
+    exec_writer : Execution.t  Pipe.Writer.t;
+    comm_reader : Commission.t Pipe.Reader.t;
+    comm_writer : Commission.t Pipe.Writer.t;
   }
 
 let create
     ?(enable_logging = false)
-    ?(client_id      = Client_id.of_int_exn 0)
-    ~host
-    ~port
-    () =
+    ?(client_id = Client_id.of_int_exn 0)
+    ~host ~port () =
+  let exec_reader, exec_writer = Pipe.create () in
+  let comm_reader, comm_writer = Pipe.create () in
   return
     { client_id;
       enable_logging;
       connection_time = None;
       account_code    = None;
       messages        = Tail.create ();
-      executions      = Tail.create ();
-      commissions     = Tail.create ();
+      exec_reader;
+      exec_writer;
+      comm_reader;
+      comm_writer;
     }
 
 exception Not_connected_yet with sexp
           Tail.extend t.messages (C.Error e))
         ~extend_status:(fun s ->
           Tail.extend t.messages (C.Status s))
-        ~extend_execution:(fun x ->
-          Tail.extend t.executions x)
-        ~extend_commission:(fun x ->
-          Tail.extend t.commissions x)
+        ~extend_execution:(fun exec ->
+          don't_wait_for (Pipe.write t.exec_writer exec))
+        ~extend_commission:(fun comm ->
+          don't_wait_for (Pipe.write t.comm_writer comm))
         (Reader.create fd)
         (Writer.create fd)
       >>= fun con ->
       let close_connection exn =
         t.con <- `Disconnected;
         Tail.extend t.messages (C.Error (Error.of_exn exn));
+        Tail.close_exn t.messages;
+        Pipe.close t.exec_writer;
+        Pipe.close t.comm_writer;
         Ib.Connection.close con
       in
       Monitor.try_with ~name:"try connect" (fun () ->
       | Error exn -> close_connection (Monitor.extract_exn exn)
       | Ok () -> return ()
 
-let messages    t = Tail.collect t.messages
-let executions  t = Tail.collect t.executions
-let commissions t = Tail.collect t.commissions
-
 let client_id       t = t.client_id
 let server_version  t = t.server_version
 let connection_time t = t.connection_time
 let account_code    t = t.account_code
+let messages        t = Tail.collect t.messages
+let executions      t = t.exec_reader
+let commissions     t = t.comm_reader
 
 let is_connected t = match t.con with
   | `Disconnected
   | `Connected con ->
     t.con <- `Disconnected;
     Tail.extend t.messages (C.Control E.Disconnected);
+    Tail.close_exn t.messages;
+    Pipe.close t.exec_writer;
+    Pipe.close t.comm_writer;
     Ib.Connection.close con
 
 let with_client
       | `Portfolio_update of Portfolio_update.t
       ] Pipe.Reader.t) Deferred.t
 
-val commissions : t -> Commission.t Stream.t
+val commissions : t -> Commission.t Pipe.Reader.t
 
 
 (** {1 Execution reports} *)
 (******************************************************************************)
 
-val executions : t -> Execution.t Stream.t
+val executions : t -> Execution.t Pipe.Reader.t
 
 val filter_executions
   :  ?time:Time.t