1. Sebastien Mondet
  2. pvem_lwt_net

Commits

Sebastien Mondet  committed e180e39

Use the module `Light` from Pvem_lwt_unix

  • Participants
  • Parent commits 0564a9c
  • Branches master

Comments (0)

Files changed (2)

File pvem_lwt_net.ml

View file
 
 end
 
-module Waiter = struct
-  (* This should go to Pvem_lwt_unix *)
-
-  type t = {
-    mutable lwt_t: unit Lwt.t;
-    mutable lwt_u: unit Lwt.u;
-    mutable awake: bool;
-  }
-  let create () =
-    let lwt_t, lwt_u = Lwt.task () in
-    {lwt_u; lwt_t; awake = false}
-
-  let wait w =
-    match w.awake with
-    | true -> return ()
-    | false ->
-      begin match Lwt.state w.lwt_t with
-      | Lwt.Sleep -> ()
-      | Lwt.Return () | Lwt.Fail _ ->
-        let t, u = Lwt.task () in
-        w.lwt_t <- t;
-        w.lwt_u <- u;
-      end;
-      wrap_deferred ~on_exn:(fun e -> e) (fun () -> w.lwt_t)
-      >>< function
-      | `Error Lwt.Canceled -> return ()
-      | `Error other -> failwith "BUG: THIS SHOULD NOT HAPPEN"
-      | `Ok () -> return ()
-
-  let wake_up t =
-    t.awake <- true;
-    Lwt.wakeup_exn t.lwt_u Lwt.Canceled
-  (* We use Lwt.Canceled so that can re-wake-up sleepers at will
-    see https://github.com/ocsigen/lwt/blob/master/src/core/lwt.ml#L312
-    where Lwt.Canceled is ignored *)
-
-end
+(* module Light = struct *)
+(*   (1* This should go to Pvem_lwt_unix *1) *)
+
+(*   type t = { *)
+(*     mutable lwt_t: unit Lwt.t; *)
+(*     mutable lwt_u: unit Lwt.u; *)
+(*     mutable awake: bool; *)
+(*   } *)
+(*   let create () = *)
+(*     let lwt_t, lwt_u = Lwt.task () in *)
+(*     {lwt_u; lwt_t; awake = false} *)
+
+(*   let wait w = *)
+(*     match w.awake with *)
+(*     | true -> return () *)
+(*     | false -> *)
+(*       begin match Lwt.state w.lwt_t with *)
+(*       | Lwt.Sleep -> () *)
+(*       | Lwt.Return () | Lwt.Fail _ -> *)
+(*         let t, u = Lwt.task () in *)
+(*         w.lwt_t <- t; *)
+(*         w.lwt_u <- u; *)
+(*       end; *)
+(*       wrap_deferred ~on_exn:(fun e -> e) (fun () -> w.lwt_t) *)
+(*       >>< function *)
+(*       | `Error Lwt.Canceled -> return () *)
+(*       | `Error other -> failwith "BUG: THIS SHOULD NOT HAPPEN" *)
+(*       | `Ok () -> return () *)
+
+(*   let wake_up t = *)
+(*     t.awake <- true; *)
+(*     Lwt.wakeup_exn t.lwt_u Lwt.Canceled *)
+(*   (1* We use Lwt.Canceled so that can re-wake-up sleepers at will *)
+(*     see https://github.com/ocsigen/lwt/blob/master/src/core/lwt.ml#L312 *)
+(*     where Lwt.Canceled is ignored *1) *)
+
+(* end *)
 
 module type SERVER = sig
 
     ?socket_options:Socket_options.t list ->
     ?accept_number:int ->
     ?listen_on:listen_address ->
-    ?stop_on:Waiter.t ->
+    ?stop_on:Light.t ->
     port:int ->
     ([ `Accept_exn of exn | `Connection of Connection.t] -> (unit, unit) Deferred_result.t) ->
-    (Waiter.t, [> `Net of [> tcp_server_error ] ]) Deferred_result.t
+    (Light.t, [> `Net of [> tcp_server_error ] ]) Deferred_result.t
    (** 
      The call
      [plain ~accept_number ~listen_on ~stop_on ~port handler >>= fun wait_for_end -> ...]
     ?ssl_protocol_version:[ `SSLv23 | `TLSv1 ] ->
     cert_key:string * string ->
     ?listen_on:listen_address ->
-    ?stop_on:Waiter.t ->
+    ?stop_on:Light.t ->
     port:int ->
     ([ `Accept_exn of exn | `Connection of Connection.t | `Openssl_exn of exn * string ] -> (unit, unit) Deferred_result.t) ->
-    (Waiter.t, [> `Net of [> tcp_server_error | `TLS of [> `Context_exn of exn ] ] ]) Deferred_result.t
+    (Light.t, [> `Net of [> tcp_server_error | `TLS of [> `Context_exn of exn ] ] ]) Deferred_result.t
 
 end
 module Server : SERVER = struct
     | None -> return None
     end
     >>= fun tls_opt ->
-    let caller_waiter = Waiter.create () in
-    let self_stopper = Waiter.create () in
+    let caller_waiter = Light.create () in
+    let self_stopper = Light.create () in
     let stop_on =
       match stop_on with
       | Some s -> s
             end;
             begin
               Debug.(p@fun () -> f  "[accept_loop %d] block on stop_on" count);
-              Waiter.wait stop_on >>= fun _ -> 
+              Light.try_to_pass stop_on >>= fun _ -> 
               Debug.(p@fun () -> f  "[accept_loop %d] stopped by user" count);
               return  `Stop
             end;
             begin
               Debug.(p@fun () -> f  "[accept_loop %d] block on self_stopper" count);
-              Waiter.wait self_stopper >>= fun _ -> 
+              Light.try_to_pass self_stopper >>= fun _ -> 
               Debug.(p@fun () -> f  "[accept_loop %d] (re-)stopped by myself" count);
               return  `Stop
             end;
               p @ fun () ->
                 f "[handle_one %d] stopping everything (handler return)" count
             );
-            Waiter.wake_up self_stopper;
-            Waiter.wake_up caller_waiter;
+            Light.green self_stopper;
+            Light.green caller_waiter;
             return ()
         in
         let handle_one accepted =
         return ()
       | `Stop ->
         Debug.(p@fun () -> f  "[accept_loop %d on `Stop] stopping everything" count);
-        Waiter.wake_up self_stopper;
-        Waiter.wake_up caller_waiter;
+        Light.green self_stopper;
+        Light.green caller_waiter;
         Lwt_unix.shutdown socket Lwt_unix.SHUTDOWN_ALL;
         return ()
     in

File test/all.ml

View file
     let stop_on, stop_fun, connection_return =
       match stops with
       | `With_stop_on ->
-        let stop_on = Waiter.create () in
-        (Some stop_on, (fun () -> Waiter.wake_up stop_on), return)
+        let stop_on = Light.create () in
+        (Some stop_on, (fun () -> Light.green stop_on), return)
       | `By_failing_in_handler ->
         (None, (fun () -> ()), fail)
     in
       end;
       begin
         say "Waiting for server to end: %f" (Unix.gettimeofday ());
-        Waiter.wait wait_for_end_of_server >>= fun () ->
+        Light.try_to_pass wait_for_end_of_server >>= fun () ->
         say "Server ended: %f" (Unix.gettimeofday ());
         fail_if_exn Lwt.(fun () -> Lwt_unix.sleep 1.)
       end;
     let max_connections = match push_to with `Ten -> 3 | `Eleven -> 400 in
     (* The actual limit will depend the max number of file descriptors,
        which can be set (within global limits) with `ulimit -n <nb>` *)
-    let stop_on = Waiter.create () in
+    let stop_on = Light.create () in
     Server.tls
       ~cert_key:(cert, key) ~listen_on:`Any ~port
       ~accept_number:20 ~stop_on
       Deferred_list.while_sequential [1;2] client
       >>= fun _ ->
       say "2 more clients went through";
-      Waiter.wake_up stop_on;
+      Light.green stop_on;
       return ()
     end
     >>= fun () ->
     say "Waiting for server to end";
     (* System.with_timeout 4. (fun () -> *) 
-    Waiter.wait wait_for_end_of_server
+    Light.try_to_pass wait_for_end_of_server
     (* ) *)
     >>= fun () ->
     ksprintf System.Shell.do_or_fail "rm -f %s %s" key cert