Commits

Anonymous committed 60fa81a

lwt_mvar_parvel copied to lwt_mq_parvel for my local modifications

Comments (0)

Files changed (2)

+(* -*- Mode: Caml; indent-tabs-mode: nil -*- *)
+(******************************************************************************)
+(* Lightweight thread library for Objective Caml
+ * http://www.ocsigen.org/lwt
+ * Interface Lwt_mon
+ ******************************************************************************
+ * Copyright (c) 2009, Metaweb Technologies, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ *       copyright notice, this list of conditions and the following
+ *       disclaimer in the documentation and/or other materials provided
+ *       with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY METAWEB TECHNOLOGIES ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL METAWEB TECHNOLOGIES BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+ * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************)
+
+let return_unit = Lwt.return ()
+
+type 'a t = {
+  mutable contents : 'a option;
+  (* Current contents *)
+
+  mutable writers : ('a * unit Lwt.u) Lwt_sequence.t;
+  (* Threads waiting to put a value *)
+
+  mutable readers : 'a Lwt.u Lwt_sequence.t;
+  (* Threads waiting for a value *)
+}
+
+let create_empty () =
+  { contents = None;
+    writers = Lwt_sequence.create ();
+    readers = Lwt_sequence.create () }
+
+let create v =
+  { contents = Some v;
+    writers = Lwt_sequence.create ();
+    readers = Lwt_sequence.create () }
+
+let put mvar v =
+  match mvar.contents with
+    | None ->
+        begin match Lwt_sequence.take_opt_l mvar.readers with
+          | None ->
+              mvar.contents <- Some v
+          | Some w ->
+              Lwt.wakeup w v
+        end;
+        return_unit
+    | Some _ ->
+        let (res, w) = Lwt.task () in
+        let node = Lwt_sequence.add_r (v, w) mvar.writers in
+        Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node);
+        res
+
+let take mvar =
+  match mvar.contents with
+    | Some v ->
+        begin match Lwt_sequence.take_opt_l mvar.writers with
+          | Some(v', w) ->
+              mvar.contents <- Some v';
+              Lwt.wakeup w ()
+          | None ->
+              mvar.contents <- None
+        end;
+        Lwt.return v
+    | None ->
+        let (res, w) = Lwt.task () in
+        let node = Lwt_sequence.add_r w mvar.readers in
+        Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node);
+        res
+
+let will_put_block mvar =
+  mvar.contents <> None
+
+let have_any_reader mvar =
+  not (Lwt_sequence.is_empty mvar.readers)

lwt_mq_parvel.mli

+(* -*- Mode: Caml; indent-tabs-mode: nil -*- *)
+(******************************************************************************)
+(* Lightweight thread library for Objective Caml
+ * http://www.ocsigen.org/lwt
+ * Interface Lwt_mon
+ ******************************************************************************
+ * Copyright (c) 2009, Metaweb Technologies, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY METAWEB TECHNOLOGIES ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL METAWEB TECHNOLOGIES BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+ * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************)
+
+(** Mailbox variables *)
+
+(** "Mailbox" variables implement a synchronising variable, used for
+    communication between concurrent threads.
+
+    This code adapted from
+    {{:http://eigenclass.org/hiki.rb?cmd=view&p=lightweight-threads-with-lwt}Comparing lightweight threads (eigenclass.org)} *)
+
+type 'a t
+  (** The type of a mailbox variable. Mailbox variables are used to
+      communicate values between threads in a synchronous way. The
+      type parameter specifies the type of the value propagated from
+      [put] to [take]. *)
+
+val create : 'a -> 'a t
+  (** [create v] creates a new mailbox variable containing value [v]. *)
+
+val create_empty : unit -> 'a t
+  (** [create ()] creates a new empty mailbox variable. *)
+
+val put : 'a t -> 'a -> unit Lwt.t
+  (** [put mvar value] puts a value into a mailbox variable. This
+      value will remain in the mailbox until [take] is called to
+      remove it. If the mailbox is not empty, the current thread will
+      block until it is emptied. *)
+
+val take : 'a t -> 'a Lwt.t
+  (** [take mvar] will take any currently available value from the
+      mailbox variable. If no value is currently available, the
+      current thread will block, awaiting a value to be [put] by
+      another thread. *)
+
+val will_put_block : 'a t -> bool
+  (** [will_put_block mvar] returns true if next [put mvar] will block
+      the current thread *)
+
+val have_any_reader : 'a t -> bool
+  (** [have_any_reader mvar] returns true if there exists any
+      thread waiting for the [mvar] *)