Commits

Oliver Gu committed 932352e

TWS client can receive account updates

Comments (0)

Files changed (9)

       | R.Order_status -> read ~len:9
       | R.Tws_error -> read ~len:2
       | R.Open_order -> read ~len:93
-      | R.Account_update -> read ~len:1
-      | R.Portfolio_update -> unimplemented R.Portfolio_update
-      | R.Account_update_time -> unimplemented R.Account_update_time
+      | R.Account_update -> read ~len:4
+      | R.Portfolio_update -> read ~len:17
+      | R.Account_update_time -> read ~len:1
       | R.Next_order_id -> read ~len:1
       | R.Contract_data -> read ~len:26
       | R.Execution_report -> read ~len:23
       | R.Fundamental_data -> unimplemented R.Fundamental_data
       | R.Contract_data_end -> empty_read
       | R.Open_order_end -> empty_read
-      | R.Account_download_end -> unimplemented R.Account_download_end
+      | R.Account_download_end -> read ~len:1
       | R.Execution_report_end -> empty_read
       | R.Delta_neutral_validation -> unimplemented R.Delta_neutral_validation
       | R.Snapshot_end -> empty_read
       { Query.
         tag     = t.send_header.Header.tag;
         version = t.send_header.Header.version;
-        id      = Some query_id;
+        id      = Option.some_if (not t.use_default_id) query_id;
         data    = to_tws t.tws_query query;
       }
     in
   | Fundamental_data -> false
   | Contract_data_end -> true
   | Open_order_end -> true
-  | Account_download_end -> true
+  | Account_download_end -> false
   | Execution_report_end -> true
   | Delta_neutral_validation -> true
   | Snapshot_end -> true
   type t = {
     key : string;
     value : string;
-    currency : Currency.t;
+    currency : string option;
     account_code : Account_code.t;
   } with sexp, fields
 
           ~init:(empty ())
           ~key:(fields_value (required string))
           ~value:(fields_value (required string))
-          ~currency:(fields_value (required Currency.val_type))
+          ~currency:(fields_value (optional string))
           ~account_code:(fields_value (required Account_code.val_type)))
       (fun key value currency account_code ->
         { key; value; currency; account_code })
             ~init:(empty ())
             ~key:(fields_value (required string))
             ~value:(fields_value (required string))
-            ~currency:(fields_value (required Currency.val_type))
+            ~currency:(fields_value (optional string))
             ~account_code:(fields_value (required Account_code.val_type)))
           (fun t -> `Args $ t.key $ t.value $ t.currency $ t.account_code)))
 end
   type t = private
     { key : string;
       value : string;
-      currency : Currency.t;
+      currency : string option;
       account_code : Account_code.t;
     }
   with sexp, fields
   val create :
     key:string
     -> value:string
-    -> currency:Currency.t
+    -> currency:string option
     -> account_code:Account_code.t
     -> t
 end
   let id = Query_id.of_int_exn (Order_id.to_int_exn oid) in
   cancel_streaming_request t Tws_reqs.req_submit_order id
 
+(* Account and portfolio *)
+
+let account_updates t =
+  let account_code = Option.value_exn (account_code t) in
+  let create_query = Query.Account_and_portfolio_updates.create ~account_code in
+  let subscribe = create_query ~subscribe:true in
+  dispatch_streaming_request t Tws_reqs.req_account_updates subscribe >>| function
+  | Error _ as e -> e
+  | Ok (pipe_r, id) ->
+    let pipe_r = Pipe.filter_map pipe_r ~f:(function
+      | `Account_update data -> Some data
+      | `Account_update_end code ->
+        if Account_code.(=) account_code code then begin
+          cancel_streaming_request t Tws_reqs.req_account_updates id
+        end;
+        None)
+    in
+    Ok pipe_r
+
+let account_updates_exn t = account_updates t >>| Or_error.ok_exn
+
 (* Executions *)
 
 let filter_executions ?time t ~contract ~order_action =
 let filter_executions_exn ?time t ~contract ~order_action =
   filter_executions ?time t ~contract ~order_action >>| Or_error.ok_exn
 
-(* Portfolio *)
-
 (* Contract specs *)
 
 let contract_specs t ~contract =
 
 val cancel_order_status : t -> Order_id.t -> unit
 
+(** {1 Account and portfolio} *)
+(******************************************************************************)
+
+val account_updates
+  :  t
+  -> (Account_update.t Pipe.Reader.t) Or_error.t Deferred.t
+
+val account_updates_exn
+  :  t
+  -> (Account_update.t Pipe.Reader.t) Deferred.t
 
 (** {1 Execution reports} *)
 (******************************************************************************)
   ~tws_response:[Response.Order_status.unpickler]
   ()
 
+(* ================== Account and portfolio ======================= *)
+
+let req_account_updates = Ib.Streaming_request.create
+  ~use_default_id:true
+  ~send_header:(Ib.Header.create ~tag:S.Portfolio_data ~version:2)
+  ~recv_header:[
+    Ib.Header.create ~tag:R.Account_update ~version:2;
+    Ib.Header.create ~tag:R.Account_download_end ~version:1;
+  ]
+  ~skip_header:[
+    Ib.Header.create ~tag:R.Portfolio_update ~version:7;
+    Ib.Header.create ~tag:R.Account_update_time ~version:1;
+  ]
+  ~tws_query:Query.Account_and_portfolio_updates.pickler
+  ~tws_response:[
+    U.map Response.Account_update.unpickler ~f:(fun x -> `Account_update x);
+    U.map Account_code.unpickler ~f:(fun x -> `Account_update_end x);
+  ] ()
+
 (* ===================== Execution reports ======================== *)
 
 let req_execution_reports = Ib.Streaming_request.create
   ~tws_response:[
     U.map Response.Execution_report.unpickler ~f:(fun x -> `Execution_report x);
     U.const `Execution_report_end;
-  ]
-  ()
+  ] ()
 
 (* ======================== Market depth ========================== *)
 
 val req_submit_order :
   (Query.Submit_order.t, Response.Order_status.t) Ib.Streaming_request.t
 
+(** {1 Account and portfolio} *)
+(*****************************************************************************)
+
+val req_account_updates :
+  (Query.Account_and_portfolio_updates.t,
+   [ `Account_update of Response.Account_update.t
+   | `Account_update_end of Account_code.t
+   ]) Ib.Streaming_request.t
+
 (** {1 Execution data} *)
 (*****************************************************************************)
 
     Response.Account_update.create
       ~key:(sg ())
       ~value:(sg ())
-      ~currency:(currency_g ())
+      ~currency:(og sg ())
       ~account_code:(account_code_g ())
 
   let portfolio_update_g () =