Mike Bayer avatar Mike Bayer committed d35be65

- implement a memcached lock, enabled by setting distributed_lock=True
- mutex-related tests

Comments (0)

Files changed (6)

docs/build/api.rst

 .. automodule:: dogpile.cache.backends.memory
     :members:
 
-Pylibmc Backend
----------------
+Memcached Backends
+------------------
 
 .. automodule:: dogpile.cache.backends.memcached
     :members:

dogpile/cache/backends/memcached.py

 """Provides backends for talking to memcached."""
 
-from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
+from dogpile.cache.api import CacheBackend, NO_VALUE
 from dogpile.cache import util
+import random
+import time
+
+class MemcachedLock(object):
+    """Simple distributed lock using memcached.
+    
+    This is an adaptation of the lock featured at
+    http://amix.dk/blog/post/19386
+    
+    """
+
+    def __init__(self, client_fn, key):
+        # client_fn: zero-argument callable returning a memcached
+        # client.  Resolved lazily on each acquire/release so the lock
+        # always uses the caller's current (e.g. thread-local) client
+        # rather than one captured at construction time.
+        self.client_fn = client_fn
+        # Namespace the lock key so it cannot collide with the cached
+        # value stored under ``key`` itself.
+        self.key = "_lock" + key
+
+    def acquire(self, wait=True):
+        """Acquire the lock, returning True on success.
+
+        Memcached ``add()`` — which succeeds only when the key does not
+        already exist — serves as the atomic test-and-set primitive.
+
+        :param wait: when True (the default), retry with randomized
+         exponential backoff until the lock is obtained; when False,
+         return False immediately on contention.
+
+        .. note:: the lock key is stored with no explicit expiration
+           argument, so a process that dies while holding the lock may
+           leave it held indefinitely — TODO confirm whether an expiry
+           should be added.
+        """
+        client = self.client_fn()
+        i = 0
+        while True:
+            if client.add(self.key, 1):
+                return True
+            elif not wait:
+                return False
+            else:
+                # Randomized exponential backoff: sleep grows with each
+                # failed attempt, up to the cap on ``i`` below.
+                sleep_time = (((i+1)*random.random()) + 2**i) / 2.5
+                time.sleep(sleep_time)
+            if i < 15:
+                # Cap the exponent so the backoff stops growing.
+                i += 1
+
+    def release(self):
+        """Release the lock by deleting its memcached key.
+
+        No ownership check is performed; any caller may release.
+        """
+        client = self.client_fn()
+        client.delete(self.key)
 
 class PylibmcBackend(CacheBackend):
-    """A backend for the `pylibmc <http://sendapatch.se/projects/pylibmc/index.html>`_ 
+    """A backend for the 
+    `pylibmc <http://sendapatch.se/projects/pylibmc/index.html>`_ 
     memcached client.
     
     E.g.::
     
     :param url: the string URL to connect to.  Can be a single
      string or a list of strings.
+    :param distributed_lock: boolean, when True, will use a
+     memcached-lock as the dogpile lock (see :class:`.MemcachedLock`).   
+     Use this when multiple
+     processes will be talking to the same memcached instance.
+     When left at False, dogpile will coordinate on a regular
+     threading mutex.  
     :param binary: sets the ``binary`` flag understood by
      ``pylibmc.Client``.
    :param behaviors: a dictionary which will be passed to
     ``pylibmc.Client`` as the ``behaviors`` parameter.
    :param memcached_expire_time: integer, when present will
     be passed as the ``time`` parameter to ``pylibmc.Client.set``.
      This is used to set the memcached expiry time for a value.
      
-     Note that this is **different** from Dogpile's own 
-     ``expiration_time``, which is the number of seconds after
-     which Dogpile will consider the value to be expired, however
-     Dogpile **will continue to use this value** until a new
-     one can be generated, when using :meth:`.CacheRegion.get_or_create`.
-     Therefore, if you are setting ``memcached_expire_time``, you'll
-     usually want to make sure it is greater than ``expiration_time`` 
-     by at least enough seconds for new values to be generated.
+     .. note::
+
+         This parameter is **different** from Dogpile's own 
+         ``expiration_time``, which is the number of seconds after
+         which Dogpile will consider the value to be expired. 
+         When Dogpile considers a value to be expired, 
+         it **continues to use the value** until generation
+         of a new value is complete, when using 
+         :meth:`.CacheRegion.get_or_create`.
+         Therefore, if you are setting ``memcached_expire_time``, you'll
+         want to make sure it is greater than ``expiration_time`` 
+         by at least enough seconds for new values to be generated,
+         else the value won't be available during a regeneration, 
+         forcing all threads to wait for a regeneration each time 
+         a value expires.
+
    :param min_compress_len: Integer, will be passed as the 
      ``min_compress_len`` parameter to the ``pylibmc.Client.set``
      method.
      
-    Threading
-    ---------
-    
     The :class:`.PylibmcBackend` uses a ``threading.local()``
     object to store individual ``pylibmc.Client`` objects per thread.
     ``threading.local()`` has the advantage over pylibmc's built-in
         self._imports()
         self.url = util.to_list(arguments['url'])
         self.binary = arguments.get('binary', False)
+        self.distributed_lock = arguments.get('distributed_lock', False)
         self.behaviors = arguments.get('behaviors', {})
         self.memcached_expire_time = arguments.get(
                                         'memcached_expire_time', 0)
 
         self._clients = ClientPool()
 
+    def get_mutex(self, key):
+        """Return the dogpile mutex to use for ``key``.
+
+        With ``distributed_lock`` enabled, return a
+        :class:`.MemcachedLock` coordinated through memcached itself;
+        the client is resolved lazily via the pool so the lock uses
+        the current thread's client.  Otherwise return None, which
+        has dogpile coordinate on a regular threading mutex.
+        """
+        if self.distributed_lock:
+            return MemcachedLock(lambda: self._clients.memcached, key)
+        else:
+            return None
+
     def _imports(self):
+        # Deferred import so the pylibmc requirement is only enforced
+        # when the backend is actually constructed; tests can override
+        # this hook to substitute a mock client library.
         global pylibmc
         import pylibmc

dogpile/cache/backends/memory.py

 """Provides a simple dictionary-based backend."""
 
-from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
+from dogpile.cache.api import CacheBackend, NO_VALUE
 
 class MemoryBackend(CacheBackend):
     """A backend that uses a plain dictionary.

dogpile/cache/region.py

 from dogpile import Dogpile, NeedRegenerationException
 from dogpile.nameregistry import NameRegistry
 
-from dogpile.cache.util import function_key_generator, PluginLoader, memoized_property
+from dogpile.cache.util import function_key_generator, PluginLoader, \
+    memoized_property
 from dogpile.cache.api import NO_VALUE, CachedValue
 import time
 
         :param expiration_time:   Optional.  The expiration time passed 
          to the dogpile system.  The :meth:`.CacheRegion.get_or_create`
          method as well as the :meth:`.CacheRegion.cache_on_arguments` 
-         decorator (though note:  **not** the :meth:`.CacheRegion.get` method)
-         will call upon the value creation function after this
+         decorator (though note:  **not** the :meth:`.CacheRegion.get` 
+         method) will call upon the value creation function after this
          time period has passed since the last generation.
 
-        :param arguments:   Optional.  The structure here is passed directly 
-         to the constructor of the :class:`.CacheBackend` in use, though 
-         is typically a dictionary.
+        :param arguments:   Optional.  The structure here is passed 
+         directly to the constructor of the :class:`.CacheBackend` 
+         in use, though is typically a dictionary.
          
         """
         if "backend" in self.__dict__:
                 "cache.memcached.arguments.url":"127.0.0.1, 10.0.0.1",
             }
             local_region.configure_from_config(myconfig, "cache.local.")
-            memcached_region.configure_from_config(myconfig, "cache.memcached.")
+            memcached_region.configure_from_config(myconfig, 
+                                                "cache.memcached.")
 
         """
         return self.configure(
             config_dict["%s.backend" % prefix],
-            expiration_time = config_dict.get("%s.expiration_time" % prefix, None),
+            expiration_time = config_dict.get(
+                                "%s.expiration_time" % prefix, None),
             _config_argument_dict=config_dict,
             _config_prefix="%s.arguments" % prefix
         )
     def get(self, key):
         """Return a value from the cache, based on the given key.
 
-        While it's typical the key is a string, it's passed through to the
-        underlying backend so can be of any type recognized by the backend. If
-        the value is not present, returns the token ``NO_VALUE``. ``NO_VALUE``
-        evaluates to False, but is separate from ``None`` to distinguish
-        between a cached value of ``None``. Note that the ``expiration_time``
-        argument is **not** used here - this method is a direct line to the
+        While it's typical the key is a string, it's 
+        passed through to the underlying backend so can be 
+        of any type recognized by the backend. If
+        the value is not present, returns the token 
+        ``NO_VALUE``. ``NO_VALUE`` evaluates to False, but is 
+        separate from ``None`` to distinguish between a cached value 
+        of ``None``. Note that the ``expiration_time`` argument is 
+        **not** used here - this method is a direct line to the
         backend's behavior. 
 
         """
         return value.payload
 
     def get_or_create(self, key, creator):
-        """Similar to ``get``, will use the given "creation" function to create a new
+        """Similar to ``get``, will use the given "creation" 
+        function to create a new
         value if the value does not exist.
 
         This will use the underlying dogpile/
-        expiration mechanism to determine when/how the creation function is called.
+        expiration mechanism to determine when/how 
+        the creation function is called.
 
         """
         if self.key_mangler:
         def gen_value():
             value = self._value(creator())
             self.backend.set(key, value)
-            return value
+            return value.payload, value.metadata["creation_time"]
 
         dogpile = self.dogpile_registry.get(key)
-        with dogpile.acquire(gen_value, value_and_created_fn=get_value) as value:
+        with dogpile.acquire(gen_value, 
+                    value_and_created_fn=get_value) as value:
             return value
 
     def _value(self, value):
         self.backend.delete(key)
 
     def cache_on_arguments(self, fn):
-        """A function decorator that will cache the return value of the
-        function using a key derived from the name of the function, its
-        location within the application (i.e. source filename) as well as the
+        """A function decorator that will cache the return 
+        value of the function using a key derived from the 
+        name of the function, its location within the 
+        application (i.e. source filename) as well as the
         arguments passed to the function.
         
         E.g.::
             generate_something.invalidate(5, 6)
 
         The generation of the key from the function is the big 
-        controversial thing that was a source of user issues with Beaker.  Dogpile
-        provides the latest and greatest algorithm used by Beaker, but also
-        allows you to use whatever function you want, by specifying it
-        to using the ``function_key_generator`` argument to :func:`.make_region`
-        and/or :class:`.CacheRegion`.  If defaults to :func:`.function_key_generator`.
+        controversial thing that was a source of user issues with 
+        Beaker. Dogpile provides the latest and greatest algorithm 
+        used by Beaker, but also allows you to use whatever function 
+        you want, by specifying it using the ``function_key_generator`` 
+        argument to :func:`.make_region` and/or
+        :class:`.CacheRegion`. It defaults to 
+        :func:`.function_key_generator`.
 
         """
         key_generator = self.function_key_generator(fn)

tests/_fixtures.py

 from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
-from dogpile.cache import register_backend, CacheRegion
+from dogpile.cache import register_backend, CacheRegion, util
+from dogpile.cache.region import _backend_loader
 from tests import eq_, assert_raises_message
 import itertools
 import time
 from nose import SkipTest
-
+from threading import Thread, Lock
 from unittest import TestCase
 
-class _GenericBackendTest(TestCase):
+class _GenericBackendFixture(object):
+    # Shared scaffolding for tests that exercise a backend either
+    # directly or through a configured CacheRegion.
     @classmethod
     def setup_class(cls):
+        # Instantiate the backend once up front purely to detect a
+        # missing client library; SkipTest then skips the whole class.
         try:
-            cls._region()
+            backend_cls = _backend_loader.load(cls.backend)
+            backend_cls(cls.config_args.get('arguments', {}))
         except ImportError:
             raise SkipTest("Backend %s not installed" % cls.backend)
 
-    backend = None
+    # Per-subclass overrides: kwargs for CacheRegion() and for
+    # region.configure(), respectively.
     region_args = {}
     config_args = {}
 
-    @classmethod
-    def _region(cls, region_args={}, config_args={}):
-        _region_args = {}
-        _region_args = cls.region_args.copy()
+    # Most recently built region/backend, remembered so tearDown can
+    # delete the keys the tests write.
+    _region_inst = None
+    _backend_inst = None
+
+    def _region(self, region_args={}, config_args={}):
+        # Build a CacheRegion configured for this test's backend.  The
+        # class-level dicts and the mutable default arguments are all
+        # copied before being updated, so none is mutated in place.
+        _region_args = self.region_args.copy()
         _region_args.update(**region_args)
-        reg = CacheRegion(**_region_args)
-        _config_args = cls.config_args.copy()
+        _config_args = self.config_args.copy()
         _config_args.update(config_args)
-        reg.configure(cls.backend, **_config_args)
+
+        self._region_inst = reg = CacheRegion(**_region_args)
+        reg.configure(self.backend, **_config_args)
         return reg
 
-    def test_set_get_value(self):
+    def _backend(self):
+        # Construct the raw backend (no region) for direct API tests.
+        backend_cls = _backend_loader.load(self.backend)
+        _config_args = self.config_args.copy()
+        self._backend_inst = backend_cls(_config_args.get('arguments', {}))
+        return self._backend_inst
+
+    def tearDown(self):
+        # Remove whichever key the test wrote; region tests use
+        # "some key", direct-backend tests use "some_key".
+        if self._region_inst:
+            self._region_inst.delete("some key")
+        elif self._backend_inst:
+            self._backend_inst.delete("some_key")
+
+class _GenericBackendTest(_GenericBackendFixture, TestCase):
+    # Direct backend-API round-trip tests.
+
+    def test_backend_get_nothing(self):
+        backend = self._backend()
+        eq_(backend.get("some_key"), NO_VALUE)
+
+    def test_backend_delete_nothing(self):
+        # Deleting an absent key must be a harmless no-op.
+        backend = self._backend()
+        backend.delete("some_key")
+
+    def test_backend_set_get_value(self):
+        backend = self._backend()
+        backend.set("some_key", "some value")
+        eq_(backend.get("some_key"), "some value")
+
+    def test_backend_delete(self):
+        backend = self._backend()
+        backend.set("some_key", "some value")
+        backend.delete("some_key")
+        eq_(backend.get("some_key"), NO_VALUE)
+
+    def test_region_set_get_value(self):
+        # Region-level round trip of set()/get().
         reg = self._region()
         reg.set("some key", "some value")
         eq_(reg.get("some key"), "some value")
 
-    def test_set_get_nothing(self):
+    def test_region_set_get_nothing(self):
+        # A miss returns the NO_VALUE token, not None.
         reg = self._region()
         eq_(reg.get("some key"), NO_VALUE)
 
-    def test_creator(self):
+    def test_region_creator(self):
+        # get_or_create() invokes the creator on a miss.
         reg = self._region()
         def creator():
             return "some value"
         eq_(reg.get_or_create("some key", creator), "some value")
 
-    def test_remove(self):
+    def test_threaded_dogpile(self):
+        # run a basic dogpile concurrency test.
+        # note the concurrency of dogpile itself
+        # is intensively tested as part of dogpile.
+        reg = self._region(config_args={"expiration_time":.25})
+        lock = Lock()
+        canary = []
+        def creator():
+            # Record whether the plain threading lock was free on
+            # entry; a False entry means two threads were inside
+            # creator() simultaneously.
+            ack = lock.acquire(False)
+            canary.append(ack)
+            time.sleep(.5)
+            if ack:
+                lock.release()
+            return "some value"
+        def f():
+            for x in xrange(5):
+                reg.get_or_create("some key", creator)
+                time.sleep(.5)
+
+        threads = [Thread(target=f) for i in xrange(5)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        # creator must have run repeatedly (expiration at work) and
+        # never concurrently.
+        assert len(canary) > 3
+        assert False not in canary
+
+    def test_region_delete(self):
         reg = self._region()
         reg.set("some key", "some value")
         reg.delete("some key")
+        # A second delete of the same key must be a harmless no-op.
         reg.delete("some key")
         eq_(reg.get("some key"), NO_VALUE)
 
-    def test_expire(self):
-        reg = self._region(config_args={"expiration_time":1})
+    def test_region_expire(self):
+        reg = self._region(config_args={"expiration_time":.25})
         counter = itertools.count(1)
         def creator():
             return "some value %d" % next(counter)
         eq_(reg.get_or_create("some key", creator), "some value 1")
-        time.sleep(1)
+        time.sleep(.4)
+        # Past expiration: plain get() still returns the stale value,
+        # while get_or_create() regenerates it.
         eq_(reg.get("some key"), "some value 1")
         eq_(reg.get_or_create("some key", creator), "some value 2")
         eq_(reg.get("some key"), "some value 2")
+
+class _GenericMutexTest(_GenericBackendFixture, TestCase):
+    # Tests for the mutex object returned by a backend's get_mutex().
+
+    def test_mutex(self):
+        backend = self._backend()
+        mutex = backend.get_mutex("foo")
+
+        # After acquiring, a non-blocking second acquire must fail.
+        ac = mutex.acquire()
+        assert ac
+        ac2 = mutex.acquire(wait=False)
+        assert not ac2
+        # After release, the mutex is acquirable again.
+        mutex.release()
+        ac3 = mutex.acquire()
+        assert ac3
+        mutex.release()
+
+    def test_mutex_threaded(self):
+        backend = self._backend()
+        mutex = backend.get_mutex("foo")
+
+        lock = Lock()
+        canary = []
+        def f():
+            for x in xrange(5):
+                # Each iteration takes a fresh backend mutex for the
+                # same key, then uses a plain threading lock to verify
+                # no other thread is in the critical section.
+                mutex = backend.get_mutex("foo")
+                mutex.acquire()
+                for y in xrange(5):
+                    ack = lock.acquire(False)
+                    canary.append(ack)
+                    time.sleep(.002)
+                    if ack:
+                        lock.release()
+                mutex.release()
+                time.sleep(.02)
+
+        threads = [Thread(target=f) for i in xrange(5)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        # Any False entry means mutual exclusion was violated.
+        assert False not in canary

tests/test_pylibmc_backend.py

-from tests._fixtures import _GenericBackendTest
+from tests._fixtures import _GenericBackendTest, _GenericMutexTest
 from tests import eq_
 from unittest import TestCase
 from threading import Thread
 import time
 
-class PyLibMCBackendTest(_GenericBackendTest):
+class PylibmcTest(_GenericBackendTest):
     backend = "dogpile.cache.pylibmc"
 
     region_args = {
         }
     }
 
+class PylibmcDistributedTest(_GenericBackendTest):
+    # Generic backend tests rerun with the memcached-based
+    # distributed lock enabled; requires a memcached at 127.0.0.1.
+    backend = "dogpile.cache.pylibmc"
+
+    region_args = {
+        "key_mangler":lambda x: x.replace(" ", "_")
+    }
+    config_args = {
+        "arguments":{
+            "url":"127.0.0.1:11211",
+            "distributed_lock":True
+        }
+    }
+
+class PylibmcDistributedMutexTest(_GenericMutexTest):
+    # Exercises the MemcachedLock returned by get_mutex() when
+    # distributed_lock is enabled.
+    backend = "dogpile.cache.pylibmc"
+
+    config_args = {
+        "arguments":{
+            "url":"127.0.0.1:11211",
+            "distributed_lock":True
+        }
+    }
+
 from dogpile.cache.backends.memcached import PylibmcBackend
 class MockPylibmcBackend(PylibmcBackend):
     def _imports(self):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.