Commits

Mike Bayer committed 49a99e8

renaming to dogpile.core so that we are doing a more traditional namespace
package setup

  • Parent commits 290f9b4

Files changed (25)

+0.3.0
+=====
+
+- Renamed the project again - to dogpile.core.
+  Package has been reorganized so that "dogpile"
+  is a pure namespace package.  The base dogpile
+  features are now in "dogpile.core".
+
 0.2.2
 =====
 
-dogpile
-========
+dogpile.core
+============
+
 A "dogpile" lock, one which allows a single thread to generate
 an expensive resource while other threads use the "old" value, until the
 "new" value is ready.
 
 A simple example::
 
-    from dogpile import Dogpile
+    from dogpile.core import Dogpile
 
     # store a reference to a "resource", some 
     # object that is expensive to create.

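The "traditional namespace package setup" referred to in the commit message was conventionally declared, in the setuptools era, with a one-line ``dogpile/__init__.py`` plus a matching ``namespace_packages`` entry in ``setup.py``. Neither file's new contents appear in this diff, so the following is an assumption about the intended arrangement, not a copy of it::

    # dogpile/__init__.py -- pure namespace package, no real code here
    __import__('pkg_resources').declare_namespace(__name__)

    # setup.py (hypothetical excerpt)
    setup(
        name='dogpile.core',
        namespace_packages=['dogpile'],
    )

This lets separately-installed distributions such as ``dogpile.core`` and ``dogpile.cache`` both contribute subpackages under the single ``dogpile`` name.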
File docs/build/api.rst

 API
 ===
 
-Dogpile
-========
+dogpile.core
+=============
 
-.. automodule:: dogpile.dogpile
+.. automodule:: dogpile.core
     :members:
 
 NameRegistry
 =============
 
-.. automodule:: dogpile.nameregistry
+.. automodule:: dogpile.core.nameregistry
     :members:
 
 Utilities
 ==========
 
-.. automodule:: dogpile.readwrite_lock
+.. automodule:: dogpile.core.readwrite_lock
     :members:
 
 

File docs/build/conf.py

 # absolute, like shown here.
 sys.path.insert(0, os.path.abspath('../../'))
 
-import dogpile
+import dogpile.core
 
 # -- General configuration -----------------------------------------------------
 
 master_doc = 'index'
 
 # General information about the project.
-project = u'Dogpile'
-copyright = u'2011, Mike Bayer'
+project = u'dogpile.core'
+copyright = u'2011, 2012 Mike Bayer'
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
 # built documents.
 #
 # The short X.Y version.
-version = dogpile.__version__
+version = dogpile.core.__version__
 # The full version, including alpha/beta/rc tags.
-release = dogpile.__version__
+release = dogpile.core.__version__
 
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 #html_file_suffix = ''
 
 # Output file base name for HTML help builder.
-htmlhelp_basename = 'dogpiledoc'
+htmlhelp_basename = 'dogpile.coredoc'
 
 
 # -- Options for LaTeX output --------------------------------------------------
 # Grouping the document tree into LaTeX files. List of tuples
 # (source start file, target name, title, author, documentclass [howto/manual]).
 latex_documents = [
-  ('index', 'dogpile.tex', u'Dogpile Documentation',
+  ('index', 'dogpile.core.tex', u'dogpile.core Documentation',
    u'Mike Bayer', 'manual'),
 ]
 

File docs/build/front.rst

 Front Matter
 ============
 
-Information about the Dogpile project.
+Information about the dogpile.core project.
 
 Project Homepage
 ================
 
-Dogpile is hosted on `Bitbucket <http://bitbucket.org>`_ - the lead project page is at https://bitbucket.org/zzzeek/dogpile.  Source
+dogpile.core is hosted on `Bitbucket <http://bitbucket.org>`_ - the lead project page is at https://bitbucket.org/zzzeek/dogpile.core.  Source
 code is tracked here using `Mercurial <http://mercurial.selenic.com/>`_.
 
-Releases and project status are available on Pypi at http://pypi.python.org/pypi/dogpile.
+Releases and project status are available on PyPI at http://pypi.python.org/pypi/dogpile.core.
 
-The most recent published version of this documentation should be at http://readthedocs.org/docs/dogpile/.
+The most recent published version of this documentation should be at http://dogpilecore.readthedocs.org.
 
 Installation
 ============
 
-Install released versions of Dogpile from the Python package index with `pip <http://pypi.python.org/pypi/pip>`_ or a similar tool::
+Install released versions of dogpile.core from the Python package index with `pip <http://pypi.python.org/pypi/pip>`_ or a similar tool::
 
-    pip install dogpile
+    pip install dogpile.core
 
 Installation via source distribution is via the ``setup.py`` script::
 
 Community
 =========
 
-Dogpile is developed by `Mike Bayer <http://techspot.zzzeek.org>`_, and is 
+dogpile.core is developed by `Mike Bayer <http://techspot.zzzeek.org>`_, and is 
 loosely associated with the `Pylons Project <http://www.pylonsproject.org/>`_.
-As Dogpile's usage increases, it is anticipated that the Pylons mailing list and IRC channel
+As usage of dogpile.core and dogpile.cache increases, it is anticipated that the Pylons mailing list and IRC channel
 will become the primary channels for support.
 
 Bugs
 ====
-Bugs and feature enhancements to Dogpile should be reported on the `Bitbucket
+Bugs and feature enhancements to dogpile.core should be reported on the `Bitbucket
 issue tracker
 <https://bitbucket.org/zzzeek/dogpile/issues?status=new&status=open>`_.  If you're not sure 
-that a particular issue is specific to either dogpile.cache or Dogpile, posting to the `dogpile.cache issue tracker <https://bitbucket.org/zzzeek/dogpile.cache/issues?status=new&status=open>`_
+that a particular issue is specific to either dogpile.cache or dogpile.core, posting to the `dogpile.cache issue tracker <https://bitbucket.org/zzzeek/dogpile.cache/issues?status=new&status=open>`_
 is likely the better place to post first, but it's not critical either way.
 
 * `dogpile.cache issue tracker <https://bitbucket.org/zzzeek/dogpile.cache/issues?status=new&status=open>`_ (post here if unsure)
-* `dogpile issue tracker <https://bitbucket.org/zzzeek/dogpile/issues?status=new&status=open>`_
+* `dogpile.core issue tracker <https://bitbucket.org/zzzeek/dogpile.core/issues?status=new&status=open>`_
 

File docs/build/index.rst

-===================================
-Welcome to Dogpile's documentation!
-===================================
+========================================
+Welcome to dogpile.core's documentation!
+========================================
 
-`Dogpile <http://bitbucket.org/zzzeek/dogpile>`_ provides the *dogpile* lock, 
+`dogpile.core <http://bitbucket.org/zzzeek/dogpile>`_ provides the *dogpile* lock, 
 one which allows a single thread or process to generate
 an expensive resource while other threads/processes use the "old" value, until the
 "new" value is ready.
 
-Dogpile is at the core of the `dogpile.cache <http://bitbucket.org/zzzeek/dogpile.cache>`_ package
+dogpile.core is at the core of the `dogpile.cache <http://bitbucket.org/zzzeek/dogpile.cache>`_ package
 which provides for a basic cache API and sample backends based on the dogpile concept.
 
 

File docs/build/usage.rst

 
 A simple example::
 
-    from dogpile import Dogpile
+    from dogpile.core import Dogpile
 
     # store a reference to a "resource", some 
     # object that is expensive to create.
 
 Example::
 
-    from dogpile import Dogpile, NeedRegenerationException
+    from dogpile.core import Dogpile, NeedRegenerationException
 
     def get_value_from_cache():
         value = my_cache.get("some key")
     import pylibmc
     mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))
 
-    from dogpile import Dogpile, NeedRegenerationException
+    from dogpile.core import Dogpile, NeedRegenerationException
 
     def cached(key, expiration_time):
         """A decorator that will cache the return value of a function
 
     import pylibmc
     import time
-    from dogpile import Dogpile, NeedRegenerationException, NameRegistry
+    from dogpile.core import Dogpile, NeedRegenerationException, NameRegistry
 
     mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))
 
 for the critical section where readers should be blocked::
 
 
-    from dogpile import SyncReaderDogpile
+    from dogpile.core import SyncReaderDogpile
 
     dogpile = SyncReaderDogpile(3600)
 

File dogpile/__init__.py

-from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
-
-__version__ = '0.2.2'
-

File dogpile/core/__init__.py

+from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
+
+__version__ = '0.3.0'
+

File dogpile/core/dogpile.py

+from util import threading
+import time
+import logging
+from readwrite_lock import ReadWriteMutex
+
+log = logging.getLogger(__name__)
+
+
+class NeedRegenerationException(Exception):
+    """An exception that, when raised in the 'with' block,
+    forces the 'has_value' flag to False and triggers
+    regeneration of the value.
+    
+    """
+
+NOT_REGENERATED = object()
+
+class Dogpile(object):
+    """Dogpile lock class.
+    
+    Provides an interface around an arbitrary mutex 
+    that allows one thread/process to be elected as 
+    the creator of a new value, while other threads/processes 
+    continue to return the previous version 
+    of that value.
+
+    :param expiretime: Expiration time in seconds.  Set to
+     ``None`` for never expires.
+    :param init: if True, set the 'createdtime' to the
+     current time.
+    :param lock: a mutex object that provides
+     ``acquire()`` and ``release()`` methods.
+        
+    """
+    def __init__(self, expiretime, init=False, lock=None):
+        """Construct a new :class:`.Dogpile`.
+
+        """
+        if lock:
+            self.dogpilelock = lock
+        else:
+            self.dogpilelock = threading.Lock()
+
+        self.expiretime = expiretime
+        if init:
+            self.createdtime = time.time()
+
+    createdtime = -1
+    """The last known 'creation time' of the value,
+    stored as an epoch (i.e. from ``time.time()``).
+
+    If the value here is -1, it is assumed the value
+    should recreate immediately.
+    
+    """
+
+    def acquire(self, creator, 
+                        value_fn=None, 
+                        value_and_created_fn=None):
+        """Acquire the lock, returning a context manager.
+        
+        :param creator: Creation function, used if this thread
+         is chosen to create a new value.
+         
+        :param value_fn: Optional function that returns
+         the value from some datasource.  Will be returned
+         if regeneration is not needed.
+
+        :param value_and_created_fn: Like value_fn, but returns a tuple
+         of (value, createdtime).  The returned createdtime 
+         will replace the "createdtime" value on this dogpile
+         lock.   This option removes the need for the dogpile lock
+         itself to remain persistent across usages; another 
+         dogpile can come along later and pick up where the
+         previous one left off.   
+         
+        """
+        dogpile = self
+
+        class Lock(object):
+            def __enter__(self):
+                return dogpile._enter(creator, value_fn, 
+                                    value_and_created_fn)
+
+            def __exit__(self, type, value, traceback):
+                dogpile._exit()
+        return Lock()
+
+    @property
+    def is_expired(self):
+        """Return true if the expiration time is reached, or no 
+        value is available."""
+
+        return not self.has_value or \
+            (
+                self.expiretime is not None and 
+                time.time() - self.createdtime > self.expiretime
+            )
+
+    @property
+    def has_value(self):
+        """Return true if the creation function has proceeded 
+        at least once."""
+        return self.createdtime > 0
+
+    def _enter(self, creator, value_fn=None, value_and_created_fn=None):
+        if value_and_created_fn:
+            value_fn = value_and_created_fn
+
+        if not value_fn:
+            return self._enter_create(creator)
+
+        try:
+            value = value_fn()
+            if value_and_created_fn:
+                value, self.createdtime = value
+        except NeedRegenerationException:
+            log.debug("NeedRegenerationException")
+            self.createdtime = -1
+            value = NOT_REGENERATED
+
+        generated = self._enter_create(creator)
+
+        if generated is not NOT_REGENERATED:
+            if value_and_created_fn:
+                generated, self.createdtime = generated
+            return generated
+        elif value is NOT_REGENERATED:
+            try:
+                if value_and_created_fn:
+                    value, self.createdtime = value_fn()
+                else:
+                    value = value_fn()
+                return value
+            except NeedRegenerationException:
+                raise Exception("Generation function should "
+                            "have just been called by a concurrent "
+                            "thread.")
+        else:
+            return value
+
+    def _enter_create(self, creator):
+
+        if not self.is_expired:
+            return NOT_REGENERATED
+
+        if self.has_value:
+            if not self.dogpilelock.acquire(False):
+                log.debug("creation function in progress "
+                            "elsewhere, returning")
+                return NOT_REGENERATED
+        else:
+            log.debug("no value, waiting for create lock")
+            self.dogpilelock.acquire()
+        try:
+            log.debug("value creation lock %r acquired" % self.dogpilelock)
+
+            # see if someone created the value already
+            if not self.is_expired:
+                log.debug("value already present")
+                return NOT_REGENERATED
+
+            log.debug("Calling creation function")
+            created = creator()
+            self.createdtime = time.time()
+            return created
+        finally:
+            self.dogpilelock.release()
+            log.debug("Released creation lock")
+
+    def _exit(self):
+        pass
+
+class SyncReaderDogpile(Dogpile):
+    """Provide a read-write lock function on top of the :class:`.Dogpile`
+    class.
+    
+    """
+    def __init__(self, *args, **kw):
+        super(SyncReaderDogpile, self).__init__(*args, **kw)
+        self.readwritelock = ReadWriteMutex()
+
+    def acquire_write_lock(self):
+        """Return the "write" lock context manager.
+        
+        This will provide a section that is mutexed against
+        all readers/writers for the dogpile-maintained value.
+        
+        """
+
+        dogpile = self
+        class Lock(object):
+            def __enter__(self):
+                dogpile.readwritelock.acquire_write_lock()
+            def __exit__(self, type, value, traceback):
+                dogpile.readwritelock.release_write_lock()
+        return Lock()
+
+
+    def _enter(self, *arg, **kw):
+        value = super(SyncReaderDogpile, self)._enter(*arg, **kw)
+        self.readwritelock.acquire_read_lock()
+        return value
+
+    def _exit(self):
+        self.readwritelock.release_read_lock()

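The election logic in the new ``dogpile/core/dogpile.py`` above can be condensed into a self-contained sketch (illustrative only; ``TinyDogpile`` is not part of the package): a thread that finds the value expired tries the mutex non-blockingly when a stale value already exists, and blocks only when no value has ever been created.

```python
import threading
import time

class TinyDogpile:
    """Compressed sketch of the dogpile election pattern."""

    def __init__(self, expiretime):
        self.expiretime = expiretime
        self.createdtime = -1          # -1 means "no value yet"
        self.lock = threading.Lock()
        self.value = None

    @property
    def is_expired(self):
        return (self.createdtime < 0 or
                time.time() - self.createdtime > self.expiretime)

    def get(self, creator):
        if not self.is_expired:
            return self.value          # fresh value, no locking needed
        if self.createdtime > 0:
            # a stale value exists: try the lock without blocking
            if not self.lock.acquire(False):
                return self.value      # someone else is regenerating
        else:
            # no value at all: everyone must wait for the first create
            self.lock.acquire()
        try:
            if not self.is_expired:    # check again under the lock
                return self.value
            self.value = creator()
            self.createdtime = time.time()
            return self.value
        finally:
            self.lock.release()

calls = []
pile = TinyDogpile(expiretime=60)
result = pile.get(lambda: calls.append(1) or "expensive")
again = pile.get(lambda: calls.append(1) or "expensive")
print(result, again, len(calls))       # creator ran only once
```

Under contention, threads that lose the non-blocking ``acquire(False)`` race simply return the stale value, which is the dogpile-avoidance property the real class provides.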
File dogpile/core/nameregistry.py

+from util import threading
+import weakref
+
+class NameRegistry(object):
+    """Generates and returns an object, keeping it as a
+    singleton for a certain identifier for as long as it's
+    strongly referenced.
+    
+    e.g.::
+    
+        class MyFoo(object):
+            "some important object."
+            def __init__(self, identifier):
+                self.identifier = identifier
+
+        registry = NameRegistry(MyFoo)
+
+        # thread 1:
+        my_foo = registry.get("foo1")
+        
+        # thread 2
+        my_foo = registry.get("foo1")
+    
+    Above, ``my_foo`` in both thread #1 and #2 will
+    be *the same object*.   The constructor for
+    ``MyFoo`` will be called once, passing the 
+    identifier ``foo1`` as the argument.
+    
+    When thread 1 and thread 2 both complete or 
+    otherwise delete references to ``my_foo``, the
+    object is *removed* from the :class:`.NameRegistry` as 
+    a result of Python garbage collection.
+    
+    :class:`.NameRegistry` is a utility object that
+    is used to maintain new :class:`.Dogpile` objects
+    against a certain key, for as long as that particular key
+    is referenced within the application.   An application
+    can deal with an arbitrary number of keys, ensuring that
+    all threads requesting a certain key use the same
+    :class:`.Dogpile` object, without the need to maintain
+    each :class:`.Dogpile` object persistently in memory.
+
+    :param creator: A function that will create a new
+     value, given the identifier passed to the :meth:`.NameRegistry.get`
+     method.
+
+    """
+    _locks = weakref.WeakValueDictionary()
+    _mutex = threading.RLock()
+
+    def __init__(self, creator):
+        """Create a new :class:`.NameRegistry`."""
+        self._values = weakref.WeakValueDictionary()
+        self._mutex = threading.RLock()
+        self.creator = creator
+
+    def get(self, identifier, *args, **kw):
+        """Get and possibly create the value.
+        
+        :param identifier: Hash key for the value.
+         If the creation function is called, this identifier
+         will also be passed to the creation function.
+        :param \*args, \**kw: Additional arguments which will
+         also be passed to the creation function if it is 
+         called.
+        
+        """
+        try:
+            if identifier in self._values:
+                return self._values[identifier]
+            else:
+                return self._sync_get(identifier, *args, **kw)
+        except KeyError:
+            return self._sync_get(identifier, *args, **kw)
+
+    def _sync_get(self, identifier, *args, **kw):
+        self._mutex.acquire()
+        try:
+            try:
+                if identifier in self._values:
+                    return self._values[identifier]
+                else:
+                    self._values[identifier] = value = self.creator(identifier, *args, **kw)
+                    return value
+            except KeyError:
+                self._values[identifier] = value = self.creator(identifier, *args, **kw)
+                return value
+        finally:
+            self._mutex.release()

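The weak-referencing "singleton per key" behavior described in the ``NameRegistry`` docstring can be demonstrated standalone (a sketch under assumed names, not the library itself): values live in a ``WeakValueDictionary``, so a key's object is shared while strongly referenced and vanishes once every reference is gone.

```python
import threading
import weakref

class TinyRegistry:
    """Sketch of the NameRegistry idea: one shared object per key."""

    def __init__(self, creator):
        self._values = weakref.WeakValueDictionary()
        self._mutex = threading.RLock()
        self._creator = creator

    def get(self, identifier):
        try:
            return self._values[identifier]
        except KeyError:
            with self._mutex:
                try:                       # double-check under the lock
                    return self._values[identifier]
                except KeyError:
                    value = self._creator(identifier)
                    self._values[identifier] = value
                    return value

class MyFoo:
    def __init__(self, identifier):
        self.identifier = identifier

registry = TinyRegistry(MyFoo)
a = registry.get("foo1")
b = registry.get("foo1")
print(a is b)   # True: both callers share one object
```

The double-checked lookup under the mutex mirrors the ``get()`` / ``_sync_get()`` split in the file above: the fast path is lock-free, and only a cache miss pays for the ``RLock``.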
File dogpile/core/readwrite_lock.py

+from util import threading
+
+import logging
+log = logging.getLogger(__name__)
+
+class LockError(Exception):
+    pass
+
+class ReadWriteMutex(object):
+    """A mutex which allows multiple readers, single writer.
+    
+    :class:`.ReadWriteMutex` uses a Python ``threading.Condition``
+    to provide this functionality across threads within a process.
+    
+    The Beaker package also contained a file-lock based version
+    of this concept, so that readers/writers could be synchronized
+    across processes with a common filesystem.  A future Dogpile 
+    release may include this additional class at some point.
+    
+    """
+
+    def __init__(self):
+        # counts how many asynchronous methods are executing
+        self.async = 0
+
+        # pointer to thread that is the current sync operation
+        self.current_sync_operation = None
+
+        # condition object to lock on
+        self.condition = threading.Condition(threading.Lock())
+
+    def acquire_read_lock(self, wait = True):
+        """Acquire the 'read' lock."""
+        self.condition.acquire()
+        try:
+            # see if a synchronous operation is waiting to start
+            # or is already running, in which case we wait (or just
+            # give up and return)
+            if wait:
+                while self.current_sync_operation is not None:
+                    self.condition.wait()
+            else:
+                if self.current_sync_operation is not None:
+                    return False
+
+            self.async += 1
+            log.debug("%s acquired read lock", self)
+        finally:
+            self.condition.release()
+
+        if not wait: 
+            return True
+
+    def release_read_lock(self):
+        """Release the 'read' lock."""
+        self.condition.acquire()
+        try:
+            self.async -= 1
+
+            # check if we are the last asynchronous reader thread 
+            # out the door.
+            if self.async == 0:
+                # yes. so if a sync operation is waiting, notifyAll to wake
+                # it up
+                if self.current_sync_operation is not None:
+                    self.condition.notifyAll()
+            elif self.async < 0:
+                raise LockError("Synchronizer error - too many "
+                                "release_read_locks called")
+            log.debug("%s released read lock", self)
+        finally:
+            self.condition.release()
+
+    def acquire_write_lock(self, wait = True):
+        """Acquire the 'write' lock."""
+        self.condition.acquire()
+        try:
+            # here, we are not a synchronous reader, and after returning,
+            # assuming waiting or immediate availability, we will be.
+
+            if wait:
+                # if another sync is working, wait
+                while self.current_sync_operation is not None:
+                    self.condition.wait()
+            else:
+                # if another sync is working,
+                # we don't want to wait, so forget it
+                if self.current_sync_operation is not None:
+                    return False
+
+            # establish ourselves as the current sync 
+            # this indicates to other read/write operations
+            # that they should wait until this is None again
+            self.current_sync_operation = threading.currentThread()
+
+            # now wait again for asyncs to finish
+            if self.async > 0:
+                if wait:
+                    # wait
+                    self.condition.wait()
+                else:
+                    # we don't want to wait, so forget it
+                    self.current_sync_operation = None
+                    return False
+            log.debug("%s acquired write lock", self)
+        finally:
+            self.condition.release()
+
+        if not wait: 
+            return True
+
+    def release_write_lock(self):
+        """Release the 'write' lock."""
+        self.condition.acquire()
+        try:
+            if self.current_sync_operation is not threading.currentThread():
+                raise LockError("Synchronizer error - current thread doesn't "
+                                "have the write lock")
+
+            # reset the current sync operation so 
+            # another can get it
+            self.current_sync_operation = None
+
+            # tell everyone to get ready
+            self.condition.notifyAll()
+
+            log.debug("%s released write lock", self)
+        finally:
+            # everyone go !!
+            self.condition.release()

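The reader/writer protocol implemented above can be exercised with a reduced standalone sketch (an illustrative ``TinyRWMutex``, not the library class): multiple readers may hold the lock at once, and a non-blocking write attempt fails while any reader is active.

```python
import threading

class TinyRWMutex:
    """Sketch of a multiple-reader / single-writer mutex."""

    def __init__(self):
        self.readers = 0               # count of active readers
        self.writer = None             # thread holding the write lock
        self.cond = threading.Condition(threading.Lock())

    def acquire_read(self):
        with self.cond:
            while self.writer is not None:
                self.cond.wait()
            self.readers += 1

    def release_read(self):
        with self.cond:
            self.readers -= 1
            if self.readers == 0:
                self.cond.notify_all()  # wake a waiting writer

    def acquire_write(self, wait=True):
        with self.cond:
            if not wait and (self.writer is not None or self.readers > 0):
                return False            # non-blocking attempt fails
            while self.writer is not None or self.readers > 0:
                self.cond.wait()        # wait for readers/writer to drain
            self.writer = threading.current_thread()
            return True

    def release_write(self):
        with self.cond:
            self.writer = None
            self.cond.notify_all()

mutex = TinyRWMutex()
mutex.acquire_read()
mutex.acquire_read()                    # multiple readers may coexist
blocked = mutex.acquire_write(wait=False)
mutex.release_read()
mutex.release_read()
granted = mutex.acquire_write(wait=False)
mutex.release_write()
print(blocked, granted)                 # False True
```

``SyncReaderDogpile`` layers exactly this kind of mutex over the base lock, so that readers of the dogpile-maintained value can be excluded during the critical write section.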
File dogpile/core/util.py

+try:
+    import threading
+    import thread
+except ImportError:
+    import dummy_threading as threading
+    import dummy_thread as thread
+

File dogpile/dogpile.py

-from util import threading
-import time
-import logging
-from readwrite_lock import ReadWriteMutex
-
-log = logging.getLogger(__name__)
-
-
-class NeedRegenerationException(Exception):
-    """An exception that when raised in the 'with' block, 
-    forces the 'has_value' flag to False and incurs a 
-    regeneration of the value.
-    
-    """
-
-NOT_REGENERATED = object()
-
-class Dogpile(object):
-    """Dogpile lock class.
-    
-    Provides an interface around an arbitrary mutex 
-    that allows one thread/process to be elected as 
-    the creator of a new value, while other threads/processes 
-    continue to return the previous version 
-    of that value.
-
-    :param expiretime: Expiration time in seconds.  Set to
-     ``None`` for never expires.
-    :param init: if True, set the 'createdtime' to the
-     current time.
-    :param lock: a mutex object that provides
-     ``acquire()`` and ``release()`` methods.
-        
-    """
-    def __init__(self, expiretime, init=False, lock=None):
-        """Construct a new :class:`.Dogpile`.
-
-        """
-        if lock:
-            self.dogpilelock = lock
-        else:
-            self.dogpilelock = threading.Lock()
-
-        self.expiretime = expiretime
-        if init:
-            self.createdtime = time.time()
-
-    createdtime = -1
-    """The last known 'creation time' of the value,
-    stored as an epoch (i.e. from ``time.time()``).
-
-    If the value here is -1, it is assumed the value
-    should recreate immediately.
-    
-    """
-
-    def acquire(self, creator, 
-                        value_fn=None, 
-                        value_and_created_fn=None):
-        """Acquire the lock, returning a context manager.
-        
-        :param creator: Creation function, used if this thread
-         is chosen to create a new value.
-         
-        :param value_fn: Optional function that returns
-         the value from some datasource.  Will be returned
-         if regeneration is not needed.
-
-        :param value_and_created_fn: Like value_fn, but returns a tuple
-         of (value, createdtime).  The returned createdtime 
-         will replace the "createdtime" value on this dogpile
-         lock.   This option removes the need for the dogpile lock
-         itself to remain persistent across usages; another 
-         dogpile can come along later and pick up where the
-         previous one left off.   
-         
-        """
-        dogpile = self
-
-        class Lock(object):
-            def __enter__(self):
-                return dogpile._enter(creator, value_fn, 
-                                    value_and_created_fn)
-
-            def __exit__(self, type, value, traceback):
-                dogpile._exit()
-        return Lock()
-
-    @property
-    def is_expired(self):
-        """Return true if the expiration time is reached, or no 
-        value is available."""
-
-        return not self.has_value or \
-            (
-                self.expiretime is not None and 
-                time.time() - self.createdtime > self.expiretime
-            )
-
-    @property
-    def has_value(self):
-        """Return true if the creation function has proceeded 
-        at least once."""
-        return self.createdtime > 0
-
-    def _enter(self, creator, value_fn=None, value_and_created_fn=None):
-        if value_and_created_fn:
-            value_fn = value_and_created_fn
-
-        if not value_fn:
-            return self._enter_create(creator)
-
-        try:
-            value = value_fn()
-            if value_and_created_fn:
-                value, self.createdtime = value
-        except NeedRegenerationException:
-            log.debug("NeedRegenerationException")
-            self.createdtime = -1
-            value = NOT_REGENERATED
-
-        generated = self._enter_create(creator)
-
-        if generated is not NOT_REGENERATED:
-            if value_and_created_fn:
-                generated, self.createdtime = generated
-            return generated
-        elif value is NOT_REGENERATED:
-            try:
-                if value_and_created_fn:
-                    value, self.createdtime = value_fn()
-                else:
-                    value = value_fn()
-                return value
-            except NeedRegenerationException:
-                raise Exception("Generation function should "
-                            "have just been called by a concurrent "
-                            "thread.")
-        else:
-            return value
-
-    def _enter_create(self, creator):
-
-        if not self.is_expired:
-            return NOT_REGENERATED
-
-        if self.has_value:
-            if not self.dogpilelock.acquire(False):
-                log.debug("creation function in progress "
-                            "elsewhere, returning")
-                return NOT_REGENERATED
-        else:
-            log.debug("no value, waiting for create lock")
-            self.dogpilelock.acquire()
-        try:
-            log.debug("value creation lock %r acquired" % self.dogpilelock)
-
-            # see if someone created the value already
-            if not self.is_expired:
-                log.debug("value already present")
-                return NOT_REGENERATED
-
-            log.debug("Calling creation function")
-            created = creator()
-            self.createdtime = time.time()
-            return created
-        finally:
-            self.dogpilelock.release()
-            log.debug("Released creation lock")
-
-    def _exit(self):
-        pass
-
-class SyncReaderDogpile(Dogpile):
-    """Provide a read-write lock function on top of the :class:`.Dogpile`
-    class.
-    
-    """
-    def __init__(self, *args, **kw):
-        super(SyncReaderDogpile, self).__init__(*args, **kw)
-        self.readwritelock = ReadWriteMutex()
-
-    def acquire_write_lock(self):
-        """Return the "write" lock context manager.
-        
-        This will provide a section that is mutexed against
-        all readers/writers for the dogpile-maintained value.
-        
-        """
-
-        dogpile = self
-        class Lock(object):
-            def __enter__(self):
-                dogpile.readwritelock.acquire_write_lock()
-            def __exit__(self, type, value, traceback):
-                dogpile.readwritelock.release_write_lock()
-        return Lock()
-
-
-    def _enter(self, *arg, **kw):
-        value = super(SyncReaderDogpile, self)._enter(*arg, **kw)
-        self.readwritelock.acquire_read_lock()
-        return value
-
-    def _exit(self):
-        self.readwritelock.release_read_lock()

File dogpile/nameregistry.py

-from util import threading
-import weakref
-
-class NameRegistry(object):
-    """Generates and return an object, keeping it as a
-    singleton for a certain identifier for as long as its 
-    strongly referenced.
-    
-    e.g.::
-    
-        class MyFoo(object):
-            "some important object."
-            def __init__(self, identifier):
-                self.identifier = identifier
-
-        registry = NameRegistry(MyFoo)
-
-        # thread 1:
-        my_foo = registry.get("foo1")
-        
-        # thread 2
-        my_foo = registry.get("foo1")
-    
-    Above, ``my_foo`` in both thread #1 and #2 will
-    be *the same object*.   The constructor for
-    ``MyFoo`` will be called once, passing the 
-    identifier ``foo1`` as the argument.
-    
-    When thread 1 and thread 2 both complete or 
-    otherwise delete references to ``my_foo``, the
-    object is *removed* from the :class:`.NameRegistry` as 
-    a result of Python garbage collection.
-    
-    :class:`.NameRegistry` is a utility object that
-    is used to maintain new :class:`.Dogpile` objects
-    against a certain key, for as long as that particular key
-    is referenced within the application.   An application
-    can deal with an arbitrary number of keys, ensuring that
-    all threads requesting a certain key use the same
-    :class:`.Dogpile` object, without the need to maintain
-    each :class:`.Dogpile` object persistently in memory.
-
-    :param creator: A function that will create a new
-     value, given the identifier passed to the :meth:`.NameRegistry.get`
-     method.
-
-    """
-    _locks = weakref.WeakValueDictionary()
-    _mutex = threading.RLock()
-
-    def __init__(self, creator):
-        """Create a new :class:`.NameRegistry`.
-        
-         
-        """
-        self._values = weakref.WeakValueDictionary()
-        self._mutex = threading.RLock()
-        self.creator = creator
-
-    def get(self, identifier, *args, **kw):
-        """Get and possibly create the value.
-        
-        :param identifier: Hash key for the value.
-         If the creation function is called, this identifier
-         will also be passed to the creation function.
-        :param \*args, \**kw: Additional arguments which will
-         also be passed to the creation function if it is 
-         called.
-        
-        """
-        try:
-            if identifier in self._values:
-                return self._values[identifier]
-            else:
-                return self._sync_get(identifier, *args, **kw)
-        except KeyError:
-            return self._sync_get(identifier, *args, **kw)
-
-    def _sync_get(self, identifier, *args, **kw):
-        self._mutex.acquire()
-        try:
-            try:
-                if identifier in self._values:
-                    return self._values[identifier]
-                else:
-                    self._values[identifier] = value = self.creator(identifier, *args, **kw)
-                    return value
-            except KeyError:
-                self._values[identifier] = value = self.creator(identifier, *args, **kw)
-                return value
-        finally:
-            self._mutex.release()
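The ``get()`` / ``_sync_get()`` pair above is a double-checked lookup over a ``WeakValueDictionary``: a fast unlocked read, then a re-check under the mutex before creating. A minimal standalone sketch of the same idea (``Expensive`` and the module-level names are hypothetical illustrations, not part of the package):

```python
import threading
import weakref

class Expensive(object):
    """Hypothetical stand-in for a resource that is costly to create."""
    def __init__(self, identifier):
        self.identifier = identifier

_values = weakref.WeakValueDictionary()
_mutex = threading.RLock()

def get(identifier):
    # fast path: unlocked read.  A WeakValueDictionary lookup can
    # raise KeyError even after a successful "in" check if the
    # referent is garbage-collected in between, hence try/except.
    try:
        return _values[identifier]
    except KeyError:
        pass
    # slow path: re-check under the mutex before creating, so only
    # one thread constructs the value for a given identifier
    with _mutex:
        try:
            return _values[identifier]
        except KeyError:
            value = Expensive(identifier)
            _values[identifier] = value
            return value

a = get("foo1")
b = get("foo1")
assert a is b   # same object while a strong reference is held
```

Once ``a`` and ``b`` go out of scope, the weak reference lets the entry drop out of ``_values``, which is what keeps the registry from growing without bound.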

File dogpile/readwrite_lock.py

-from util import threading
-
-import logging
-log = logging.getLogger(__name__)
-
-class LockError(Exception):
-    pass
-
-class ReadWriteMutex(object):
-    """A mutex which allows multiple readers, single writer.
-    
-    :class:`.ReadWriteMutex` uses a Python ``threading.Condition``
-    to provide this functionality across threads within a process.
-    
-    The Beaker package also contained a file-lock based version
-    of this concept, so that readers/writers could be synchronized
-    across processes with a common filesystem.  A future Dogpile 
-    release may include this additional class at some point.
-    
-    """
-
-    def __init__(self):
-        # counts how many asynchronous methods are executing
-        self.async = 0
-
-        # pointer to thread that is the current sync operation
-        self.current_sync_operation = None
-
-        # condition object to lock on
-        self.condition = threading.Condition(threading.Lock())
-
-    def acquire_read_lock(self, wait = True):
-        """Acquire the 'read' lock."""
-        self.condition.acquire()
-        try:
-            # see if a synchronous operation is waiting to start
-            # or is already running, in which case we wait (or just
-            # give up and return)
-            if wait:
-                while self.current_sync_operation is not None:
-                    self.condition.wait()
-            else:
-                if self.current_sync_operation is not None:
-                    return False
-
-            self.async += 1
-            log.debug("%s acquired read lock", self)
-        finally:
-            self.condition.release()
-
-        if not wait: 
-            return True
-
-    def release_read_lock(self):
-        """Release the 'read' lock."""
-        self.condition.acquire()
-        try:
-            self.async -= 1
-
-            # check if we are the last asynchronous reader thread 
-            # out the door.
-            if self.async == 0:
-                # yes. so if a sync operation is waiting, notifyAll to wake
-                # it up
-                if self.current_sync_operation is not None:
-                    self.condition.notifyAll()
-            elif self.async < 0:
-                raise LockError("Synchronizer error - too many "
-                                "release_read_locks called")
-            log.debug("%s released read lock", self)
-        finally:
-            self.condition.release()
-
-    def acquire_write_lock(self, wait = True):
-        """Acquire the 'write' lock."""
-        self.condition.acquire()
-        try:
-            # here, we are not a synchronous reader, and after returning,
-            # assuming waiting or immediate availability, we will be.
-
-            if wait:
-                # if another sync is working, wait
-                while self.current_sync_operation is not None:
-                    self.condition.wait()
-            else:
-                # if another sync is working,
-                # we dont want to wait, so forget it
-                if self.current_sync_operation is not None:
-                    return False
-
-            # establish ourselves as the current sync 
-            # this indicates to other read/write operations
-            # that they should wait until this is None again
-            self.current_sync_operation = threading.currentThread()
-
-            # now wait again for asyncs to finish
-            if self.async > 0:
-                if wait:
-                    # wait
-                    self.condition.wait()
-                else:
-                    # we dont want to wait, so forget it
-                    self.current_sync_operation = None
-                    return False
-            log.debug("%s acquired write lock", self)
-        finally:
-            self.condition.release()
-
-        if not wait: 
-            return True
-
-    def release_write_lock(self):
-        """Release the 'write' lock."""
-        self.condition.acquire()
-        try:
-            if self.current_sync_operation is not threading.currentThread():
-                raise LockError("Synchronizer error - current thread doesn't "
-                                "have the write lock")
-
-            # reset the current sync operation so 
-            # another can get it
-            self.current_sync_operation = None
-
-            # tell everyone to get ready
-            self.condition.notifyAll()
-
-            log.debug("%s released write lock", self)
-        finally:
-            # everyone go !!
-            self.condition.release()
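The scheme above can be summarized: readers increment a counter and proceed unless a writer is registered; the writer registers itself (so new readers queue up behind it), then waits for the reader count to drain to zero. A compressed, self-contained sketch of that Condition-based scheme (``MiniRWLock`` is a hypothetical illustration, not the dogpile API; it uses ``while`` loops around ``wait()`` where the original waits once):

```python
import threading

class MiniRWLock(object):
    """Miniature reader/writer lock: same Condition-based scheme
    as ReadWriteMutex above, in compressed form."""

    def __init__(self):
        self.readers = 0    # number of active readers
        self.writer = None  # thread holding the write lock, if any
        self.cond = threading.Condition(threading.Lock())

    def acquire_read(self):
        with self.cond:
            # wait while a writer is registered or active
            while self.writer is not None:
                self.cond.wait()
            self.readers += 1

    def release_read(self):
        with self.cond:
            self.readers -= 1
            if self.readers == 0:
                # last reader out; wake a waiting writer
                self.cond.notify_all()

    def acquire_write(self):
        with self.cond:
            # wait out any prior writer, then register ourselves
            while self.writer is not None:
                self.cond.wait()
            self.writer = threading.current_thread()
            # wait for in-flight readers to drain
            while self.readers > 0:
                self.cond.wait()

    def release_write(self):
        with self.cond:
            self.writer = None
            self.cond.notify_all()

lock = MiniRWLock()
results = []

def read():
    lock.acquire_read()
    try:
        results.append("r")
    finally:
        lock.release_read()

def write():
    lock.acquire_write()
    try:
        results.append("w")
    finally:
        lock.release_write()

threads = [threading.Thread(target=fn) for fn in (read, read, write, read)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert sorted(results) == ["r", "r", "r", "w"]
```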

File dogpile/util.py

-try:
-    import threading
-    import thread
-except ImportError:
-    import dummy_threading as threading
-    import dummy_thread as thread
-

File nose_logging_config.ini

 # nose specific logging
 [loggers]
-keys = root, dogpile, tests
+keys = root, dogpilecore, tests
 
 [handlers]
 keys = console
 level = CRITICAL
 handlers = console
 
-[logger_dogpile]
+[logger_dogpilecore]
 level = DEBUG
-qualname = dogpile
+qualname = dogpile.core
 handlers =
 
 [logger_tests]
         use_2to3=True,
     )
 
-v = open(os.path.join(os.path.dirname(__file__), 'dogpile', '__init__.py'))
+v = open(os.path.join(os.path.dirname(__file__), 'dogpile', 'core', '__init__.py'))
 VERSION = re.compile(r".*__version__ = '(.*?)'", re.S).match(v.read()).group(1)
 v.close()
 
 readme = os.path.join(os.path.dirname(__file__), 'README.rst')
 
-setup(name='dogpile',
+setup(name='dogpile.core',
       version=VERSION,
       description="A 'dogpile' lock, typically used as a component of a larger caching solution",
-      long_description=file(readme).read(),
+      long_description=open(readme).read(),
       classifiers=[
       'Development Status :: 3 - Alpha',
       'Intended Audience :: Developers',
       keywords='caching',
       author='Mike Bayer',
       author_email='mike_mp@zzzcomputing.com',
-      url='http://bitbucket.org/zzzeek/dogpile',
+      url='http://bitbucket.org/zzzeek/dogpile.core',
       license='BSD',
       packages=find_packages(exclude=['ez_setup', 'tests']),
+      namespace_packages=['dogpile'],
       zip_safe=False,
       install_requires=[],
       test_suite='nose.collector',
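The new ``namespace_packages=['dogpile']`` argument works in tandem with the package's ``dogpile/__init__.py``, which under the conventional setuptools idiom contains only a namespace declaration, so that separately distributed packages (here ``dogpile.core``) can share the ``dogpile`` name. The actual file contents are not shown in this diff; this is the standard form:

```python
# dogpile/__init__.py -- the entire file, per the conventional
# setuptools namespace-package idiom (hedged: not shown in this diff)
__import__('pkg_resources').declare_namespace(__name__)
```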

File tests/__init__.py

Empty file removed.

File tests/core/__init__.py

Empty file added.

File tests/core/test_dogpile.py

+from unittest import TestCase
+import time
+import threading
+from dogpile.core import Dogpile, SyncReaderDogpile, NeedRegenerationException
+from dogpile.core.nameregistry import NameRegistry
+import contextlib
+import math
+import logging
+log = logging.getLogger(__name__)
+
+class ConcurrencyTest(TestCase):
+    # expiretime, time to create, num usages, time spent using, delay between usages
+    timings = [
+        # quick one
+        (2, .5, 50, .05, .1),
+
+        # slow creation time
+        (5, 2, 50, .1, .1),
+
+    ]
+
+    _assertion_lock = threading.Lock()
+
+    def test_rudimental(self):
+        for exp, crt, nu, ut, dt in self.timings:
+            self._test_multi(
+                10, exp, crt, nu, ut, dt,
+            )
+
+    def test_rudimental_slow_write(self):
+        self._test_multi(
+            10, 2, .5, 50, .05, .1,
+            slow_write_time=2
+        )
+
+    def test_return_while_in_progress(self):
+        self._test_multi(
+            10, 5, 2, 50, 1, .1,
+            inline_create='get_value'
+        )
+
+    def test_rudimental_long_create(self):
+        self._test_multi(
+            10, 2, 2.5, 50, .05, .1,
+        )
+
+    def test_get_value_plus_created_slow_write(self):
+        self._test_multi(
+            10, 2, .5, 50, .05, .1,
+            inline_create='get_value_plus_created',
+            slow_write_time=2
+        )
+
+    def test_get_value_plus_created_long_create(self):
+        self._test_multi(
+            10, 2, 2.5, 50, .05, .1,
+            inline_create='get_value_plus_created',
+        )
+
+    def test_get_value_plus_created_registry_unsafe_cache(self):
+        self._test_multi(
+            10, 1, .6, 100, .05, .1,
+            inline_create='get_value_plus_created',
+            cache_expire_time='unsafe'
+        )
+
+    def test_get_value_plus_created_registry_safe_cache(self):
+        for exp, crt, nu, ut, dt in self.timings:
+            self._test_multi(
+                10, exp, crt, nu, ut, dt,
+                inline_create='get_value_plus_created',
+                cache_expire_time='safe'
+            )
+
+    def _assert_synchronized(self):
+        acq = self._assertion_lock.acquire(False)
+        assert acq, "Could not acquire"
+
+        @contextlib.contextmanager
+        def go():
+            try:
+                yield {}
+            finally:
+                self._assertion_lock.release()
+        return go()
+
+    def _assert_log(self, cond, msg, *args):
+        if cond:
+            log.debug(msg, *args)
+        else:
+            log.error("Assertion failed: " + msg, *args)
+            assert False, msg % args
+
+    def _test_multi(self, num_threads, 
+                            expiretime, 
+                            creation_time,
+                            num_usages, 
+                            usage_time, 
+                            delay_time,
+                            cache_expire_time=None,
+                            slow_write_time=None,
+                            inline_create='rudimental'):
+
+        if slow_write_time:
+            dogpile_cls = SyncReaderDogpile
+        else:
+            dogpile_cls = Dogpile
+
+        # the registry feature should not be used
+        # unless the value + created time func is used.
+        use_registry = inline_create == 'get_value_plus_created'
+
+        if use_registry:
+            reg = NameRegistry(dogpile_cls)
+            get_dogpile = lambda: reg.get(expiretime)
+        else:
+            dogpile = dogpile_cls(expiretime)
+            get_dogpile = lambda: dogpile
+
+        unsafe_cache = False
+        if cache_expire_time:
+            if cache_expire_time == 'unsafe':
+                unsafe_cache = True
+                cache_expire_time = expiretime * .8
+            elif cache_expire_time == 'safe':
+                cache_expire_time = (expiretime + creation_time) * 1.1
+            else:
+                assert False, cache_expire_time
+
+            log.info("Cache expire time: %s", cache_expire_time)
+
+            effective_expiretime = min(cache_expire_time, expiretime)
+        else:
+            effective_expiretime = expiretime
+
+        effective_creation_time = creation_time
+        if slow_write_time:
+            effective_creation_time += slow_write_time
+
+        max_stale = (effective_expiretime + effective_creation_time + 
+                        usage_time + delay_time) * 1.1
+
+        the_resource = []
+        slow_waiters = [0]
+        failures = [0]
+
+        def create_impl(dogpile):
+            log.debug("creating resource...")
+            time.sleep(creation_time)
+
+            if slow_write_time:
+                with dogpile.acquire_write_lock():
+                    saved = list(the_resource)
+                    # clear out the resource list so that
+                    # usage threads hitting it will raise
+                    # NeedRegenerationException
+                    the_resource[:] = []
+                    time.sleep(slow_write_time)
+                    the_resource[:] = saved
+            the_resource.append(time.time())
+            return the_resource[-1]
+
+        if inline_create == 'get_value_plus_created':
+            def create_resource(dogpile):
+                with self._assert_synchronized():
+                    value = create_impl(dogpile)
+                    return value, time.time()
+        else:
+            def create_resource(dogpile):
+                with self._assert_synchronized():
+                    return create_impl(dogpile)
+
+        if cache_expire_time:
+            def get_value():
+                if not the_resource:
+                    raise NeedRegenerationException()
+                if time.time() - the_resource[-1] > cache_expire_time:
+                    # should never hit a cache invalidation 
+                    # if we've set expiretime below the cache 
+                    # expire time (assuming a cache which
+                    # honors this).
+                    self._assert_log(
+                        cache_expire_time < expiretime,
+                        "Cache expiration hit, cache "
+                        "expire time %s, expiretime %s",
+                        cache_expire_time,
+                        expiretime,
+                    )
+
+                    raise NeedRegenerationException()
+
+                if inline_create == 'get_value_plus_created':
+                    return the_resource[-1], the_resource[-1]
+                else:
+                    return the_resource[-1]
+        else:
+            def get_value():
+                if not the_resource:
+                    raise NeedRegenerationException()
+                if inline_create == 'get_value_plus_created':
+                    return the_resource[-1], the_resource[-1]
+                else:
+                    return the_resource[-1]
+
+        if inline_create == 'rudimental':
+            assert not cache_expire_time
+
+            @contextlib.contextmanager
+            def enter_dogpile_block(dogpile):
+                with dogpile.acquire(lambda: create_resource(dogpile)) as x:
+                    yield the_resource[-1]
+        elif inline_create == 'get_value':
+            @contextlib.contextmanager
+            def enter_dogpile_block(dogpile):
+                with dogpile.acquire(
+                        lambda: create_resource(dogpile), 
+                        get_value
+                    ) as rec:
+                    yield rec
+        elif inline_create == 'get_value_plus_created':
+            @contextlib.contextmanager
+            def enter_dogpile_block(dogpile):
+                with dogpile.acquire(
+                        lambda: create_resource(dogpile), 
+                        value_and_created_fn=get_value
+                    ) as rec:
+                    yield rec
+        else:
+            assert False, inline_create
+
+
+        def use_dogpile():
+            try:
+                for i in range(num_usages):
+                    dogpile = get_dogpile()
+                    now = time.time()
+                    with enter_dogpile_block(dogpile) as value:
+                        waited = time.time() - now
+                        if waited > .01:
+                            slow_waiters[0] += 1
+                        check_value(value, waited)
+                        time.sleep(usage_time)
+                    time.sleep(delay_time)
+            except:
+                log.error("thread failed", exc_info=True)
+                failures[0] += 1
+
+        def check_value(value, waited):
+            assert value
+
+            # time since the current resource was
+            # created
+            time_since_create = time.time() - value
+
+            self._assert_log(
+                time_since_create < max_stale,
+                "Time since create %.4f max stale time %s, "
+                    "total waited %s",
+                time_since_create, max_stale, 
+                slow_waiters[0]
+            )
+
+        started_at = time.time()
+        threads = []
+        for i in range(num_threads):
+            t = threading.Thread(target=use_dogpile)
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+        actual_run_time = time.time() - started_at
+
+        # time spent starts with num usages * time per usage, with a 10% fudge
+        expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1
+
+        expected_generations = math.ceil(expected_run_time / effective_expiretime)
+
+        if unsafe_cache:
+            expected_slow_waiters = expected_generations * num_threads
+        else:
+            expected_slow_waiters = expected_generations + num_threads - 1
+
+        if slow_write_time:
+            expected_slow_waiters = num_threads * expected_generations
+
+        # time spent also increments by one wait period in the beginning...
+        expected_run_time += effective_creation_time
+
+        # and a fudged version of the periodic waiting time anticipated
+        # for a single thread...
+        expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads
+        expected_run_time *= 1.1
+
+        log.info("Test Summary")
+        log.info("num threads: %s; expiretime: %s; creation_time: %s; "
+                "num_usages: %s; "
+                "usage_time: %s; delay_time: %s", 
+            num_threads, expiretime, creation_time, num_usages, 
+            usage_time, delay_time
+        )
+        log.info("cache expire time: %s; unsafe cache: %s slow "
+                "write time: %s; inline: %s; registry: %s", 
+            cache_expire_time, unsafe_cache, slow_write_time, 
+            inline_create, use_registry)
+        log.info("Estimated run time %.2f actual run time %.2f", 
+                    expected_run_time, actual_run_time)
+        log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", 
+                    effective_expiretime)
+        log.info("Expected slow waits %s, Total slow waits %s", 
+                    expected_slow_waiters, slow_waiters[0])
+        log.info("Total generations %s Max generations expected %s" % (
+            len(the_resource), expected_generations
+        ))
+
+        assert not failures[0], "%s failures occurred" % failures[0]
+        assert actual_run_time <= expected_run_time
+
+        assert slow_waiters[0] <= expected_slow_waiters, \
+            "Number of slow waiters %s exceeds expected slow waiters %s" % (
+                slow_waiters[0],
+                expected_slow_waiters
+            )
+        assert len(the_resource) <= expected_generations,\
+            "Number of resource generations %d exceeded "\
+            "expected %d" % (len(the_resource), 
+                expected_generations)
+
+class DogpileTest(TestCase):
+    def test_single_create(self):
+        dogpile = Dogpile(2)
+        the_resource = [0]
+
+        def create_resource():
+            the_resource[0] += 1
+
+        with dogpile.acquire(create_resource):
+            assert the_resource[0] == 1
+
+        with dogpile.acquire(create_resource):
+            assert the_resource[0] == 1
+
+        time.sleep(2)
+        with dogpile.acquire(create_resource):
+            assert the_resource[0] == 2
+
+        with dogpile.acquire(create_resource):
+            assert the_resource[0] == 2
+
+    def test_no_expiration(self):
+        dogpile = Dogpile(None)
+        the_resource = [0]
+
+        def create_resource():
+            the_resource[0] += 1
+
+        with dogpile.acquire(create_resource):
+            assert the_resource[0] == 1
+
+        with dogpile.acquire(create_resource):
+            assert the_resource[0] == 1
+

File tests/core/test_nameregistry.py

+from unittest import TestCase
+import time
+import threading
+from dogpile.core.nameregistry import NameRegistry
+import random
+
+import logging
+log = logging.getLogger(__name__)
+
+class NameRegistryTest(TestCase):
+
+    def test_name_registry(self):
+        success = [True]
+        num_operations = [0]
+
+        def create(identifier):
+            log.debug("Creator running for id: " + identifier)
+            return threading.Lock()
+
+        registry = NameRegistry(create)
+
+        baton = {
+            "beans":False,
+            "means":False,
+            "please":False
+        }
+
+        def do_something(name):
+            for iteration in xrange(20):
+                name = random.choice(list(baton.keys()))
+                lock = registry.get(name)
+                lock.acquire()
+                try:
+                    if baton[name]:
+                        success[0] = False
+                        log.debug("Baton is already populated")
+                        break
+                    baton[name] = True
+                    try:
+                        time.sleep(random.random() * .01)
+                    finally:
+                        num_operations[0] += 1
+                        baton[name] = False
+                finally:
+                    lock.release()
+            log.debug("thread completed operations")
+
+        threads = []
+        for id_ in range(1, 20):
+            t = threading.Thread(target=do_something, args=("somename",))
+            t.start()
+            threads.append(t)
+
+        for t in threads:
+            t.join()
+
+        assert success[0]
+

File tests/test_dogpile.py

-from unittest import TestCase
-import time
-import threading
-from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
-from dogpile.nameregistry import NameRegistry
-import contextlib
-import math
-import logging
-log = logging.getLogger(__name__)
-
-class ConcurrencyTest(TestCase):
-    # expiretime, time to create, num usages, time spent using, delay between usages
-    timings = [
-        # quick one
-        (2, .5, 50, .05, .1),
-
-        # slow creation time
-        (5, 2, 50, .1, .1),
-
-    ]
-
-    _assertion_lock = threading.Lock()
-
-    def test_rudimental(self):
-        for exp, crt, nu, ut, dt in self.timings:
-            self._test_multi(
-                10, exp, crt, nu, ut, dt,
-            )
-
-    def test_rudimental_slow_write(self):
-        self._test_multi(
-            10, 2, .5, 50, .05, .1,
-            slow_write_time=2
-        )
-
-    def test_return_while_in_progress(self):
-        self._test_multi(
-            10, 5, 2, 50, 1, .1,
-            inline_create='get_value'
-        )
-
-    def test_rudimental_long_create(self):
-        self._test_multi(
-            10, 2, 2.5, 50, .05, .1,
-        )
-
-    def test_get_value_plus_created_slow_write(self):
-        self._test_multi(
-            10, 2, .5, 50, .05, .1,
-            inline_create='get_value_plus_created',
-            slow_write_time=2
-        )
-
-    def test_get_value_plus_created_long_create(self):
-        self._test_multi(
-            10, 2, 2.5, 50, .05, .1,
-            inline_create='get_value_plus_created',
-        )
-
-    def test_get_value_plus_created_registry_unsafe_cache(self):
-        self._test_multi(
-            10, 1, .6, 100, .05, .1,
-            inline_create='get_value_plus_created',
-            cache_expire_time='unsafe'
-        )
-
-    def test_get_value_plus_created_registry_safe_cache(self):
-        for exp, crt, nu, ut, dt in self.timings:
-            self._test_multi(
-                10, exp, crt, nu, ut, dt,
-                inline_create='get_value_plus_created',
-                cache_expire_time='safe'
-            )
-
-    def _assert_synchronized(self):
-        acq = self._assertion_lock.acquire(False)
-        assert acq, "Could not acquire"
-
-        @contextlib.contextmanager
-        def go():
-            try:
-                yield {}
-            except:
-                raise
-            finally:
-                self._assertion_lock.release()
-        return go()
-
-    def _assert_log(self, cond, msg, *args):
-        if cond:
-            log.debug(msg, *args)
-        else:
-            log.error("Assertion failed: " + msg, *args)
-            assert False, msg % args
-
-    def _test_multi(self, num_threads, 
-                            expiretime, 
-                            creation_time,
-                            num_usages, 
-                            usage_time, 
-                            delay_time,
-                            cache_expire_time=None,
-                            slow_write_time=None,
-                            inline_create='rudimental'):
-
-        if slow_write_time:
-            dogpile_cls = SyncReaderDogpile
-        else:
-            dogpile_cls = Dogpile
-
-        # the registry feature should not be used
-        # unless the value + created time func is used.
-        use_registry = inline_create == 'get_value_plus_created'
-
-        if use_registry:
-            reg = NameRegistry(dogpile_cls)
-            get_dogpile = lambda: reg.get(expiretime)
-        else:
-            dogpile = dogpile_cls(expiretime)
-            get_dogpile = lambda: dogpile
-
-        unsafe_cache = False
-        if cache_expire_time:
-            if cache_expire_time == 'unsafe':
-                unsafe_cache = True
-                cache_expire_time = expiretime *.8
-            elif cache_expire_time == 'safe':
-                cache_expire_time = (expiretime + creation_time) * 1.1
-            else:
-                assert False, cache_expire_time
-
-            log.info("Cache expire time: %s", cache_expire_time)
-
-            effective_expiretime = min(cache_expire_time, expiretime)
-        else:
-            effective_expiretime = expiretime
-
-        effective_creation_time = creation_time
-        if slow_write_time:
-            effective_creation_time += slow_write_time
-
-        max_stale = (effective_expiretime + effective_creation_time + 
-                        usage_time + delay_time) * 1.1
-
-        the_resource = []
-        slow_waiters = [0]
-        failures = [0]
-
-        def create_impl(dogpile):
-            log.debug("creating resource...")
-            time.sleep(creation_time)
-
-            if slow_write_time:
-                with dogpile.acquire_write_lock():
-                    saved = list(the_resource)
-                    # clear out the resource list so that
-                    # usage threads hitting it will raise
-                    # NeedRegenerationException
-                    the_resource[:] = []
-                    time.sleep(slow_write_time)
-                    the_resource[:] = saved
-            the_resource.append(time.time())
-            return the_resource[-1]
-
-        if inline_create == 'get_value_plus_created':
-            def create_resource(dogpile):
-                with self._assert_synchronized():
-                    value = create_impl(dogpile)
-                    return value, time.time()
-        else:
-            def create_resource(dogpile):
-                with self._assert_synchronized():
-                    return create_impl(dogpile)
-
-        if cache_expire_time:
-            def get_value():
-                if not the_resource:
-                    raise NeedRegenerationException()
-                if time.time() - the_resource[-1] > cache_expire_time:
-                    # should never hit a cache invalidation 
-                    # if we've set expiretime below the cache 
-                    # expire time (assuming a cache which
-                    # honors this).
-                    self._assert_log(
-                        cache_expire_time < expiretime,
-                        "Cache expiration hit, cache "
-                        "expire time %s, expiretime %s",
-                        cache_expire_time,
-                        expiretime,
-                    )
-
-                    raise NeedRegenerationException()
-
-                if inline_create == 'get_value_plus_created':
-                    return the_resource[-1], the_resource[-1]
-                else:
-                    return the_resource[-1]
-        else:
-            def get_value():
-                if not the_resource:
-                    raise NeedRegenerationException()
-                if inline_create == 'get_value_plus_created':
-                    return the_resource[-1], the_resource[-1]
-                else:
-                    return the_resource[-1]
-
-        if inline_create == 'rudimental':
-            assert not cache_expire_time
-
-            @contextlib.contextmanager
-            def enter_dogpile_block(dogpile):
-                with dogpile.acquire(lambda: create_resource(dogpile)) as x:
-                    yield the_resource[-1]
-        elif inline_create == 'get_value':
-            @contextlib.contextmanager
-            def enter_dogpile_block(dogpile):
-                with dogpile.acquire(
-                        lambda: create_resource(dogpile), 
-                        get_value
-                    ) as rec:
-                    yield rec
-        elif inline_create == 'get_value_plus_created':
-            @contextlib.contextmanager
-            def enter_dogpile_block(dogpile):
-                with dogpile.acquire(
-                        lambda: create_resource(dogpile), 
-                        value_and_created_fn=get_value
-                    ) as rec:
-                    yield rec
-        else:
-            assert False, inline_create
-
-
-        def use_dogpile():
-            try:
-                for i in range(num_usages):
-                    dogpile = get_dogpile()
-                    now = time.time()
-                    with enter_dogpile_block(dogpile) as value:
-                        waited = time.time() - now
-                        if waited > .01:
-                            slow_waiters[0] += 1
-                        check_value(value, waited)
-                        time.sleep(usage_time)
-                    time.sleep(delay_time)
-            except:
-                log.error("thread failed", exc_info=True)
-                failures[0] += 1
-
-        def check_value(value, waited):
-            assert value
-
-            # time since the current resource was
-            # created
-            time_since_create = time.time() - value
-
-            self._assert_log(
-                time_since_create < max_stale,
-                "Time since create %.4f max stale time %s, "
-                    "total waited %s",
-                time_since_create, max_stale, 
-                slow_waiters[0]
-            )
-
-        started_at = time.time()
-        threads = []
-        for i in range(num_threads):
-            t = threading.Thread(target=use_dogpile)
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
-        actual_run_time = time.time() - started_at
-
-        # time spent starts with num usages * time per usage, with a 10% fudge
-        expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1
-
-        expected_generations = math.ceil(expected_run_time / effective_expiretime)
-
-        if unsafe_cache:
-            expected_slow_waiters = expected_generations * num_threads
-        else:
-            expected_slow_waiters = expected_generations + num_threads - 1
-
-        if slow_write_time:
-            expected_slow_waiters = num_threads * expected_generations
-
-        # time spent also increments by one wait period in the beginning...
-        expected_run_time += effective_creation_time
-
-        # and a fudged version of the periodic waiting time anticipated
-        # for a single thread...
-        expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads
-        expected_run_time *= 1.1
-
-        log.info("Test Summary")
-        log.info("num threads: %s; expiretime: %s; creation_time: %s; "
-                "num_usages: %s; "
-                "usage_time: %s; delay_time: %s", 
-            num_threads, expiretime, creation_time, num_usages, 
-            usage_time, delay_time
-        )
-        log.info("cache expire time: %s; unsafe cache: %s slow "
-                "write time: %s; inline: %s; registry: %s", 
-            cache_expire_time, unsafe_cache, slow_write_time, 
-            inline_create, use_registry)
-        log.info("Estimated run time %.2f actual run time %.2f", 
-                    expected_run_time, actual_run_time)
-        log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", 
-                    effective_expiretime)
-        log.info("Expected slow waits %s, Total slow waits %s", 
-                    expected_slow_waiters, slow_waiters[0])
-        log.info("Total generations %s Max generations expected %s" % (
-            len(the_resource), expected_generations
-        ))
-
-        assert not failures[0], "%s failures occurred" % failures[0]
-        assert actual_run_time <= expected_run_time
-
-        assert slow_waiters[0] <= expected_slow_waiters, \
-            "Number of slow waiters %s exceeds expected slow waiters %s" % (
-                slow_waiters[0],
-                expected_slow_waiters
-            )
-        assert len(the_resource) <= expected_generations,\
-            "Number of resource generations %d exceeded "\
-            "expected %d" % (len(the_resource), 
-                expected_generations)
-
-class DogpileTest(TestCase):
-    def test_single_create(self):
-        dogpile = Dogpile(2)
-        the_resource = [0]
-
-        def create_resource():
-            the_resource[0] += 1
-
-        with dogpile.acquire(create_resource):
-            assert the_resource[0] == 1
-
-        with dogpile.acquire(create_resource):
-            assert the_resource[0] == 1
-
-        time.sleep(2)
-        with dogpile.acquire(create_resource):
-            assert the_resource[0] == 2
-
-        with dogpile.acquire(create_resource):
-            assert the_resource[0] == 2
-
-    def test_no_expiration(self):
-        dogpile = Dogpile(None)
-        the_resource = [0]
-
-        def create_resource():
-            the_resource[0] += 1
-
-        with dogpile.acquire(create_resource):
-            assert the_resource[0] == 1
-
-        with dogpile.acquire(create_resource):
-            assert the_resource[0] == 1
-
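The removed `DogpileTest` cases above exercise the core Dogpile contract: the creator runs once, subsequent acquires reuse the value until the expiry window passes, then exactly one regeneration occurs. As a minimal sketch of that behavior using only the standard library (`MiniDogpile` is a hypothetical stand-in for illustration, not the real `dogpile.core.Dogpile` implementation):

```python
import threading
import time

class MiniDogpile:
    """Illustrative stand-in for dogpile.core.Dogpile (not the real
    implementation): a value is regenerated once it expires, and the
    regeneration itself is serialized behind a mutex."""

    def __init__(self, expiretime):
        self.expiretime = expiretime   # None means the value never expires
        self.createdtime = None        # None means not created yet
        self.mutex = threading.Lock()

    def _is_expired(self):
        if self.createdtime is None:
            return True
        if self.expiretime is None:
            return False
        return time.time() - self.createdtime > self.expiretime

    def acquire(self, creator):
        if self._is_expired():
            with self.mutex:
                # double-check inside the lock: another thread may have
                # regenerated the value while we waited on the mutex
                if self._is_expired():
                    creator()
                    self.createdtime = time.time()
        return self   # acts as a no-op context manager, as the tests use it

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

# The same scenario as test_single_create, with a short expiry:
dogpile = MiniDogpile(0.1)
the_resource = [0]

def create_resource():
    the_resource[0] += 1

with dogpile.acquire(create_resource):
    assert the_resource[0] == 1   # created once
with dogpile.acquire(create_resource):
    assert the_resource[0] == 1   # still fresh, not recreated
time.sleep(0.2)                   # let the value expire
with dogpile.acquire(create_resource):
    assert the_resource[0] == 2   # regenerated exactly once
```

The double-checked locking in `acquire` mirrors what the removed `test_no_expiration` case relies on when `expiretime` is `None`: the mutex is never entered after the first creation, so acquires stay cheap.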

File tests/test_nameregistry.py

-from unittest import TestCase
-import time
-import threading
-from dogpile.nameregistry import NameRegistry
-import random
-
-import logging
-log = logging.getLogger(__name__)
-
-class NameRegistryTest(TestCase):
-
-    def test_name_registry(self):
-        success = [True]
-        num_operations = [0]
-
-        def create(identifier):
-            log.debug("Creator running for id: " + identifier)
-            return threading.Lock()
-
-        registry = NameRegistry(create)
-
-        baton = {
-            "beans":False,
-            "means":False,
-            "please":False
-        }
-
-        def do_something(name):
-            for iteration in xrange(20):
-                name = baton.keys()[random.randint(0, 2)]
-                lock = registry.get(name)
-                lock.acquire()
-                try:
-                    if baton[name]:
-                        success[0] = False
-                        log.debug("Baton is already populated")
-                        break
-                    baton[name] = True
-                    try:
-                        time.sleep(random.random() * .01)
-                    finally:
-                        num_operations[0] += 1
-                        baton[name] = False
-                finally:
-                    lock.release()
-            log.debug("thread completed operations")
-
-        threads = []
-        for id_ in range(1, 20):
-            t = threading.Thread(target=do_something, args=("somename",))
-            t.start()
-            threads.append(t)
-
-        for t in threads:
-            t.join()
-
-        assert success[0]
-
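The removed `NameRegistryTest` above hammers one property: `registry.get(name)` must hand every thread the same lock object for the same name, so at most one thread holds a given baton at a time. A minimal stdlib sketch of that property (`MiniNameRegistry` is a hypothetical stand-in, not the real `dogpile.core.nameregistry.NameRegistry`, which additionally uses weak references so unused entries can be garbage collected):

```python
import threading

class MiniNameRegistry:
    """Illustrative stand-in for NameRegistry: returns the same object
    for the same name, running the creator at most once per name even
    under concurrent access."""

    def __init__(self, creator):
        self.creator = creator
        self._values = {}
        self._mutex = threading.Lock()

    def get(self, name):
        try:
            # fast path: already created for this name
            return self._values[name]
        except KeyError:
            with self._mutex:
                # double-check inside the lock so two racing threads
                # don't both run the creator for the same name
                if name not in self._values:
                    self._values[name] = self.creator(name)
                return self._values[name]

registry = MiniNameRegistry(lambda name: threading.Lock())

# the same lock object comes back for the same name,
# and distinct objects come back for distinct names
assert registry.get("beans") is registry.get("beans")
assert registry.get("beans") is not registry.get("means")
```

Handing out one shared lock per name is what lets the removed test assert that `baton[name]` is never observed already set: two threads working on `"beans"` serialize on the identical `threading.Lock` instance.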