Mike Bayer avatar Mike Bayer committed 0232773

- Added support to DBM file lock to allow reentrant
access per key within a single thread, so that
even though the DBM backend locks for the whole file,
a creation function that calls upon a different
key in the cache can still proceed. #5

- Fixed DBM glitch where multiple readers
could be serialized.

Comments (0)

Files changed (5)

   to Python objects - ints, "false", "true", "None".
   #4
 
+- Added support to DBM file lock to allow reentrant
+  access per key within a single thread, so that 
+  even though the DBM backend locks for the whole file,
+  a creation function that calls upon a different
+  key in the cache can still proceed.  #5
+
+- Fixed DBM glitch where multiple readers 
+  could be serialized.
+
 0.2.2
 =====
 - add Redis backend, courtesy Ollie Rutherfurd

dogpile/cache/backends/file.py

         self._dogpile_lock = self._init_lock(
                                 arguments.get('dogpile_lockfile'), 
                                 ".dogpile.lock", 
-                                dir_, filename)
+                                dir_, filename,
+                                util.KeyReentrantMutex.factory)
 
         # TODO: make this configurable
         if util.py3k:
         self.dbmmodule = dbm
         self._init_dbm_file()
 
-    def _init_lock(self, argument, suffix, basedir, basefile):
+    def _init_lock(self, argument, suffix, basedir, basefile, wrapper=None):
         if argument is None:
-            return FileLock(os.path.join(basedir, basefile + suffix))
+            lock = FileLock(os.path.join(basedir, basefile + suffix))
         elif argument is not False:
-            return FileLock(
+            lock = FileLock(
                         os.path.abspath(
                             os.path.normpath(argument)
                         ))
         else:
             return None
+        if wrapper:
+            lock = wrapper(lock)
+        return lock
 
     def _init_dbm_file(self):
         exists = os.access(self.filename, os.F_OK)
         # break other processes trying to get at the file
         # at the same time - so handling unlimited keys
         # can't imply unlimited filenames
-        return self._dogpile_lock
+        if self._dogpile_lock:
+            return self._dogpile_lock(key)
+        else:
+            return None
 
     @contextmanager
     def _use_rw_lock(self, write):
             dbm.close()
 
     def get(self, key):
-        with self._dbm_file('r') as dbm:
+        with self._dbm_file(False) as dbm:
             value = dbm.get(key, NO_VALUE)
             if value is not NO_VALUE:
                 value = util.pickle.loads(value)
             return value
 
     def set(self, key, value):
-        with self._dbm_file('w') as dbm:
+        with self._dbm_file(True) as dbm:
             dbm[key] = util.pickle.dumps(value)
 
     def delete(self, key):
-        with self._dbm_file('w') as dbm:
+        with self._dbm_file(True) as dbm:
             try:
                 del dbm[key]
             except KeyError:

dogpile/cache/util.py

 import inspect
 import sys
 import re
+import collections
 
 try:
     import threading
     """Return a function that generates a string
     key, based on a given function as well as
     arguments to the returned function itself.
-    
+
     This is used by :meth:`.CacheRegion.cache_on_arguments`
     to generate a cache key from a decorated function.
-    
+
     It can be replaced using the ``function_key_generator``
     argument passed to :func:`.make_region`.
-    
+
     """
 
     if namespace is None:
 def length_conditional_mangler(length, mangler):
     """a key mangler that mangles if the length of the key is
     past a certain threshold.
-    
+
     """
     def mangle(key):
         if len(key) >= length:
         return [x]
     else:
         return x
+
+
+class KeyReentrantMutex(object):
+
+    def __init__(self, key, mutex, keys):
+        self.key = key
+        self.mutex = mutex
+        self.keys = keys
+
+    @classmethod
+    def factory(cls, mutex):
+        # this collection holds zero or one
+        # thread idents as the key; a set of
+        # keynames held as the value.
+        keystore = collections.defaultdict(set)
+        def fac(key):
+            return KeyReentrantMutex(key, mutex, keystore)
+        return fac
+
+    def acquire(self, wait=True):
+        current_thread = threading.current_thread().ident
+        keys = self.keys.get(current_thread)
+        if keys is not None and \
+            self.key not in keys:
+            # current lockholder, new key. add it in
+            keys.add(self.key)
+            return True
+        elif self.mutex.acquire(wait=wait):
+            # after acquire, create new set and add our key
+            self.keys[current_thread].add(self.key)
+            return True
+        else:
+            return False
+
+    def release(self):
+        current_thread = threading.current_thread().ident
+        keys = self.keys.get(current_thread)
+        assert keys is not None, "this thread didn't do the acquire"
+        assert self.key in keys, "No acquire held for key '%s'" % self.key
+        keys.remove(self.key)
+        if not keys:
+            # when list of keys empty, remove
+            # the thread ident and unlock.
+            del self.keys[current_thread]
+            self.mutex.release()

tests/cache/_fixtures.py

         for t in threads:
             t.join()
         assert False not in canary
+
+    def test_mutex_reentrant_across_keys(self):
+        backend = self._backend()
+        for x in range(3):
+            m1 = backend.get_mutex("foo")
+            m2 = backend.get_mutex("bar")
+            try:
+                m1.acquire()
+                assert m2.acquire(wait=False)
+                assert not m2.acquire(wait=False)
+                m2.release()
+
+                assert m2.acquire(wait=False)
+                assert not m2.acquire(wait=False)
+                m2.release()
+            finally:
+                m1.release()
+
+    def test_reentrant_dogpile(self):
+        reg = self._region()
+        def create_foo():
+            return "foo" + reg.get_or_create("bar", create_bar)
+
+        def create_bar():
+            return "bar"
+
+        eq_(
+            reg.get_or_create("foo", create_foo),
+            "foobar"
+        )
+        eq_(
+            reg.get_or_create("foo", create_foo),
+            "foobar"
+        )

tests/cache/test_dbm_backend.py

 from ._fixtures import _GenericBackendTest, _GenericMutexTest
-from . import eq_
+from . import eq_, assert_raises_message
 from unittest import TestCase
 from threading import Thread
 import time
         }
     }
 
+    def test_release_assertion_thread(self):
+        backend = self._backend()
+        m1 = backend.get_mutex("foo")
+        assert_raises_message(
+            AssertionError,
+            "this thread didn't do the acquire",
+            m1.release
+        )
+
+    def test_release_assertion_key(self):
+        backend = self._backend()
+        m1 = backend.get_mutex("foo")
+        m2 = backend.get_mutex("bar")
+
+        m1.acquire()
+        try:
+            assert_raises_message(
+                AssertionError,
+                "No acquire held for key 'bar'",
+                m2.release
+            )
+        finally:
+            m1.release()
+
 
 def teardown():
     for fname in os.listdir(os.curdir):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.