(* Source: parvel / lwt_mq_parvel.ml *)

(* -*- Mode: Caml; indent-tabs-mode: nil -*- *)
(******************************************************************************)
(* Lightweight thread library for Objective Caml
 * http://www.ocsigen.org/lwt
 * Interface Lwt_mon
 ******************************************************************************
 * Copyright (c) 2009, Metaweb Technologies, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY METAWEB TECHNOLOGIES ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL METAWEB TECHNOLOGIES BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 ******************************************************************************)

(* Modified for "Parvel" project: http://bitbucket.org/gds/parvel/ *)

(* Initial [block_limit] copied into every queue built by [create].
   [put] stores a value immediately only while the number of buffered
   values is strictly below this limit. *)
value default_block_limit = ref 1000;
(* Initial [fail_limit] copied into every queue built by [create].
   [put] parks a writer only while the number of already parked writers
   is strictly below this limit; otherwise it returns [False]. *)
value default_fail_limit = ref 1000;

(* Pre-built, already resolved threads, shared so that the common results
   do not have to be rebuilt with [Lwt.return] on every call. *)
value return_unit = Lwt.return ();
value return_true = Lwt.return True;
value return_false = Lwt.return False;

(* A message queue carrying values of type ['a]. *)
type t 'a = {
  (* Values currently buffered in the queue. *)
  contents : Queue.t 'a;

  (* Threads blocked in [put]: each entry pairs the value to be stored with
     the wakener of the writer's thread.  The writer is woken with [True]
     once its value has been accepted. *)
  writers : mutable Lwt_sequence.t ('a * Lwt.u bool);

  (* Number of entries in [writers]; maintained by [writer_added] and
     [writer_deleted]. *)
  writers_count : mutable int;

  (* Threads blocked in [take], waiting for a value. *)
  readers : mutable Lwt_sequence.t (Lwt.u 'a);

  (* [put] buffers a value directly only while
     [Queue.length contents < block_limit]. *)
  block_limit : mutable int;
  (* [put] parks a new writer only while [writers_count < fail_limit];
     otherwise it returns [False]. *)
  fail_limit : mutable int;
};

(* [set_block_limit mq limit] replaces the [block_limit] of [mq].
   Only the field is updated; the current contents and the blocked
   writers are left untouched. *)
value set_block_limit mq limit = mq.block_limit := limit;

(* [set_fail_limit mq limit] replaces the [fail_limit] of [mq].
   Only the field is updated; already blocked writers are left untouched. *)
value set_fail_limit mq limit = mq.fail_limit := limit;

(* Infix form of [Lwt.bind]. *)
value ( >>= ) t f = Lwt.bind t f
;

(* [create ()] builds an empty queue with no blocked readers or writers.
   Its limits are initialised from the current values of
   [default_block_limit] and [default_fail_limit]. *)
value create () =
  let contents = Queue.create () in
  let writers = Lwt_sequence.create () in
  let readers = Lwt_sequence.create () in
  { contents = contents;
    writers = writers;
    writers_count = 0;
    readers = readers;
    block_limit = default_block_limit.val;
    fail_limit = default_fail_limit.val
  }
;

(* Record that one more writer is parked in [mq.writers]. *)
value writer_added mq =
  mq.writers_count := succ mq.writers_count
;

(* Record that one writer has left [mq.writers]. *)
value writer_deleted mq =
  mq.writers_count := pred mq.writers_count
;

(* [put_direct mq v] appends [v] to the buffered contents of [mq]
   unconditionally: no limit is checked and no reader is woken. *)
value put_direct mq v =
  Queue.add v mq.contents
;

(* [put mq v] offers [v] to the queue [mq].

   - If a reader is blocked in [take], the oldest one is woken with [v]
     and the result is [return_true].
   - Otherwise, if fewer than [mq.block_limit] values are buffered, [v] is
     buffered and the result is [return_true].
   - Otherwise, if fewer than [mq.fail_limit] writers are already parked,
     the caller is parked: the returned thread is resolved later by [take].
     Cancelling the returned thread withdraws the pending value.
   - Otherwise nothing is stored and the result is [return_false]. *)
value put mq v : Lwt.t bool =
  match Lwt_sequence.take_opt_l mq.readers with
  [ None ->
      if Queue.length mq.contents >= mq.block_limit
      then
        if mq.writers_count >= mq.fail_limit
        then
          (* Too many writers are already waiting: refuse the value. *)
          return_false
        else
          let (pending, wakener) = Lwt.task () in
          let node = Lwt_sequence.add_r (v, wakener) mq.writers in
          (* On cancellation, take the entry back out of the waiting list
             and keep the counter in step with it. *)
          let withdraw _ =
            ( Lwt_sequence.remove node
            ; writer_deleted mq
            )
          in
          ( writer_added mq
          ; Lwt.on_cancel pending withdraw
          ; pending
          )
      else
        ( put_direct mq v
        ; return_true
        )
  | Some reader ->
      (* Hand the value straight to the oldest waiting reader. *)
      ( Lwt.wakeup reader v
      ; return_true
      )
  ]
;

(* [take mq] removes and returns the oldest value of [mq].

   - If values are buffered, the oldest one is returned immediately.  The
     slot it frees is given to the oldest parked writer, if any: that
     writer's value is buffered and the writer is woken with [True].
   - If nothing is buffered but a writer is parked, that writer's value is
     handed over directly and the writer is woken with [True].  This can
     happen when [block_limit] is zero or negative, because [put] then never
     buffers.  Without this case such a queue would deadlock, with both the
     writer and the reader waiting.
   - Otherwise the caller is parked as a reader: the returned thread is
     resolved by a later [put].  Cancelling it removes the reader from the
     waiting list. *)
value take mq =
  if Queue.is_empty mq.contents
  then
    match Lwt_sequence.take_opt_l mq.writers with
    [ Some (v', w) ->
        (* Direct hand-off from a parked writer to this reader. *)
        ( writer_deleted mq
        ; Lwt.wakeup w True
        ; Lwt.return v'
        )
    | None ->
        let (res, w) = Lwt.task () in
        let node = Lwt_sequence.add_r w mq.readers in
        ( Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node)
        ; res
        )
    ]
  else
    let v = Queue.take mq.contents in
    ( match Lwt_sequence.take_opt_l mq.writers with
      [ Some (v', w) ->
          (* Refill the freed slot before waking the writer, so that the
             writer observes its value as already stored. *)
          ( writer_deleted mq
          ; put_direct mq v'
          ; Lwt.wakeup w True
          )
      | None ->
          ()
      ]
    ; Lwt.return v
    )
;

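(* NOTE(review): disabled code.  It compares [mq.contents] with [None], but
   [contents] is a [Queue.t] in this file, so it would have to be rewritten
   before being re-enabled.
value will_put_block mq =
  mq.contents <> None
;
*)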

(* [have_any_reader mq] is [True] when at least one thread is currently
   parked in [mq.readers] waiting for a value. *)
value have_any_reader mq =
  if Lwt_sequence.is_empty mq.readers then False else True
;
(* Residue of the web source browser this file was copied from:
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.
*)