Commits

Mike Bayer committed 928fe42

- move tests into tests.cache namespace
- implement py3k cross-compatibility

Comments (0)

Files changed (20)

dogpile/cache/__init__.py

 __version__ = '0.2.0'
 
-from region import CacheRegion, register_backend, make_region
+from .region import CacheRegion, register_backend, make_region

dogpile/cache/api.py

 import operator
+from . import util
 
 class NoValue(object):
     """Describe a missing cache value.
     def payload(self):
         return self
 
-    def __nonzero__(self):
-        return False
+    if util.py3k:
+        def __bool__(self):
+            return False
+    else:
+        def __nonzero__(self):
+            return False
 
 NO_VALUE = NoValue()
 """Value returned from ``get()`` that describes 

dogpile/cache/backends/file.py

                                 dir_, filename)
 
         # TODO: make this configurable
-        import anydbm
-        self.dbmmodule = anydbm
+        if util.py3k:
+            import dbm
+        else:
+            import anydbm as dbm
+        self.dbmmodule = dbm
         self._init_dbm_file()
 
     def _init_lock(self, argument, suffix, basedir, basefile):
 
     def delete(self, key):
         with self._dbm_file('w') as dbm:
-            dbm.pop(key, None)
+            try:
+                del dbm[key]
+            except KeyError:
+                pass
 
 class FileLock(object):
     """Use lockfiles to coordinate read/write access to a file."""

dogpile/cache/region.py

 from dogpile.core import Dogpile, NeedRegenerationException
 from dogpile.core.nameregistry import NameRegistry
 
-from dogpile.cache.util import function_key_generator, PluginLoader, \
+from .util import function_key_generator, PluginLoader, \
     memoized_property
-from dogpile.cache.api import NO_VALUE, CachedValue
+from .api import NO_VALUE, CachedValue
 import time
 from functools import wraps
 
 _backend_loader = PluginLoader("dogpile.cache")
 register_backend = _backend_loader.register
-import backends
+from . import backends
 
 value_version = 1
 """An integer placed in the :class:`.CachedValue`

dogpile/cache/util.py

 
 try:
     import threading
-    import thread
 except ImportError:
     import dummy_threading as threading
-    import dummy_thread as thread
 
-py3k = getattr(sys, 'py3kwarning', False) or sys.version_info >= (3, 0)
+py3k = sys.version_info >= (3, 0)
 jython = sys.platform.startswith('java')
 
 if py3k or jython:
 else:
     import cPickle as pickle
 
+if py3k:
+    tounicode = str
+else:
+    tounicode = unicode
+
 class PluginLoader(object):
     def __init__(self, group):
         self.group = group
                     "function does not accept keyword arguments.")
         if has_self:
             args = args[1:]
-        return namespace + "|" + " ".join(map(unicode, args))
+        return namespace + "|" + " ".join(map(tounicode, args))
     return generate_key
 
 def sha1_mangle_key(key):
 
 from setuptools import setup, find_packages
 
-extra = {}
-if sys.version_info >= (3, 0):
-    extra.update(
-        use_2to3=True,
-    )
-
 v = open(os.path.join(os.path.dirname(__file__), 'dogpile', 'cache', '__init__.py'))
 VERSION = re.compile(r".*__version__ = '(.*?)'", re.S).match(v.read()).group(1)
 v.close()
       install_requires=['dogpile.core>=0.3.0'],
       test_suite='nose.collector',
       tests_require=['nose'],
-      **extra
 )

tests/__init__.py

-import re
-
-def eq_(a, b, msg=None):
-    """Assert a == b, with repr messaging on failure."""
-    assert a == b, msg or "%r != %r" % (a, b)
-
-def ne_(a, b, msg=None):
-    """Assert a != b, with repr messaging on failure."""
-    assert a != b, msg or "%r == %r" % (a, b)
-
-def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
-    try:
-        callable_(*args, **kwargs)
-        assert False, "Callable did not raise an exception"
-    except except_cls, e:
-        assert re.search(msg, str(e)), "%r !~ %s" % (msg, e)

tests/_fixtures.py

-from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
-from dogpile.cache import register_backend, CacheRegion, util
-from dogpile.cache.region import _backend_loader
-from tests import eq_, assert_raises_message
-import itertools
-import time
-from nose import SkipTest
-from threading import Thread, Lock
-from unittest import TestCase
-
-class _GenericBackendFixture(object):
-    @classmethod
-    def setup_class(cls):
-        try:
-            backend_cls = _backend_loader.load(cls.backend)
-            backend = backend_cls(cls.config_args.get('arguments', {}))
-        except ImportError:
-            raise SkipTest("Backend %s not installed" % cls.backend)
-        cls._check_backend_available(backend)
-
-    @classmethod
-    def _check_backend_available(cls, backend):
-        pass
-
-    region_args = {}
-    config_args = {}
-
-    _region_inst = None
-    _backend_inst = None
-
-    def _region(self, region_args={}, config_args={}):
-        _region_args = self.region_args.copy()
-        _region_args.update(**region_args)
-        _config_args = self.config_args.copy()
-        _config_args.update(config_args)
-
-        self._region_inst = reg = CacheRegion(**_region_args)
-        reg.configure(self.backend, **_config_args)
-        return reg
-
-    def _backend(self):
-        backend_cls = _backend_loader.load(self.backend)
-        _config_args = self.config_args.copy()
-        self._backend_inst = backend_cls(_config_args.get('arguments', {}))
-        return self._backend_inst
-
-class _GenericBackendTest(_GenericBackendFixture, TestCase):
-    def tearDown(self):
-        if self._region_inst:
-            self._region_inst.delete("some key")
-        elif self._backend_inst:
-            self._backend_inst.delete("some_key")
-
-    def test_backend_get_nothing(self):
-        backend = self._backend()
-        eq_(backend.get("some_key"), NO_VALUE)
-
-    def test_backend_delete_nothing(self):
-        backend = self._backend()
-        backend.delete("some_key")
-
-    def test_backend_set_get_value(self):
-        backend = self._backend()
-        backend.set("some_key", "some value")
-        eq_(backend.get("some_key"), "some value")
-
-    def test_backend_delete(self):
-        backend = self._backend()
-        backend.set("some_key", "some value")
-        backend.delete("some_key")
-        eq_(backend.get("some_key"), NO_VALUE)
-
-    def test_region_set_get_value(self):
-        reg = self._region()
-        reg.set("some key", "some value")
-        eq_(reg.get("some key"), "some value")
-
-    def test_region_set_get_nothing(self):
-        reg = self._region()
-        eq_(reg.get("some key"), NO_VALUE)
-
-    def test_region_creator(self):
-        reg = self._region()
-        def creator():
-            return "some value"
-        eq_(reg.get_or_create("some key", creator), "some value")
-
-    def test_threaded_dogpile(self):
-        # run a basic dogpile concurrency test.
-        # note the concurrency of dogpile itself
-        # is intensively tested as part of dogpile.
-        reg = self._region(config_args={"expiration_time":.25})
-        lock = Lock()
-        canary = []
-        def creator():
-            ack = lock.acquire(False)
-            canary.append(ack)
-            time.sleep(.5)
-            if ack:
-                lock.release()
-            return "some value"
-        def f():
-            for x in xrange(5):
-                reg.get_or_create("some key", creator)
-                time.sleep(.5)
-
-        threads = [Thread(target=f) for i in xrange(5)]
-        for t in threads:
-            t.start()
-        for t in threads:
-            t.join()
-        assert len(canary) > 3
-        assert False not in canary
-
-    def test_region_delete(self):
-        reg = self._region()
-        reg.set("some key", "some value")
-        reg.delete("some key")
-        reg.delete("some key")
-        eq_(reg.get("some key"), NO_VALUE)
-
-    def test_region_expire(self):
-        reg = self._region(config_args={"expiration_time":.25})
-        counter = itertools.count(1)
-        def creator():
-            return "some value %d" % next(counter)
-        eq_(reg.get_or_create("some key", creator), "some value 1")
-        time.sleep(.4)
-        eq_(reg.get("some key"), "some value 1")
-        eq_(reg.get_or_create("some key", creator), "some value 2")
-        eq_(reg.get("some key"), "some value 2")
-
-class _GenericMutexTest(_GenericBackendFixture, TestCase):
-    def test_mutex(self):
-        backend = self._backend()
-        mutex = backend.get_mutex("foo")
-
-        ac = mutex.acquire()
-        assert ac
-        ac2 = mutex.acquire(wait=False)
-        assert not ac2
-        mutex.release()
-        ac3 = mutex.acquire()
-        assert ac3
-        mutex.release()
-
-    def test_mutex_threaded(self):
-        backend = self._backend()
-        mutex = backend.get_mutex("foo")
-
-        lock = Lock()
-        canary = []
-        def f():
-            for x in xrange(5):
-                mutex = backend.get_mutex("foo")
-                mutex.acquire()
-                for y in xrange(5):
-                    ack = lock.acquire(False)
-                    canary.append(ack)
-                    time.sleep(.002)
-                    if ack:
-                        lock.release()
-                mutex.release()
-                time.sleep(.02)
-
-        threads = [Thread(target=f) for i in xrange(5)]
-        for t in threads:
-            t.start()
-        for t in threads:
-            t.join()
-        assert False not in canary

tests/cache/__init__.py

+import re
+
+def eq_(a, b, msg=None):
+    """Assert a == b, with repr messaging on failure."""
+    assert a == b, msg or "%r != %r" % (a, b)
+
+def ne_(a, b, msg=None):
+    """Assert a != b, with repr messaging on failure."""
+    assert a != b, msg or "%r == %r" % (a, b)
+
+def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
+    try:
+        callable_(*args, **kwargs)
+        assert False, "Callable did not raise an exception"
+    except except_cls as e:
+        assert re.search(msg, str(e)), "%r !~ %s" % (msg, e)
+

tests/cache/_fixtures.py

+from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
+from dogpile.cache import register_backend, CacheRegion, util
+from dogpile.cache.region import _backend_loader
+from . import eq_, assert_raises_message
+import itertools
+import time
+from nose import SkipTest
+from threading import Thread, Lock
+from unittest import TestCase
+
+class _GenericBackendFixture(object):
+    @classmethod
+    def setup_class(cls):
+        try:
+            backend_cls = _backend_loader.load(cls.backend)
+            backend = backend_cls(cls.config_args.get('arguments', {}))
+        except ImportError:
+            raise SkipTest("Backend %s not installed" % cls.backend)
+        cls._check_backend_available(backend)
+
+    @classmethod
+    def _check_backend_available(cls, backend):
+        pass
+
+    region_args = {}
+    config_args = {}
+
+    _region_inst = None
+    _backend_inst = None
+
+    def _region(self, region_args={}, config_args={}):
+        _region_args = self.region_args.copy()
+        _region_args.update(**region_args)
+        _config_args = self.config_args.copy()
+        _config_args.update(config_args)
+
+        self._region_inst = reg = CacheRegion(**_region_args)
+        reg.configure(self.backend, **_config_args)
+        return reg
+
+    def _backend(self):
+        backend_cls = _backend_loader.load(self.backend)
+        _config_args = self.config_args.copy()
+        self._backend_inst = backend_cls(_config_args.get('arguments', {}))
+        return self._backend_inst
+
+class _GenericBackendTest(_GenericBackendFixture, TestCase):
+    def tearDown(self):
+        if self._region_inst:
+            self._region_inst.delete("some key")
+        elif self._backend_inst:
+            self._backend_inst.delete("some_key")
+
+    def test_backend_get_nothing(self):
+        backend = self._backend()
+        eq_(backend.get("some_key"), NO_VALUE)
+
+    def test_backend_delete_nothing(self):
+        backend = self._backend()
+        backend.delete("some_key")
+
+    def test_backend_set_get_value(self):
+        backend = self._backend()
+        backend.set("some_key", "some value")
+        eq_(backend.get("some_key"), "some value")
+
+    def test_backend_delete(self):
+        backend = self._backend()
+        backend.set("some_key", "some value")
+        backend.delete("some_key")
+        eq_(backend.get("some_key"), NO_VALUE)
+
+    def test_region_set_get_value(self):
+        reg = self._region()
+        reg.set("some key", "some value")
+        eq_(reg.get("some key"), "some value")
+
+    def test_region_set_get_nothing(self):
+        reg = self._region()
+        eq_(reg.get("some key"), NO_VALUE)
+
+    def test_region_creator(self):
+        reg = self._region()
+        def creator():
+            return "some value"
+        eq_(reg.get_or_create("some key", creator), "some value")
+
+    def test_threaded_dogpile(self):
+        # run a basic dogpile concurrency test.
+        # note the concurrency of dogpile itself
+        # is intensively tested as part of dogpile.
+        reg = self._region(config_args={"expiration_time":.25})
+        lock = Lock()
+        canary = []
+        def creator():
+            ack = lock.acquire(False)
+            canary.append(ack)
+            time.sleep(.5)
+            if ack:
+                lock.release()
+            return "some value"
+        def f():
+            for x in range(5):
+                reg.get_or_create("some key", creator)
+                time.sleep(.5)
+
+        threads = [Thread(target=f) for i in range(5)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        assert len(canary) > 3
+        assert False not in canary
+
+    def test_region_delete(self):
+        reg = self._region()
+        reg.set("some key", "some value")
+        reg.delete("some key")
+        reg.delete("some key")
+        eq_(reg.get("some key"), NO_VALUE)
+
+    def test_region_expire(self):
+        reg = self._region(config_args={"expiration_time":.25})
+        counter = itertools.count(1)
+        def creator():
+            return "some value %d" % next(counter)
+        eq_(reg.get_or_create("some key", creator), "some value 1")
+        time.sleep(.4)
+        eq_(reg.get("some key"), "some value 1")
+        eq_(reg.get_or_create("some key", creator), "some value 2")
+        eq_(reg.get("some key"), "some value 2")
+
+class _GenericMutexTest(_GenericBackendFixture, TestCase):
+    def test_mutex(self):
+        backend = self._backend()
+        mutex = backend.get_mutex("foo")
+
+        ac = mutex.acquire()
+        assert ac
+        ac2 = mutex.acquire(wait=False)
+        assert not ac2
+        mutex.release()
+        ac3 = mutex.acquire()
+        assert ac3
+        mutex.release()
+
+    def test_mutex_threaded(self):
+        backend = self._backend()
+        mutex = backend.get_mutex("foo")
+
+        lock = Lock()
+        canary = []
+        def f():
+            for x in range(5):
+                mutex = backend.get_mutex("foo")
+                mutex.acquire()
+                for y in range(5):
+                    ack = lock.acquire(False)
+                    canary.append(ack)
+                    time.sleep(.002)
+                    if ack:
+                        lock.release()
+                mutex.release()
+                time.sleep(.02)
+
+        threads = [Thread(target=f) for i in range(5)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        assert False not in canary

tests/cache/test_dbm_backend.py

+from ._fixtures import _GenericBackendTest, _GenericMutexTest
+from . import eq_
+from unittest import TestCase
+from threading import Thread
+import time
+import os
+from nose import SkipTest
+
+try:
+    import fcntl
+except ImportError:
+    raise SkipTest("fcntl not available")
+
+class DBMBackendTest(_GenericBackendTest):
+    backend = "dogpile.cache.dbm"
+
+    config_args = {
+        "arguments":{
+            "filename":"test.dbm"
+        }
+    }
+
+class DBMBackendNoLockTest(_GenericBackendTest):
+    backend = "dogpile.cache.dbm"
+
+    config_args = {
+        "arguments":{
+            "filename":"test.dbm",
+            "rw_lockfile":False,
+            "dogpile_lockfile":False,
+        }
+    }
+
+
+class DBMMutexTest(_GenericMutexTest):
+    backend = "dogpile.cache.dbm"
+
+    config_args = {
+        "arguments":{
+            "filename":"test.dbm"
+        }
+    }
+
+
+def teardown():
+    for fname in os.listdir(os.curdir):
+        if fname.startswith("test.dbm"):
+            os.unlink(fname)

tests/cache/test_decorator.py

+from ._fixtures import _GenericBackendFixture
+from . import eq_
+from unittest import TestCase
+import time
+from dogpile.cache import util
+import inspect
+
+class DecoratorTest(_GenericBackendFixture, TestCase):
+    backend = "dogpile.cache.memory"
+
+    def _fixture(self, namespace=None, expiration_time=None):
+        reg = self._region(config_args={"expiration_time":.25})
+
+        counter = [0]
+        @reg.cache_on_arguments(namespace=namespace, 
+                            expiration_time=expiration_time)
+        def go(a, b):
+            counter[0] +=1
+            return counter[0], a, b
+        return go
+
+    def test_decorator(self):
+        go = self._fixture()
+        eq_(go(1, 2), (1, 1, 2))
+        eq_(go(3, 4), (2, 3, 4))
+        eq_(go(1, 2), (1, 1, 2))
+        time.sleep(.3)
+        eq_(go(1, 2), (3, 1, 2))
+
+    def test_decorator_namespace(self):
+        # TODO: test the namespace actually
+        # working somehow...
+        go = self._fixture(namespace="x")
+        eq_(go(1, 2), (1, 1, 2))
+        eq_(go(3, 4), (2, 3, 4))
+        eq_(go(1, 2), (1, 1, 2))
+        time.sleep(.3)
+        eq_(go(1, 2), (3, 1, 2))
+
+    def test_decorator_custom_expire(self):
+        go = self._fixture(expiration_time=.5)
+        eq_(go(1, 2), (1, 1, 2))
+        eq_(go(3, 4), (2, 3, 4))
+        eq_(go(1, 2), (1, 1, 2))
+        time.sleep(.3)
+        eq_(go(1, 2), (1, 1, 2))
+        time.sleep(.3)
+        eq_(go(1, 2), (3, 1, 2))
+
+    def test_explicit_expire(self):
+        go = self._fixture(expiration_time=1)
+        eq_(go(1, 2), (1, 1, 2))
+        eq_(go(3, 4), (2, 3, 4))
+        eq_(go(1, 2), (1, 1, 2))
+        go.invalidate(1, 2)
+        eq_(go(1, 2), (3, 1, 2))
+
+class KeyGenerationTest(TestCase):
+    def _keygen_decorator(self, namespace=None):
+        canary = []
+        def decorate(fn):
+            canary.append(util.function_key_generator(namespace, fn))
+            return fn
+        return decorate, canary
+
+    def test_keygen_fn(self):
+        decorate, canary = self._keygen_decorator()
+
+        @decorate
+        def one(a, b):
+            pass
+        gen = canary[0]
+
+        eq_(gen(1, 2), "tests.cache.test_decorator:one|1 2")
+        eq_(gen(None, 5), "tests.cache.test_decorator:one|None 5")
+
+    def test_keygen_fn_namespace(self):
+        decorate, canary = self._keygen_decorator("mynamespace")
+
+        @decorate
+        def one(a, b):
+            pass
+        gen = canary[0]
+
+        eq_(gen(1, 2), "tests.cache.test_decorator:one|mynamespace|1 2")
+        eq_(gen(None, 5), "tests.cache.test_decorator:one|mynamespace|None 5")
+
+

tests/cache/test_memcached_backend.py

+from ._fixtures import _GenericBackendTest, _GenericMutexTest
+from . import eq_
+from unittest import TestCase
+from threading import Thread
+import time
+from nose import SkipTest
+
+class _TestMemcachedConn(object):
+    @classmethod
+    def _check_backend_available(cls, backend):
+        try:
+            client = backend._create_client()
+            client.set("x", "y")
+            assert client.get("x") == "y"
+        except:
+            raise SkipTest(
+                "memcached is not running or "
+                "otherwise not functioning correctly")
+
+class _NonDistributedMemcachedTest(_TestMemcachedConn, _GenericBackendTest):
+    region_args = {
+        "key_mangler":lambda x: x.replace(" ", "_")
+    }
+    config_args = {
+        "arguments":{
+            "url":"127.0.0.1:11211"
+        }
+    }
+
+class _DistributedMemcachedTest(_TestMemcachedConn, _GenericBackendTest):
+    region_args = {
+        "key_mangler":lambda x: x.replace(" ", "_")
+    }
+    config_args = {
+        "arguments":{
+            "url":"127.0.0.1:11211",
+            "distributed_lock":True
+        }
+    }
+
+class _DistributedMemcachedMutexTest(_TestMemcachedConn, _GenericMutexTest):
+    config_args = {
+        "arguments":{
+            "url":"127.0.0.1:11211",
+            "distributed_lock":True
+        }
+    }
+
+class PylibmcTest(_NonDistributedMemcachedTest):
+    backend = "dogpile.cache.pylibmc"
+
+class PylibmcDistributedTest(_DistributedMemcachedTest):
+    backend = "dogpile.cache.pylibmc"
+
+class PylibmcDistributedMutexTest(_DistributedMemcachedMutexTest):
+    backend = "dogpile.cache.pylibmc"
+
+class BMemcachedTest(_NonDistributedMemcachedTest):
+    backend = "dogpile.cache.bmemcached"
+
+class BMemcachedDistributedTest(_DistributedMemcachedTest):
+    backend = "dogpile.cache.bmemcached"
+
+class BMemcachedDistributedMutexTest(_DistributedMemcachedMutexTest):
+    backend = "dogpile.cache.bmemcached"
+
+class MemcachedTest(_NonDistributedMemcachedTest):
+    backend = "dogpile.cache.memcached"
+
+class MemcachedDistributedTest(_DistributedMemcachedTest):
+    backend = "dogpile.cache.memcached"
+
+class MemcachedDistributedMutexTest(_DistributedMemcachedMutexTest):
+    backend = "dogpile.cache.memcached"
+
+
+from dogpile.cache.backends.memcached import GenericMemcachedBackend
+from dogpile.cache.backends.memcached import PylibmcBackend
+class MockMemcachedBackend(GenericMemcachedBackend):
+    def _imports(self):
+        pass
+
+    def _create_client(self):
+        return MockClient(self.url)
+
+class MockPylibmcBackend(PylibmcBackend):
+    def _imports(self):
+        pass
+
+    def _create_client(self):
+        return MockClient(self.url, 
+                        binary=self.binary,
+                        behaviors=self.behaviors
+                    )
+
+class MockClient(object):
+    number_of_clients = 0
+
+    def __init__(self, *arg, **kw):
+        self.arg = arg
+        self.kw = kw
+        self.canary = []
+        self._cache = {}
+        MockClient.number_of_clients += 1
+
+    def get(self, key):
+        return self._cache.get(key)
+    def set(self, key, value, **kw):
+        self.canary.append(kw)
+        self._cache[key] = value
+    def delete(self, key):
+        self._cache.pop(key, None)
+    def __del__(self):
+        MockClient.number_of_clients -= 1
+
+class PylibmcArgsTest(TestCase):
+    def test_binary_flag(self):
+        backend = MockPylibmcBackend(arguments={'url':'foo','binary':True})
+        eq_(backend._create_client().kw["binary"], True)
+
+    def test_url_list(self):
+        backend = MockPylibmcBackend(arguments={'url':["a", "b", "c"]})
+        eq_(backend._create_client().arg[0], ["a", "b", "c"])
+
+    def test_url_scalar(self):
+        backend = MockPylibmcBackend(arguments={'url':"foo"})
+        eq_(backend._create_client().arg[0], ["foo"])
+
+    def test_behaviors(self):
+        backend = MockPylibmcBackend(arguments={'url':"foo", 
+                                    "behaviors":{"q":"p"}})
+        eq_(backend._create_client().kw["behaviors"], {"q": "p"})
+
+    def test_set_time(self):
+        backend = MockPylibmcBackend(arguments={'url':"foo", 
+                                "memcached_expire_time":20})
+        backend.set("foo", "bar")
+        eq_(backend._clients.memcached.canary, [{"time":20}])
+
+    def test_set_min_compress_len(self):
+        backend = MockPylibmcBackend(arguments={'url':"foo", 
+                                "min_compress_len":20})
+        backend.set("foo", "bar")
+        eq_(backend._clients.memcached.canary, [{"min_compress_len":20}])
+
+    def test_no_set_args(self):
+        backend = MockPylibmcBackend(arguments={'url':"foo"})
+        backend.set("foo", "bar")
+        eq_(backend._clients.memcached.canary, [{}])
+
+class LocalThreadTest(TestCase):
+    def setUp(self):
+        import gc
+        gc.collect()
+        eq_(MockClient.number_of_clients, 0)
+
+    def test_client_cleanup_1(self):
+        self._test_client_cleanup(1)
+
+    def test_client_cleanup_3(self):
+        self._test_client_cleanup(3)
+
+    def test_client_cleanup_10(self):
+        self._test_client_cleanup(10)
+
+    def _test_client_cleanup(self, count):
+        backend = MockMemcachedBackend(arguments={'url':'foo'})
+        canary = []
+
+        def f():
+            backend._clients.memcached
+            canary.append(MockClient.number_of_clients)
+            time.sleep(.05)
+
+        threads = [Thread(target=f) for i in range(count)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        eq_(canary, [i + 1 for i in range(count)])
+        eq_(MockClient.number_of_clients, 0)
+
+

tests/cache/test_memory_backend.py

+from ._fixtures import _GenericBackendTest
+
+class MemoryBackendTest(_GenericBackendTest):
+    backend = "dogpile.cache.memory"
+

tests/cache/test_region.py

+from unittest import TestCase
+from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
+from dogpile.cache import make_region, register_backend, CacheRegion
+from . import eq_, assert_raises_message
+import time
+import itertools
+
+class MockMutex(object):
+    def __init__(self, key):
+        self.key = key
+    def acquire(self, blocking=True):
+        return True
+    def release(self):
+        return
+
+class MockBackend(CacheBackend):
+    def __init__(self, arguments):
+        self.arguments = arguments
+        self._cache = {}
+    def get_mutex(self, key):
+        return MockMutex(key)
+    def get(self, key):
+        try:
+            return self._cache[key]
+        except KeyError:
+            return NO_VALUE
+    def set(self, key, value):
+        self._cache[key] = value
+    def delete(self, key):
+        self._cache.pop(key, None)
+register_backend("mock", __name__, "MockBackend")
+
+def key_mangler(key):
+    return "HI!" + key
+
+class RegionTest(TestCase):
+
+    def _region(self, init_args={}, config_args={}, backend="mock"):
+        reg = CacheRegion(**init_args)
+        reg.configure(backend, **config_args)
+        return reg
+
+    def test_instance_from_dict(self):
+        my_conf = { 
+            'cache.example.backend': 'mock',
+            'cache.example.expiration_time': 600,
+            'cache.example.arguments.url': '127.0.0.1'
+            } 
+        my_region = make_region()
+        my_region.configure_from_config(my_conf, 'cache.example.')
+        eq_(my_region.expiration_time, 600)
+        assert isinstance(my_region.backend, MockBackend) is True
+        eq_(my_region.backend.arguments, {'url': '127.0.0.1'})
+
+    def test_key_mangler_argument(self):
+        reg = self._region(init_args={"key_mangler":key_mangler})
+        assert reg.key_mangler is key_mangler
+
+        reg = self._region()
+        assert reg.key_mangler is None
+
+        MockBackend.key_mangler = km = lambda self, k: "foo"
+        reg = self._region()
+        eq_(reg.key_mangler("bar"), "foo")
+        MockBackend.key_mangler = None
+
+    def test_key_mangler_impl(self):
+        reg = self._region(init_args={"key_mangler":key_mangler})
+
+        reg.set("some key", "some value")
+        eq_(list(reg.backend._cache), ["HI!some key"])
+        eq_(reg.get("some key"), "some value")
+        eq_(reg.get_or_create("some key", lambda: "some new value"), "some value")
+        reg.delete("some key")
+        eq_(reg.get("some key"), NO_VALUE)
+
+    def test_no_config(self):
+        reg = CacheRegion()
+        assert_raises_message(
+            Exception,
+            "No backend is configured on this region.",
+            getattr, reg, "backend"
+        )
+
+    def test_set_get_value(self):
+        reg = self._region()
+        reg.set("some key", "some value")
+        eq_(reg.get("some key"), "some value")
+
+    def test_set_get_nothing(self):
+        reg = self._region()
+        eq_(reg.get("some key"), NO_VALUE)
+
+    def test_creator(self):
+        reg = self._region()
+        def creator():
+            return "some value"
+        eq_(reg.get_or_create("some key", creator), "some value")
+
+    def test_remove(self):
+        reg = self._region()
+        reg.set("some key", "some value")
+        reg.delete("some key")
+        reg.delete("some key")
+        eq_(reg.get("some key"), NO_VALUE)
+
+    def test_expire(self):
+        reg = self._region(config_args={"expiration_time":1})
+        counter = itertools.count(1)
+        def creator():
+            return "some value %d" % next(counter)
+        eq_(reg.get_or_create("some key", creator), "some value 1")
+        time.sleep(1)
+        eq_(reg.get("some key"), "some value 1")
+        eq_(reg.get_or_create("some key", creator), "some value 2")
+        eq_(reg.get("some key"), "some value 2")
+
+    def test_expire_override(self):
+        reg = self._region(config_args={"expiration_time":5})
+        counter = itertools.count(1)
+        def creator():
+            return "some value %d" % next(counter)
+        eq_(reg.get_or_create("some key", creator, expiration_time=1), 
+                    "some value 1")
+        time.sleep(1)
+        eq_(reg.get("some key"), "some value 1")
+        eq_(reg.get_or_create("some key", creator, expiration_time=1), 
+                    "some value 2")
+        eq_(reg.get("some key"), "some value 2")
+

tests/test_dbm_backend.py

-from tests._fixtures import _GenericBackendTest, _GenericMutexTest
-from tests import eq_
-from unittest import TestCase
-from threading import Thread
-import time
-import os
-from nose import SkipTest
-
-try:
-    import fcntl
-except ImportError:
-    raise SkipTest("fcntl not available")
-
-class DBMBackendTest(_GenericBackendTest):
-    backend = "dogpile.cache.dbm"
-
-    config_args = {
-        "arguments":{
-            "filename":"test.dbm"
-        }
-    }
-
-class DBMBackendNoLockTest(_GenericBackendTest):
-    backend = "dogpile.cache.dbm"
-
-    config_args = {
-        "arguments":{
-            "filename":"test.dbm",
-            "rw_lockfile":False,
-            "dogpile_lockfile":False,
-        }
-    }
-
-
-class DBMMutexTest(_GenericMutexTest):
-    backend = "dogpile.cache.dbm"
-
-    config_args = {
-        "arguments":{
-            "filename":"test.dbm"
-        }
-    }
-
-
-def teardown():
-    for fname in os.listdir(os.curdir):
-        if fname.startswith("test.dbm"):
-            os.unlink(fname)

tests/test_decorator.py

-from tests._fixtures import _GenericBackendFixture
-from tests import eq_
-from unittest import TestCase
-import time
-from dogpile.cache import util
-import inspect
-
-class DecoratorTest(_GenericBackendFixture, TestCase):
-    backend = "dogpile.cache.memory"
-
-    def _fixture(self, namespace=None, expiration_time=None):
-        reg = self._region(config_args={"expiration_time":.25})
-
-        counter = [0]
-        @reg.cache_on_arguments(namespace=namespace, 
-                            expiration_time=expiration_time)
-        def go(a, b):
-            counter[0] +=1
-            return counter[0], a, b
-        return go
-
-    def test_decorator(self):
-        go = self._fixture()
-        eq_(go(1, 2), (1, 1, 2))
-        eq_(go(3, 4), (2, 3, 4))
-        eq_(go(1, 2), (1, 1, 2))
-        time.sleep(.3)
-        eq_(go(1, 2), (3, 1, 2))
-
-    def test_decorator_namespace(self):
-        # TODO: test the namespace actually
-        # working somehow...
-        go = self._fixture(namespace="x")
-        eq_(go(1, 2), (1, 1, 2))
-        eq_(go(3, 4), (2, 3, 4))
-        eq_(go(1, 2), (1, 1, 2))
-        time.sleep(.3)
-        eq_(go(1, 2), (3, 1, 2))
-
-    def test_decorator_custom_expire(self):
-        go = self._fixture(expiration_time=.5)
-        eq_(go(1, 2), (1, 1, 2))
-        eq_(go(3, 4), (2, 3, 4))
-        eq_(go(1, 2), (1, 1, 2))
-        time.sleep(.3)
-        eq_(go(1, 2), (1, 1, 2))
-        time.sleep(.3)
-        eq_(go(1, 2), (3, 1, 2))
-
-    def test_explicit_expire(self):
-        go = self._fixture(expiration_time=1)
-        eq_(go(1, 2), (1, 1, 2))
-        eq_(go(3, 4), (2, 3, 4))
-        eq_(go(1, 2), (1, 1, 2))
-        go.invalidate(1, 2)
-        eq_(go(1, 2), (3, 1, 2))
-
-class KeyGenerationTest(TestCase):
-    def _keygen_decorator(self, namespace=None):
-        canary = []
-        def decorate(fn):
-            canary.append(util.function_key_generator(namespace, fn))
-            return fn
-        return decorate, canary
-
-    def test_keygen_fn(self):
-        decorate, canary = self._keygen_decorator()
-
-        @decorate
-        def one(a, b):
-            pass
-        gen = canary[0]
-
-        eq_(gen(1, 2), "tests.test_decorator:one|1 2")
-        eq_(gen(None, 5), "tests.test_decorator:one|None 5")
-
-    def test_keygen_fn_namespace(self):
-        decorate, canary = self._keygen_decorator("mynamespace")
-
-        @decorate
-        def one(a, b):
-            pass
-        gen = canary[0]
-
-        eq_(gen(1, 2), "tests.test_decorator:one|mynamespace|1 2")
-        eq_(gen(None, 5), "tests.test_decorator:one|mynamespace|None 5")
-
-

tests/test_memcached_backend.py

-from tests._fixtures import _GenericBackendTest, _GenericMutexTest
-from tests import eq_
-from unittest import TestCase
-from threading import Thread
-import time
-from nose import SkipTest
-
-class _TestMemcachedConn(object):
-    @classmethod
-    def _check_backend_available(cls, backend):
-        try:
-            client = backend._create_client()
-            client.set("x", "y")
-            assert client.get("x") == "y"
-        except:
-            raise SkipTest(
-                "memcached is not running or "
-                "otherwise not functioning correctly")
-
-class _NonDistributedMemcachedTest(_TestMemcachedConn, _GenericBackendTest):
-    region_args = {
-        "key_mangler":lambda x: x.replace(" ", "_")
-    }
-    config_args = {
-        "arguments":{
-            "url":"127.0.0.1:11211"
-        }
-    }
-
-class _DistributedMemcachedTest(_TestMemcachedConn, _GenericBackendTest):
-    region_args = {
-        "key_mangler":lambda x: x.replace(" ", "_")
-    }
-    config_args = {
-        "arguments":{
-            "url":"127.0.0.1:11211",
-            "distributed_lock":True
-        }
-    }
-
-class _DistributedMemcachedMutexTest(_TestMemcachedConn, _GenericMutexTest):
-    config_args = {
-        "arguments":{
-            "url":"127.0.0.1:11211",
-            "distributed_lock":True
-        }
-    }
-
-class PylibmcTest(_NonDistributedMemcachedTest):
-    backend = "dogpile.cache.pylibmc"
-
-class PylibmcDistributedTest(_DistributedMemcachedTest):
-    backend = "dogpile.cache.pylibmc"
-
-class PylibmcDistributedMutexTest(_DistributedMemcachedMutexTest):
-    backend = "dogpile.cache.pylibmc"
-
-class BMemcachedTest(_NonDistributedMemcachedTest):
-    backend = "dogpile.cache.bmemcached"
-
-class BMemcachedDistributedTest(_DistributedMemcachedTest):
-    backend = "dogpile.cache.bmemcached"
-
-class BMemcachedDistributedMutexTest(_DistributedMemcachedMutexTest):
-    backend = "dogpile.cache.bmemcached"
-
-class MemcachedTest(_NonDistributedMemcachedTest):
-    backend = "dogpile.cache.memcached"
-
-class MemcachedDistributedTest(_DistributedMemcachedTest):
-    backend = "dogpile.cache.memcached"
-
-class MemcachedDistributedMutexTest(_DistributedMemcachedMutexTest):
-    backend = "dogpile.cache.memcached"
-
-
-from dogpile.cache.backends.memcached import GenericMemcachedBackend
-from dogpile.cache.backends.memcached import PylibmcBackend
-class MockMemcachedBackend(GenericMemcachedBackend):
-    def _imports(self):
-        pass
-
-    def _create_client(self):
-        return MockClient(self.url)
-
-class MockPylibmcBackend(PylibmcBackend):
-    def _imports(self):
-        pass
-
-    def _create_client(self):
-        return MockClient(self.url, 
-                        binary=self.binary,
-                        behaviors=self.behaviors
-                    )
-
-class MockClient(object):
-    number_of_clients = 0
-
-    def __init__(self, *arg, **kw):
-        self.arg = arg
-        self.kw = kw
-        self.canary = []
-        self._cache = {}
-        MockClient.number_of_clients += 1
-
-    def get(self, key):
-        return self._cache.get(key)
-    def set(self, key, value, **kw):
-        self.canary.append(kw)
-        self._cache[key] = value
-    def delete(self, key):
-        self._cache.pop(key, None)
-    def __del__(self):
-        MockClient.number_of_clients -= 1
-
-class PylibmcArgsTest(TestCase):
-    def test_binary_flag(self):
-        backend = MockPylibmcBackend(arguments={'url':'foo','binary':True})
-        eq_(backend._create_client().kw["binary"], True)
-
-    def test_url_list(self):
-        backend = MockPylibmcBackend(arguments={'url':["a", "b", "c"]})
-        eq_(backend._create_client().arg[0], ["a", "b", "c"])
-
-    def test_url_scalar(self):
-        backend = MockPylibmcBackend(arguments={'url':"foo"})
-        eq_(backend._create_client().arg[0], ["foo"])
-
-    def test_behaviors(self):
-        backend = MockPylibmcBackend(arguments={'url':"foo", 
-                                    "behaviors":{"q":"p"}})
-        eq_(backend._create_client().kw["behaviors"], {"q": "p"})
-
-    def test_set_time(self):
-        backend = MockPylibmcBackend(arguments={'url':"foo", 
-                                "memcached_expire_time":20})
-        backend.set("foo", "bar")
-        eq_(backend._clients.memcached.canary, [{"time":20}])
-
-    def test_set_min_compress_len(self):
-        backend = MockPylibmcBackend(arguments={'url':"foo", 
-                                "min_compress_len":20})
-        backend.set("foo", "bar")
-        eq_(backend._clients.memcached.canary, [{"min_compress_len":20}])
-
-    def test_no_set_args(self):
-        backend = MockPylibmcBackend(arguments={'url':"foo"})
-        backend.set("foo", "bar")
-        eq_(backend._clients.memcached.canary, [{}])
-
-class LocalThreadTest(TestCase):
-    def setUp(self):
-        import gc
-        gc.collect()
-        eq_(MockClient.number_of_clients, 0)
-
-    def test_client_cleanup_1(self):
-        self._test_client_cleanup(1)
-
-    def test_client_cleanup_3(self):
-        self._test_client_cleanup(3)
-
-    def test_client_cleanup_10(self):
-        self._test_client_cleanup(10)
-
-    def _test_client_cleanup(self, count):
-        backend = MockMemcachedBackend(arguments={'url':'foo'})
-        canary = []
-
-        def f():
-            backend._clients.memcached
-            canary.append(MockClient.number_of_clients)
-            time.sleep(.05)
-
-        threads = [Thread(target=f) for i in xrange(count)]
-        for t in threads:
-            t.start()
-        for t in threads:
-            t.join()
-        eq_(canary, [i + 1 for i in xrange(count)])
-        eq_(MockClient.number_of_clients, 0)
-
-

tests/test_memory_backend.py

-from tests._fixtures import _GenericBackendTest
-
-class MemoryBackendTest(_GenericBackendTest):
-    backend = "dogpile.cache.memory"
-

tests/test_region.py

-from unittest import TestCase
-from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
-from dogpile.cache import make_region, register_backend, CacheRegion
-from tests import eq_, assert_raises_message
-import time
-import itertools
-
-class MockMutex(object):
-    def __init__(self, key):
-        self.key = key
-    def acquire(self, blocking=True):
-        return True
-    def release(self):
-        return
-
-class MockBackend(CacheBackend):
-    def __init__(self, arguments):
-        self.arguments = arguments
-        self._cache = {}
-    def get_mutex(self, key):
-        return MockMutex(key)
-    def get(self, key):
-        try:
-            return self._cache[key]
-        except KeyError:
-            return NO_VALUE
-    def set(self, key, value):
-        self._cache[key] = value
-    def delete(self, key):
-        self._cache.pop(key, None)
-register_backend("mock", __name__, "MockBackend")
-
-def key_mangler(key):
-    return "HI!" + key
-
-class RegionTest(TestCase):
-
-    def _region(self, init_args={}, config_args={}, backend="mock"):
-        reg = CacheRegion(**init_args)
-        reg.configure(backend, **config_args)
-        return reg
-
-    def test_instance_from_dict(self):
-        my_conf = { 
-            'cache.example.backend': 'mock',
-            'cache.example.expiration_time': 600,
-            'cache.example.arguments.url': '127.0.0.1'
-            } 
-        my_region = make_region()
-        my_region.configure_from_config(my_conf, 'cache.example.')
-        eq_(my_region.expiration_time, 600)
-        assert isinstance(my_region.backend, MockBackend) is True
-        eq_(my_region.backend.arguments, {'url': '127.0.0.1'})
-
-    def test_key_mangler_argument(self):
-        reg = self._region(init_args={"key_mangler":key_mangler})
-        assert reg.key_mangler is key_mangler
-
-        reg = self._region()
-        assert reg.key_mangler is None
-
-        MockBackend.key_mangler = km = lambda self, k: "foo"
-        reg = self._region()
-        eq_(reg.key_mangler("bar"), "foo")
-        MockBackend.key_mangler = None
-
-    def test_key_mangler_impl(self):
-        reg = self._region(init_args={"key_mangler":key_mangler})
-
-        reg.set("some key", "some value")
-        eq_(list(reg.backend._cache), ["HI!some key"])
-        eq_(reg.get("some key"), "some value")
-        eq_(reg.get_or_create("some key", lambda: "some new value"), "some value")
-        reg.delete("some key")
-        eq_(reg.get("some key"), NO_VALUE)
-
-    def test_no_config(self):
-        reg = CacheRegion()
-        assert_raises_message(
-            Exception,
-            "No backend is configured on this region.",
-            getattr, reg, "backend"
-        )
-
-    def test_set_get_value(self):
-        reg = self._region()
-        reg.set("some key", "some value")
-        eq_(reg.get("some key"), "some value")
-
-    def test_set_get_nothing(self):
-        reg = self._region()
-        eq_(reg.get("some key"), NO_VALUE)
-
-    def test_creator(self):
-        reg = self._region()
-        def creator():
-            return "some value"
-        eq_(reg.get_or_create("some key", creator), "some value")
-
-    def test_remove(self):
-        reg = self._region()
-        reg.set("some key", "some value")
-        reg.delete("some key")
-        reg.delete("some key")
-        eq_(reg.get("some key"), NO_VALUE)
-
-    def test_expire(self):
-        reg = self._region(config_args={"expiration_time":1})
-        counter = itertools.count(1)
-        def creator():
-            return "some value %d" % next(counter)
-        eq_(reg.get_or_create("some key", creator), "some value 1")
-        time.sleep(1)
-        eq_(reg.get("some key"), "some value 1")
-        eq_(reg.get_or_create("some key", creator), "some value 2")
-        eq_(reg.get("some key"), "some value 2")
-
-    def test_expire_override(self):
-        reg = self._region(config_args={"expiration_time":5})
-        counter = itertools.count(1)
-        def creator():
-            return "some value %d" % next(counter)
-        eq_(reg.get_or_create("some key", creator, expiration_time=1), 
-                    "some value 1")
-        time.sleep(1)
-        eq_(reg.get("some key"), "some value 1")
-        eq_(reg.get_or_create("some key", creator, expiration_time=1), 
-                    "some value 2")
-        eq_(reg.get("some key"), "some value 2")
-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.