Commits

Oliver Gu committed 4f80691

Minor cleanups

  • Participants
  • Parent commits a053951

Comments (0)

Files changed (2)

    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *)
 
-(* Parts of this module are inspired by the Rpc module of JaneSt's Async
-   library. *)
+(* Parts of this module are inspired by Async's Rpc module. *)
 
 open Core.Std
 open Async.Std
 
 include struct
   open Response
-  module Tws_error = Tws_error
-  module Next_order_id = Next_order_id
-  module Execution_report = Execution_report
+  module Next_order_id     = Next_order_id
+  module Tws_error         = Tws_error
+  module Execution_report  = Execution_report
   module Commission_report = Commission_report
 end
 module Server_log_level = Query.Server_log_level
       version;
     }
 
-  let tws_error         = { tag = Recv_tag.Tws_error        ; version = 2 }
-  let next_order_id     = { tag = Recv_tag.Next_order_id    ; version = 1 }
-  let managed_accounts  = { tag = Recv_tag.Managed_accounts ; version = 1 }
-  let execution_report  = { tag = Recv_tag.Execution_report ; version = 9 }
-  let commission_report = { tag = Recv_tag.Commission_report; version = 1 }
+  module R = Recv_tag
+  let managed_accounts  = { tag = R.Managed_accounts  ; version = 1 }
+  let next_order_id     = { tag = R.Next_order_id     ; version = 1 }
+  let tws_error         = { tag = R.Tws_error         ; version = 2 }
+  let execution_report  = { tag = R.Execution_report  ; version = 9 }
+  let commission_report = { tag = R.Commission_report ; version = 1 }
 end
 
 module Ibx_error = struct
     data     : Response_data.t;
   } with fields, sexp
 
-  let pickler = Only_in_test.of_thunk (fun () ->
+  let pickler = lazy (
     Pickler.create ~name:"Response"
       Pickler.Spec.(
         wrap (
   and response_handler = Response_handler.handler
   and logfun = [ `Send of Query.t | `Recv of Response.t ] -> unit
 
-  let init_handler ?id t ~header ~unpickler ~action ~f =
-    let id = Option.value id ~default:t.default_query_id in
+  let init_handler t ~header ~unpickler ~action ~f =
+    let id = t.default_query_id in
     let tag = header.Header.tag in
     let version = header.Header.version in
     Hashtbl.replace t.open_queries ~key:(id, tag, version) ~data:(fun response ->
       Reader.read_until reader null_delim_pred ~keep_delim:false >>| function
       | `Eof -> `Eof
       | `Ok _ as x -> x
+      | `Eof_without_delim s ->
         (* We pretend that everything is Ok here and handle the error later. *)
-      | `Eof_without_delim s -> `Ok s
+        `Ok s
     in
     let logfun send_recv =
       let tr_null s = String.tr s ~target:'\000' ~replacement:'|' in
         let msg = to_tws Query.pickler query in
         Log.Global.debug ">> %s" (tr_null msg)
       | `Recv response ->
-        let msg = to_tws (Only_in_test.force Response.pickler) response in
+        let msg = to_tws (Lazy.force Response.pickler) response in
         Log.Global.debug "<< %s" (tr_null msg)
     in
     let t =
       }
     in
     init_handler t
-      ~header:Header.tws_error
-      ~unpickler:Tws_error.unpickler
-      ~action:`Keep
-      ~f:(fun e -> extend_status ("TWS " ^ Tws_error.to_string_hum e));
+      ~header:Header.managed_accounts
+      ~unpickler:Account_code.unpickler
+      ~action:`Remove
+      ~f:(Ivar.fill t.account_code);
     init_handler t
       ~header:Header.next_order_id
       ~unpickler:Next_order_id.unpickler
       ~action:`Remove
       ~f:(Ivar.fill t.next_order_id);
     init_handler t
-      ~header:Header.managed_accounts
-      ~unpickler:Account_code.unpickler
-      ~action:`Remove
-      ~f:(Ivar.fill t.account_code);
+      ~header:Header.tws_error
+      ~unpickler:Tws_error.unpickler
+      ~action:`Keep
+      ~f:(fun e -> extend_status ("TWS " ^ Tws_error.to_string_hum e));
     init_handler t
       ~header:Header.execution_report
       ~unpickler:Execution_report.unpickler
       ~f:extend_commission_report;
     return t
 
+  let next_query_id t =
+    let new_id = Query_id.create () in
+    Ivar.read t.next_order_id
+    >>| fun oid -> Query_id.increase new_id (Raw_order.Id.to_int_exn oid)
+
   let is_closed t = Ivar.is_full t.stop
   let closed t = Ivar.read t.stop
 
       Pipe.close_read t.reader
     end else Deferred.unit
 
-  let writer t = if Ivar.is_full t.stop then Error `Closed else Ok t.writer
-
-  let send_tws writer pickler msg =
-    Writer.write writer (to_tws pickler msg)
+  let send_tws writer pickler msg = Writer.write writer (to_tws pickler msg)
 
   let send_query ?logfun writer query =
     begin match logfun with
         | Ok x -> Ok (`Ok x)
       end
 
-  let read_version_and_query_id reader tag =
+  let read_version_and_optional_id reader tag =
     if Recv_tag.corresponding_response_has_query_id tag then
-      let unpickler = Unpickler.create ~name:"Version_id"
-        Unpickler.Spec.(
-          value (required int) ~name:"version"
-          ++ value (required Query_id.val_type) ~name:"query_id")
-        (fun version query_id -> (version, Some query_id))
+      let unpickler =
+        Unpickler.create ~name:"Version_id"
+          Unpickler.Spec.(
+            value (required int) ~name:"version"
+            ++ value (required Query_id.val_type) ~name:"id"
+          )
+          (fun version id -> (version, Some id))
       in
       read_tws reader unpickler ~len:2
     else
-      let unpickler = Unpickler.create ~name:"Version"
-        Unpickler.Spec.(value (required int) ~name:"version")
-        (fun version -> (version, None))
+      let unpickler =
+        Unpickler.create ~name:"Version"
+          Unpickler.Spec.(
+            value (required int) ~name:"version"
+          )
+          (fun version -> (version, None))
       in
       read_tws reader unpickler ~len:1
 
   let read_response reader =
     read_tws reader Recv_tag.unpickler ~len:1
     >>=~ fun tag ->
-    read_version_and_query_id reader tag
-    >>=~ fun (version, query_id) ->
+    read_version_and_optional_id reader tag
+    >>=~ fun (version, id) ->
     read_body reader tag
     >>|~ fun data ->
     { Response.
       tag;
       version;
-      query_id;
+      query_id = id;
       data = Ok (`Response data);
     }
 
-  module Handshake_result = struct
-    type t =
-    | Eof
-    | Version_failure of int
-    | Server_header of [ `Version of int ] * Time.t * Account_code.t
-    with sexp
-  end
+  let writer t = if Ivar.is_full t.stop then Error `Closed else Ok t.writer
 
   let set_server_log_level t ~level =
     match writer t with
         let tag = h.Response_handler.tag in
         let version = h.Response_handler.version in
         let run = h.Response_handler.run in
-        Hashtbl.replace t.open_queries ~key:(id, tag, version) ~data:run);
+        Hashtbl.replace t.open_queries ~key:(id, tag, version) ~data:run
+      );
       Ok ()
 
   let cancel_streaming ?query t ~recv_header ~query_id =
     match writer t with
     | Error `Closed as x -> x
     | Ok writer ->
-      Option.iter query ~f:(send_query ?logfun:t.logfun writer);
+      begin match query with
+      | None -> ()
+      | Some query -> send_query ?logfun:t.logfun writer query
+      end;
       List.iter recv_header ~f:(fun header ->
         let tag = header.Header.tag in
         let version = header.Header.version in
       );
       Ok ()
 
-  let next_query_id t =
-    let new_id = Query_id.create () in
-    Ivar.read t.next_order_id
-    >>| fun oid -> Query_id.increase new_id (Raw_order.Id.to_int_exn oid)
-
   let handle_response t response =
     let id = Option.value response.Response.query_id ~default:t.default_query_id in
     let tag = response.Response.tag in
         t.extend_error (Ibx_error.to_error error));
     Scheduler.within ~monitor loop
 
+  module Handshake_result = struct
+    type t =
+    | Eof
+    | Version_failure of int
+    | Server_header of [ `Version of int ] * Time.t * Account_code.t
+    with sexp
+  end
+
   let try_connect t ~client_version ~client_id =
     let client_header = {
       Client_header.
 
 (** A module for building TWS clients
 
-    Parts of this module are inspired by the Rpc module of JaneSt's Async
-    library.
+    Parts of this module are inspired by Async's Rpc module.
 *)
 
 open Core.Std
     -> tws_response : 'response Tws_prot.Unpickler.t
     -> ('query, 'response) t
 
-  val dispatch :
-    ('query, 'response) t
+  val dispatch
+    :  ('query, 'response) t
     -> Connection.t
     -> 'query
     -> 'response Or_error.t Deferred.t
 
-  val dispatch_exn :
-    ('query, 'response) t
+  val dispatch_exn
+    :  ('query, 'response) t
     -> Connection.t
     -> 'query
     -> 'response Deferred.t
     -> unit
     -> ('query, 'response) t
 
-  val dispatch :
-    ('query, 'response) t
+  val dispatch
+    :  ('query, 'response) t
     -> Connection.t
     -> 'query
     -> ('response Pipe.Reader.t * Id.t) Or_error.t Deferred.t
 
-  val dispatch_exn :
-    ('query, 'response) t
+  val dispatch_exn
+    :  ('query, 'response) t
     -> Connection.t
     -> 'query
     -> ('response Pipe.Reader.t * Id.t) Deferred.t