Commits

Oliver Gu committed 8ae1b1b

TWS client can receive portfolio updates

Comments (0)

Files changed (8)

examples/print_account_updates.ml

 
 let print_account_updates () =
   Common.with_tws_client (fun tws ->
-    Tws.account_updates_exn tws
-    >>= fun account_updates ->
-    Pipe.iter_without_pushback account_updates ~f:(fun t ->
-      print_endline (Response.Account_update.sexp_of_t t |> Sexp.to_string_hum)
+    Tws.account_and_portfolio_updates_exn tws
+    >>= fun updates ->
+    Pipe.iter_without_pushback updates ~f:(function
+      | `Account_update x ->
+        print_endline (Sexp.to_string_hum (Account_update.sexp_of_t x));
+      | _ -> ()
     )
   )
 

examples/print_portfolio_updates.ml

+open Core.Std
+open Async.Std
+open Ibx.Std
+
+let print_portfolio_updates () =
+  Common.with_tws_client (fun tws ->
+    Tws.account_and_portfolio_updates_exn tws
+    >>= fun updates ->
+    Pipe.iter_without_pushback updates ~f:(function
+      | `Portfolio_update x ->
+        print_endline (Sexp.to_string_hum (Portfolio_update.sexp_of_t x));
+      | _ -> ()
+    )
+  )
+
+let command =
+  Command.async_basic ~summary:"print portfolio updates"
+    Command.Spec.(
+      empty
+      +> Common.logging_flag ()
+      +> Common.host_arg ()
+      +> Common.port_arg ()
+    )
+    (fun enable_logging host port () ->
+      print_portfolio_updates ~enable_logging ~host ~port ()
+      >>= function
+      | Error e -> prerr_endline (Error.to_string_hum e); exit 1
+      | Ok () -> return ()
+    )
+
+let () = Exn.handle_uncaught ~exit:true (fun () -> Command.run command)

lib/raw_contract.ml

         ~symbol:(fields_value (required Symbol.val_type))
         ~contract_type:(fields_value (required string))
         ~expiry:(fields_value (optional date))
-        ~strike:(fields_value (optional Price.val_type ~default_on_none:"0.0"))
-        ~option_right:(fields_value (optional Option_right.val_type))
+        ~strike:(fields_value (optional Price.val_type ~default_on_none:"0"))
+        ~option_right:(fields_value (optional Option_right.val_type
+                                       ~default_on_none:"0"))
         ~multiplier:(fields_value (optional string))
         ~exchange:(fields_value (required Exchange.val_type))
         ~listing_exchange:(fields_value skipped)
         ~name:(field_name Fields.contract_type)
       ++ value (optional date)
         ~name:(field_name Fields.expiry)
-      ++ value (optional Price.val_type ~none_on_default:"0.0")
+      ++ value (optional Price.val_type ~none_on_default:"0")
         ~name:(field_name Fields.strike)
-      ++ value (optional Option_right.val_type)
+      ++ value (optional Option_right.val_type ~none_on_default:"0")
         ~name:(field_name Fields.option_right)
       ++ value (optional string)
         ~name:(field_name Fields.multiplier)
 module Tick_option       = Response.Tick_option
 module Tick_string       = Response.Tick_string
 module Order_status      = Response.Order_status
+module Account_update    = Response.Account_update
+module Portfolio_update  = Response.Portfolio_update
 module Execution_report  = Response.Execution_report
 module Commission_report = Response.Commission_report
 module Contract_specs    = Response.Contract_specs
 
 (* Account and portfolio *)
 
-let account_updates t =
+let account_and_portfolio_updates t =
   let account_code = Option.value_exn (account_code t) in
   let create_query = Query.Account_and_portfolio_updates.create ~account_code in
   let subscribe = create_query ~subscribe:true in
-  dispatch_streaming_request' t Tws_reqs.req_account_updates subscribe >>| function
+  dispatch_streaming_request' t
+    Tws_reqs.req_account_and_portfolio_updates
+    subscribe >>| function
   | Error _ as e -> e
   | Ok pipe_r ->
     let pipe_r = Pipe.filter_map pipe_r ~f:(function
-      | `Account_update data -> Some data
+      | `Account_update _
+      | `Portfolio_update _ as x -> Some x
       | `Account_update_end code ->
         if Account_code.(=) account_code code then begin
-          cancel_streaming_request' t Tws_reqs.req_account_updates
+          cancel_streaming_request' t
+            Tws_reqs.req_account_and_portfolio_updates
         end;
         None)
     in
     Ok pipe_r
 
-let account_updates_exn t = account_updates t >>| Or_error.ok_exn
+let account_and_portfolio_updates_exn t =
+  account_and_portfolio_updates t
+  >>| Or_error.ok_exn
 
 (* Executions *)
 
 (** {1 Account and portfolio} *)
 (******************************************************************************)
 
-val account_updates
+val account_and_portfolio_updates
   :  t
-  -> (Account_update.t Pipe.Reader.t) Or_error.t Deferred.t
+  -> ([ `Account_update of Account_update.t
+      | `Portfolio_update of Portfolio_update.t
+      ] Pipe.Reader.t) Or_error.t Deferred.t
 
-val account_updates_exn
+val account_and_portfolio_updates_exn
   :  t
-  -> (Account_update.t Pipe.Reader.t) Deferred.t
+  -> ([ `Account_update of Account_update.t
+      | `Portfolio_update of Portfolio_update.t
+      ] Pipe.Reader.t) Deferred.t
 
 (** {1 Execution reports} *)
 (******************************************************************************)
 
 (* ================== Account and portfolio ======================= *)
 
-let req_account_updates = Ib.Streaming_request_without_id.create
+let req_account_and_portfolio_updates = Ib.Streaming_request_without_id.create
   ~send_header:(Ib.Header.create ~tag:S.Portfolio_data ~version:2)
   ~recv_header:[
     Ib.Header.create ~tag:R.Account_update ~version:2;
-    Ib.Header.create ~tag:R.Account_download_end ~version:1;
-  ]
-  ~skip_header:[
     Ib.Header.create ~tag:R.Portfolio_update ~version:7;
-    Ib.Header.create ~tag:R.Account_update_time ~version:1;
+    Ib.Header.create ~tag:R.Account_download_end ~version:1;
   ]
+  ~skip_header:[Ib.Header.create ~tag:R.Account_update_time ~version:1]
   ~tws_query:Query.Account_and_portfolio_updates.pickler
   ~tws_response:[
     U.map Response.Account_update.unpickler ~f:(fun x -> `Account_update x);
+    U.map Response.Portfolio_update.unpickler ~f:(fun x -> `Portfolio_update x);
     U.map Account_code.unpickler ~f:(fun x -> `Account_update_end x);
   ] ()
 
 (** {1 Account and portfolio} *)
 (*****************************************************************************)
 
-val req_account_updates :
+val req_account_and_portfolio_updates :
   (Query.Account_and_portfolio_updates.t,
    [ `Account_update of Response.Account_update.t
+   | `Portfolio_update of Response.Portfolio_update.t
    | `Account_update_end of Account_code.t
    ]) Ib.Streaming_request_without_id.t