Commits

Mike Bayer  committed 8e685bb

- Add new "nameregistry" helper. Another fixture
derived from Beaker, this allows the ad-hoc creation of
a new Dogpile lock based on a name, where all other
threads calling that name at the same time will get
the same Dogpile lock. Allows any number of
logical "dogpile" actions to carry on concurrently
without any memory taken up outside of those operations.

- To support the use case supported by nameregistry, added
value_and_created_fn to dogpile.acquire(). The idea
is that the value_and_created_fn can return
(value, createdtime), so that the creation time of the
value can come from the cache, thus eliminating the
need for the dogpile lock to hang around persistently.

  • Participants
  • Parent commits 894cffc

Comments (0)

Files changed (7)

+0.2.1
+=====
+
+- Add new "nameregistry" helper.  Another fixture
+  derived from Beaker, this allows the ad-hoc creation of 
+  a new Dogpile lock based on a name, where all other
+  threads calling that name at the same time will get 
+  the same Dogpile lock.  Allows any number of
+  logical "dogpile" actions to carry on concurrently
+  without any memory taken up outside of those operations.
+
+- To support the use case supported by nameregistry, added
+  value_and_created_fn to dogpile.acquire().  The idea 
+  is that the value_and_created_fn can return 
+  (value, createdtime), so that the creation time of the 
+  value can come from the cache, thus eliminating the 
+  need for the dogpile lock to hang around persistently.
+
+0.2.0
+=====
+
+- change name to lowercase "dogpile".
+
+0.1.0
+=====
+
+- initial revision.
 once per access, instead of Beaker's system which calls it twice, and doesn't make us call
 get() when we just created the value.
 
+Using Dogpile across lots of keys
+----------------------------------
+
+The above patterns all feature the usage of Dogpile as an object held persistently
+for the lifespan of some value.  Two more helpers can allow the dogpile to be created
+as needed and then disposed, while still maintaining that concurrent threads lock.
+Here's the memcached example again using that technique::
+
+    import pylibmc
+    mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))
+
+    from dogpile import Dogpile, NeedRegenerationException, NameRegistry
+    import pickle
+    import time
+
+    def cache(expiration_time):
+        dogpile_registry = NameRegistry(lambda identifier: Dogpile(expiration_time))
+
+        def get_or_create(key, creation_function):
+
+            def get_value():
+                with mc_pool.reserve() as mc:
+                    value = mc.get(key)
+                    if value is None:
+                        raise NeedRegenerationException()
+                    # deserialize a tuple
+                    # (value, createdtime)
+                    return pickle.loads(value)
+
+            dogpile = dogpile_registry.get(key)
+
+            def gen_cached():
+                value = creation_function()
+                with mc_pool.reserve() as mc:
+                    # serialize a tuple
+                    # (value, createdtime)
+                    mc.put(key, pickle.dumps((value, time.time())))
+                return value
+
+            with dogpile.acquire(gen_cached, value_and_created_fn=get_value) as value:
+                return value
+
+        return get_or_create
+
+Above, we use a ``NameRegistry`` which will give us a ``Dogpile`` object that's 
+unique on a certain name.   When all usages of that name are complete, the ``Dogpile``
+object falls out of scope, so total number of keys used is not a memory issue.
+We then tell Dogpile the "creation time" that we store in our
+cache - we do this using the ``value_and_created_fn`` argument, which assumes we'll
+be storing and loading the value as a tuple of (value, createdtime).  The creation time
+should always be calculated via ``time.time()``.   The ``acquire()`` function
+returns just the first part of the tuple, the value, to us, and uses the 
+createdtime portion to determine if the value is expired.
+
+
 Development Status
 -------------------
 

File dogpile/__init__.py

 from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
+from nameregistry import NameRegistry
 
-__version__ = '0.2.0'
+__version__ = '0.2.1'
 

File dogpile/dogpile.py

             replace_old_datafile_with_new()
 
 """
-try:
-    import threading
-    import thread
-except ImportError:
-    import dummy_threading as threading
-    import dummy_thread as thread
-
+from util import thread, threading
 import time
 import logging
 from readwrite_lock import ReadWriteMutex
     """
     def __init__(self, expiretime, init=False):
         # Serializes the "create a new value" operation among
         # competing threads.
         self.dogpilelock = threading.Lock()
+
         # Number of seconds after which the value is considered expired.
         self.expiretime = expiretime
         if init:
             self.createdtime = time.time()
         else:
             # -1 is the sentinel for "no value has been created yet";
             # value_and_created_fn in acquire() can later replace it.
             self.createdtime = -1
 
-    def acquire(self, creator, value_fn=None):
+    def acquire(self, creator, 
+                        value_fn=None, 
+                        value_and_created_fn=None):
         """Acquire the lock, returning a context manager.
         
         :param creator: Creation function, used if this thread
          is chosen to create a new value.
          
+        :param value_fn: Optional function that returns
+         the value from some datasource.  Will be returned
+         if regeneration is not needed.
+
+        :param value_and_created_fn: Like value_fn, but returns a tuple
+         of (value, createdtime).  The returned createdtime 
+         will replace the "createdtime" value on this dogpile
+         lock.   This option removes the need for the dogpile lock
+         itself to remain persistent across usages; another 
+         dogpile can come along later and pick up where the
+         previous one left off.   Should be used in conjunction
+         with a :class:`.NameRegistry`.
+         
         """
         dogpile = self
+
+        if value_and_created_fn:
+            value_fn = value_and_created_fn
+
         class Lock(object):
             if value_fn:
                 def __enter__(self):
                     try:
                         value = value_fn()
+                        if value_and_created_fn:
+                            value, dogpile.createdtime = value
                     except NeedRegenerationException:
                         dogpile.createdtime = -1
                         value = NOT_REGENERATED
         pass
 
 class SyncReaderDogpile(Dogpile):
-    def __init__(self, expiretime):
-        super(SyncReaderDogpile, self).__init__(expiretime)
+    def __init__(self, *args, **kw):
+        # Forward all arguments so any Dogpile constructor option
+        # (e.g. the "init" flag) is accepted here as well.
+        super(SyncReaderDogpile, self).__init__(*args, **kw)
         self.readwritelock = ReadWriteMutex()
 
     def acquire_write_lock(self):

File dogpile/nameregistry.py

+from util import threading
+import weakref
+
class NameRegistry(object):
    """Generates and returns an object, keeping it as a
    singleton for a certain identifier for as long as it is
    strongly referenced.

    e.g.::

        class MyFoo(object):
            "some important object."

        registry = NameRegistry(MyFoo)

        # thread 1:
        my_foo = registry.get("foo1")

        # thread 2
        my_foo = registry.get("foo1")

    Above, "my_foo" in both thread #1 and #2 will
    be *the same object*.

    When thread 1 and thread 2 both complete or
    otherwise delete references to "my_foo", the
    object is *removed* from the NameRegistry as
    a result of Python garbage collection.

    :param creator: callable invoked as
     ``creator(identifier, *args, **kw)`` to produce a new
     value when none is present for the identifier.  The
     returned object must be weakly referenceable.

    """
    # NOTE: the former class-level ``_locks``/``_mutex`` attributes
    # were removed; they were always shadowed by the per-instance
    # attributes assigned in __init__ and therefore dead code.

    def __init__(self, creator):
        # WeakValueDictionary, so that an entry disappears as soon
        # as all callers drop their strong references to the value.
        self._values = weakref.WeakValueDictionary()
        self._mutex = threading.RLock()
        self.creator = creator

    def get(self, identifier, *args, **kw):
        # Fast path: no lock taken.  The KeyError guard covers the
        # race where the weakly-referenced value is garbage collected
        # between the membership test and the item lookup.
        try:
            if identifier in self._values:
                return self._values[identifier]
            else:
                return self._sync_get(identifier, *args, **kw)
        except KeyError:
            return self._sync_get(identifier, *args, **kw)

    def _sync_get(self, identifier, *args, **kw):
        # Slow path: re-check and create under the mutex so that only
        # one thread ever runs the creator for a given identifier.
        self._mutex.acquire()
        try:
            try:
                if identifier in self._values:
                    return self._values[identifier]
                else:
                    self._values[identifier] = value = \
                        self.creator(identifier, *args, **kw)
                    return value
            except KeyError:
                # Value was collected after the re-check; create anew.
                self._values[identifier] = value = \
                    self.creator(identifier, *args, **kw)
                return value
        finally:
            self._mutex.release()

File dogpile/util.py

+try:
+    import threading
+    import thread
+except ImportError:
+    import dummy_threading as threading
+    import dummy_thread as thread
+

File tests/test_nameregistry.py

+from unittest import TestCase
+import time
+import threading
+from dogpile.nameregistry import NameRegistry
+import random
+
+import logging
+log = logging.getLogger(__name__)
+
class NameRegistryTest(TestCase):

    def test_name_registry(self):
        """Hammer one NameRegistry from many threads.

        Asserts that every thread asking for the same name receives
        the *same* lock, by checking that the per-name "baton" is
        never found occupied while that name's lock is held.
        """
        success = [True]
        num_operations = [0]
        names = ["beans", "means", "please"]

        def create(identifier):
            # One lock per name; logged so interleavings are visible.
            log.debug("Creator running for id: %s", identifier)
            return threading.Lock()

        registry = NameRegistry(create)

        # True while some thread is inside that name's critical section.
        baton = dict.fromkeys(names, False)

        def do_something():
            # range()/random.choice() instead of the Python-2-only
            # xrange()/keys()-indexing of the original.
            for iteration in range(20):
                name = random.choice(names)
                lock = registry.get(name)
                lock.acquire()
                try:
                    if baton[name]:
                        # Two threads in the same critical section:
                        # the registry handed out distinct locks.
                        success[0] = False
                        log.debug("Baton is already populated")
                        break
                    baton[name] = True
                    try:
                        time.sleep(random.random() * .01)
                    finally:
                        num_operations[0] += 1
                        baton[name] = False
                finally:
                    lock.release()
            log.debug("thread completed operations")

        threads = []
        for _ in range(19):
            t = threading.Thread(target=do_something)
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

        assert success[0]
        # No thread hit the failure "break", so each completed all
        # 20 iterations.
        assert num_operations[0] == len(threads) * 20
+