Commits

Oliver Gu  committed 90064eb

Added flag to create streaming requests with a default id

  • Participants
  • Parent commits ad8da6c

Comments (0)

Files changed (2)

     -> Query.t
     -> (unit, [ `Closed ]) Result.t
 
-  val cancel_streaming :
-    ?query:Query.t
+  val cancel_streaming
+    :  ?query:Query.t
     -> t
     -> recv_header:Recv_tag.t Header.t list
     -> query_id:Query_id.t
     -> (unit, [ `Closed ]) Result.t
 
-  val next_query_id : t -> Query_id.t Deferred.t
+  val next_query_id
+    :  ?use_default_id:bool
+    -> t
+    -> Query_id.t Deferred.t
 end
 
 module Connection : Connection_internal = struct
       ~f:extend_commission_report;
     return t
 
-  let next_query_id t =
-    let new_id = Query_id.create () in
-    Ivar.read t.next_order_id
-    >>| fun oid -> Query_id.increase new_id (Raw_order.Id.to_int_exn oid)
+  let next_query_id ?(use_default_id=false) t =
+    if use_default_id then
+      return Query_id.default
+    else begin
+      let new_id = Query_id.create () in
+      Ivar.read t.next_order_id
+      >>| fun oid -> Query_id.increase new_id (Raw_order.Id.to_int_exn oid)
+    end
 
   let is_closed t = Ivar.is_full t.stop
   let closed t = Ivar.read t.stop
 
 module Streaming_request = struct
   type ('query, 'response) t =
-    { send_header  : Send_tag.t Header.t;
+    { use_default_id : bool;
+      send_header  : Send_tag.t Header.t;
       canc_header  : Send_tag.t Header.t option;
       recv_header  : Recv_tag.t Header.t list;
       skip_header  : Recv_tag.t Header.t list option;
 
   module Id = Query_id
 
-  let create ?canc_header ?skip_header
+  let create ?(use_default_id=false) ?canc_header ?skip_header
       ~send_header ~recv_header ~tws_query ~tws_response () =
-    { send_header;
+    { use_default_id;
+      send_header;
       canc_header;
       recv_header;
       skip_header;
     }
 
   let dispatch t con query =
-    Connection.next_query_id con
+    Connection.next_query_id con ~use_default_id:t.use_default_id
     >>= fun query_id ->
     let ivar = Ivar.create () in
     let pipe_r, pipe_w = Pipe.create () in
   module Id : Unique_id
 
   val create
-    :  ?canc_header:Send_tag.t Header.t
+    :  ?use_default_id:bool (* default: false *)
+    -> ?canc_header:Send_tag.t Header.t
     -> ?skip_header:Recv_tag.t Header.t list
     -> send_header:Send_tag.t Header.t
     -> recv_header:Recv_tag.t Header.t list