Commits

Oliver Gu committed d7dfc9a

Response handler expects header instead of tag and version

Comments (0)

Files changed (1)

     tag : 'a;
     version : int;
   } with sexp
-  let create ~tag ~version = { tag; version; }
+
+  let create ~tag ~version =
+    { tag;
+      version;
+    }
+
+  let tws_error         = { tag = Recv_tag.Tws_error        ; version = 2 }
+  let next_order_id     = { tag = Recv_tag.Next_order_id    ; version = 1 }
+  let managed_accounts  = { tag = Recv_tag.Managed_accounts ; version = 1 }
+  let execution_report  = { tag = Recv_tag.Execution_report ; version = 9 }
+  let commission_report = { tag = Recv_tag.Commission_report; version = 1 }
 end
 
 module Ibx_error = struct
     run     : handler;
   }
 
-  let create ~tag ~version ~run = { tag; version; run }
+  let create ~header ~run =
+    { tag = header.Header.tag;
+      version = header.Header.version;
+      run;
+    }
 end
 
 module type Connection_internal = sig
   and response_handler = Response_handler.handler
   and logfun = [ `Send of Query.t | `Recv of Response.t ] -> unit
 
-  let init_handler ?id t ~tag ~version ~unpickler ~action ~f =
+  let init_handler ?id t ~header ~unpickler ~action ~f =
+    let tag = header.Header.tag in
+    let version = header.Header.version in
     Hashtbl.replace t.open_queries
       ~key:(Option.value id ~default:t.default_query_id, tag, version)
       ~data:(fun response ->
       }
     in
     init_handler t
-      ~tag:Recv_tag.Tws_error
-      ~version:2
+      ~header:Header.tws_error
       ~unpickler:Tws_error.unpickler
       ~action:`Keep
       ~f:(fun e -> extend_status ("TWS " ^ Tws_error.to_string_hum e));
     init_handler t
-      ~tag:Recv_tag.Next_order_id
-      ~version:1
+      ~header:Header.next_order_id
       ~unpickler:Next_order_id.unpickler
       ~action:`Remove
       ~f:(Ivar.fill t.next_order_id);
     init_handler t
-      ~tag:Recv_tag.Managed_accounts
-      ~version:1
+      ~header:Header.managed_accounts
       ~unpickler:Account_code.unpickler
       ~action:`Remove
       ~f:(Ivar.fill_if_empty t.account_code);
     init_handler t
-      ~tag:Recv_tag.Execution_report
-      ~version:9
+      ~header:Header.execution_report
       ~unpickler:Execution_report.unpickler
       ~action:`Keep
       ~f:extend_execution_report;
     init_handler t
-      ~tag:Recv_tag.Commission_report
-      ~version:1
+      ~header:Header.commission_report
       ~unpickler:Commission_report.unpickler
       ~action:`Keep
       ~f:extend_commission_report;
       }
     in
     let handler =
-      Response_handler.create
-        ~tag:t.recv_header.Header.tag
-        ~version:t.recv_header.Header.version
-        ~run:(fun response ->
-          match response.Response.data with
-          | Error err as x ->
-            (* If this handler died before, the ivar is already filled. *)
-            Ivar.fill_if_empty ivar x;
-            return (`Die err)
-          | Ok `Cancel ->
-            assert false (* Non-streaming requests are not cancelable. *)
-          | Ok (`Response data) ->
-            begin
-              match of_tws t.tws_response data with
-              | Error err as x ->
-                Ivar.fill ivar x;
-                return (`Die err)
-              | Ok _response as x ->
-                Ivar.fill ivar x;
-                return `Remove
-            end)
+      Response_handler.create ~header:t.recv_header ~run:(fun response ->
+        match response.Response.data with
+        | Error err as x ->
+          (* If this handler died before, the ivar is already filled. *)
+          Ivar.fill_if_empty ivar x;
+          return (`Die err)
+        | Ok `Cancel ->
+          assert false (* Non-streaming requests are not cancelable. *)
+        | Ok (`Response data) ->
+          begin
+            match of_tws t.tws_response data with
+            | Error err as x ->
+              Ivar.fill ivar x;
+              return (`Die err)
+            | Ok _response as x ->
+              Ivar.fill ivar x;
+              return `Remove
+          end)
     in
     begin
       match Connection.dispatch con ~handlers:[handler] query with
       }
     in
     let error_handler =
-      Response_handler.create
-        ~tag:Recv_tag.Tws_error
-        ~version:1
-        ~run:(fun response ->
-          match response.Response.data with
-          | Error err ->
-            Pipe.close pipe_w;
-            return (`Die err)
-          | Ok `Cancel ->
-            Pipe.close pipe_w;
-            return `Remove
-          | Ok (`Response data) ->
-            begin
-              match of_tws Tws_error.unpickler data with
-              | Error err ->
-                Pipe.close pipe_w;
+      Response_handler.create ~header:Header.tws_error ~run:(fun response ->
+        match response.Response.data with
+        | Error err ->
+          Pipe.close pipe_w;
+          return (`Die err)
+        | Ok `Cancel ->
+          Pipe.close pipe_w;
+          return `Remove
+        | Ok (`Response data) ->
+          begin
+            match of_tws Tws_error.unpickler data with
+            | Error err ->
+              Pipe.close pipe_w;
+              return (`Die err)
+            | Ok response ->
+              let err = Ibx_error.Tws_error (
+                "TWS Error " ^ Tws_error.to_string_hum response
+              ) in
+              if Ivar.is_empty ivar then begin
+                Ivar.fill ivar (Error err);
+                return `Remove
+              end else
                 return (`Die err)
-              | Ok response ->
-                let err = Ibx_error.Tws_error (
-                  "TWS Error " ^ Tws_error.to_string_hum response
-                ) in
-                if Ivar.is_empty ivar then begin
-                  Ivar.fill ivar (Error err);
-                  return `Remove
-                end else
-                  return (`Die err)
-            end)
+          end)
     in
     let skip_handlers =
       match t.skip_header with
       | None -> []
       | Some skip_headers ->
         List.map skip_headers ~f:(fun header ->
-          Response_handler.create
-            ~tag:header.Header.tag
-            ~version:header.Header.version
-            ~run:(fun response ->
-              match response.Response.data with
-              | Error err -> return (`Die err)
-              | Ok `Cancel -> return `Remove
-              | Ok (`Response _) -> return `Keep))
+          Response_handler.create ~header ~run:(fun response ->
+            match response.Response.data with
+            | Error err -> return (`Die err)
+            | Ok `Cancel -> return `Remove
+            | Ok (`Response _) -> return `Keep))
     in
     let data_handler_result = Result.try_with (fun () ->
-      List.map2_exn t.recv_header t.tws_response
-        ~f:(fun header unpickler ->
-          Response_handler.create
-            ~tag:header.Header.tag
-            ~version:header.Header.version
-            ~run:(fun response ->
-              let update pipe_w response =
-                match response.Response.data with
+      List.map2_exn t.recv_header t.tws_response ~f:(fun header unpickler ->
+        Response_handler.create ~header ~run:(fun response ->
+          let update pipe_w response =
+            match response.Response.data with
+            | Error err ->
+              Pipe.close pipe_w;
+              return (`Die err)
+            | Ok `Cancel ->
+              Pipe.close pipe_w;
+              return `Remove
+            | Ok (`Response data) ->
+              begin
+                match of_tws unpickler data with
                 | Error err ->
                   Pipe.close pipe_w;
                   return (`Die err)
-                | Ok `Cancel ->
-                  Pipe.close pipe_w;
-                  return `Remove
-                | Ok (`Response data) ->
-                  begin
-                    match of_tws unpickler data with
-                    | Error err ->
-                      Pipe.close pipe_w;
-                      return (`Die err)
-                    | Ok response ->
-                      if not (Pipe.is_closed pipe_w) then begin
-                        (* We guard this write call to protect us against
-                           incoming messages after a cancelation, causing
-                           a write call to a closed pipe. *)
-                        don't_wait_for (Pipe.write pipe_w response)
-                      end;
-                      return `Keep
-                  end
-              in
-              match response.Response.data with
+                | Ok response ->
+                  if not (Pipe.is_closed pipe_w) then begin
+                    (* We guard this write call to protect us against
+                       incoming messages after a cancelation, causing
+                       a write call to a closed pipe. *)
+                    don't_wait_for (Pipe.write pipe_w response)
+                  end;
+                  return `Keep
+              end
+          in
+          match response.Response.data with
+          | Error err as x ->
+            Pipe.close pipe_w;
+            Ivar.fill_if_empty ivar x;
+            return (`Die err)
+          | Ok `Cancel ->
+            Pipe.close pipe_w;
+            return `Remove
+          | Ok (`Response data) ->
+            begin
+              match of_tws unpickler data with
               | Error err as x ->
                 Pipe.close pipe_w;
                 Ivar.fill_if_empty ivar x;
                 return (`Die err)
-              | Ok `Cancel ->
-                Pipe.close pipe_w;
-                return `Remove
-              | Ok (`Response data) ->
-                begin
-                  match of_tws unpickler data with
-                  | Error err as x ->
-                    Pipe.close pipe_w;
-                    Ivar.fill_if_empty ivar x;
-                    return (`Die err)
-                  | Ok response ->
-                    don't_wait_for (Pipe.write pipe_w response);
-                    (* We fill the ivar only in the first iteration. *)
-                    Ivar.fill_if_empty ivar (Ok (pipe_r, query_id));
-                    return (`Replace (update pipe_w))
-                end)))
+              | Ok response ->
+                don't_wait_for (Pipe.write pipe_w response);
+                (* We fill the ivar only in the first iteration. *)
+                Ivar.fill_if_empty ivar (Ok (pipe_r, query_id));
+                return (`Replace (update pipe_w))
+            end)))
     in
     begin
       match data_handler_result with
   let dispatch_exn t con query = dispatch t con query >>| Or_error.ok_exn
 
   let cancel t con query_id =
-    let recv_header =
-      { Header.
-        tag = Recv_tag.Tws_error;
-        version = 1
-      } :: t.recv_header
-    in
+    let recv_header = Header.tws_error :: t.recv_header in
     let result =
       match t.canc_header with
       | None -> Connection.cancel_streaming con ~recv_header ~query_id