Commits

Anonymous committed 711baf9

modified local dests to avoid sync problems with lwt

  • Participants
  • Parent commits e864085

Comments (0)

Files changed (1)

     exception ECall of call_resp_error;
 
     type dest_kind =
-      [ DLocal of IO.sink unit and IO.source unit
+      [ DLocal of (unit -> unit)  (* run after put *)
       | DRemote
           of int
           and Cdt.type_name
         value ti_dest_get = ti_abs ti_unit ti_string
         ;
 
+        value ti_dlocal_after_put = ti_abs ti_unit ti_unit
+        ;
 
         value ti_dest_kind =
            ((
                    [| ubox ti_int a ; ubox ti_string b
                     ; ubox ti_dest_put c ; ubox ti_dest_get d
                     |]
-               | DLocal a b -> ti_variant "DLocal"
-                   [| ubox (new IO.ti ti_unit ()) a
-                    ; ubox (new IO.ti ti_unit ()) b
-                    |]
+               | DLocal a -> ti_variant "DLocal"
+                   [| ubox ti_dlocal_after_put a |]
                ]
            ) :> ti _)
         ;
                       match d.dest_kind with
                       [ DRemote i tn_o _ _  ->
                           marshal_ser tn_dest (i, tn_o)
-                      | DLocal _ _ ->
+                      | DLocal _ap ->
                           failwith "can't serialize DLocal %s" &
                             self#type_name
                       ]
       | None ->
           ( d.dest_ref.val := Some v
           ; match d.dest_kind with
-            [ DLocal sink _source -> IO.put_sink sink ()
+            [ DLocal ap ->
+                try ap () with
+                [ e -> d.dest_ref.val := Some (CR_Error (`Exn_string
+                    (Printexc.to_string e)))
+                ]
             | DRemote _ _ _ _ -> ()
             ]
           )
       { dest_ref
       ; dest_kind =
           match d.dest_kind with
-          [ DLocal old_sink _old_source ->
-              let (mid_source, mid_sink) = IO.pipe1 () in
-              let _ : IO.m unit =
-                IO.wait_source mid_source >>= fun () ->
-                let () = d.dest_ref.val :=
-                  match dest_ref.val with
-                  [ None -> failwith "dest_pre_map: sink without filling ref"
-                  | Some (CR_Ok v) -> Some (CR_Ok (m v))
-                  | Some (CR_Error e) -> Some (CR_Error e)
-                  ]
-                in
-                IO.return (IO.put_sink old_sink ())
-              and new_source = IO.error (Failure "dest_pre_map: don't wait")
+          [ DLocal old_ap ->
+              let new_ap () =
+                ( d.dest_ref.val :=
+                    match dest_ref.val with
+                    [ None -> None
+                    | Some (CR_Ok v) -> Some (CR_Ok (m v))
+                    | Some (CR_Error e) -> Some (CR_Error e)
+                    ]
+                ; old_ap ()
+                )
               in
-              DLocal mid_sink new_source
+              DLocal new_ap
           | DRemote _ _ _ _ -> failwith
               "dest_pre_map on DRemote: not implemented"
           ]
         ~mq:(IO.Mq.create ())
     ;
 
+    value dest_local_initial_after_put = fun () -> ()
+    ;
+
     value dest_local
      : unit -> dest 'o
      = fun () ->
-         let (source, sink) = IO.pipe1 () in
          { dest_ref = ref None
-         ; dest_kind = DLocal sink source
+         ; dest_kind = DLocal dest_local_initial_after_put
          }
     ;
 
      = fun d ->
          (
          match d.dest_kind with
-         [ DLocal _sink source ->
-             IO.wait_source source >>= fun () ->
-             IO.return "DLocal"
+         [ DLocal _ap -> IO.return "DLocal"
          | DRemote _ _ _ _ -> IO.return "DRemote"
          ]
          ) >>= fun dk ->