Source

dogpile.cache-dict-config / tests / _fixtures.py

Full commit
Mike Bayer 8a6571a 
Mike Bayer d35be65 

Mike Bayer 8a6571a 



Mike Bayer d35be65 
Mike Bayer 8a6571a 

Mike Bayer d35be65 
Mike Bayer 8a6571a 


Mike Bayer d35be65 
Mike Bayer b9f7ff4 
Mike Bayer 8a6571a 

Mike Bayer b9f7ff4 




Mike Bayer 8a6571a 



Mike Bayer d35be65 




Mike Bayer 8a6571a 
Mike Bayer d35be65 
Mike Bayer 8a6571a 
Mike Bayer d35be65 


Mike Bayer 8a6571a 

Mike Bayer d35be65 





Mike Bayer b4b1b54 
Mike Bayer d35be65 

























Mike Bayer 8a6571a 



Mike Bayer d35be65 
Mike Bayer 8a6571a 


Mike Bayer d35be65 
Mike Bayer 8a6571a 




Mike Bayer d35be65 



























Mike Bayer 8a6571a 





Mike Bayer d35be65 

Mike Bayer 8a6571a 



Mike Bayer d35be65 
Mike Bayer 8a6571a 


Mike Bayer d35be65 






































from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
from dogpile.cache import register_backend, CacheRegion, util
from dogpile.cache.region import _backend_loader
from tests import eq_, assert_raises_message
import itertools
import time
from nose import SkipTest
from threading import Thread, Lock
from unittest import TestCase

class _GenericBackendFixture(object):
    @classmethod
    def setup_class(cls):
        try:
            backend_cls = _backend_loader.load(cls.backend)
            backend = backend_cls(cls.config_args.get('arguments', {}))
        except ImportError:
            raise SkipTest("Backend %s not installed" % cls.backend)
        cls._check_backend_available(backend)

    @classmethod
    def _check_backend_available(cls, backend):
        pass

    region_args = {}
    config_args = {}

    _region_inst = None
    _backend_inst = None

    def _region(self, region_args={}, config_args={}):
        _region_args = self.region_args.copy()
        _region_args.update(**region_args)
        _config_args = self.config_args.copy()
        _config_args.update(config_args)

        self._region_inst = reg = CacheRegion(**_region_args)
        reg.configure(self.backend, **_config_args)
        return reg

    def _backend(self):
        backend_cls = _backend_loader.load(self.backend)
        _config_args = self.config_args.copy()
        self._backend_inst = backend_cls(_config_args.get('arguments', {}))
        return self._backend_inst

class _GenericBackendTest(_GenericBackendFixture, TestCase):
    def tearDown(self):
        if self._region_inst:
            self._region_inst.delete("some key")
        elif self._backend_inst:
            self._backend_inst.delete("some_key")

    def test_backend_get_nothing(self):
        backend = self._backend()
        eq_(backend.get("some_key"), NO_VALUE)

    def test_backend_delete_nothing(self):
        backend = self._backend()
        backend.delete("some_key")

    def test_backend_set_get_value(self):
        backend = self._backend()
        backend.set("some_key", "some value")
        eq_(backend.get("some_key"), "some value")

    def test_backend_delete(self):
        backend = self._backend()
        backend.set("some_key", "some value")
        backend.delete("some_key")
        eq_(backend.get("some_key"), NO_VALUE)

    def test_region_set_get_value(self):
        reg = self._region()
        reg.set("some key", "some value")
        eq_(reg.get("some key"), "some value")

    def test_region_set_get_nothing(self):
        reg = self._region()
        eq_(reg.get("some key"), NO_VALUE)

    def test_region_creator(self):
        reg = self._region()
        def creator():
            return "some value"
        eq_(reg.get_or_create("some key", creator), "some value")

    def test_threaded_dogpile(self):
        # run a basic dogpile concurrency test.
        # note the concurrency of dogpile itself
        # is intensively tested as part of dogpile.
        reg = self._region(config_args={"expiration_time":.25})
        lock = Lock()
        canary = []
        def creator():
            ack = lock.acquire(False)
            canary.append(ack)
            time.sleep(.5)
            if ack:
                lock.release()
            return "some value"
        def f():
            for x in xrange(5):
                reg.get_or_create("some key", creator)
                time.sleep(.5)

        threads = [Thread(target=f) for i in xrange(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert len(canary) > 3
        assert False not in canary

    def test_region_delete(self):
        reg = self._region()
        reg.set("some key", "some value")
        reg.delete("some key")
        reg.delete("some key")
        eq_(reg.get("some key"), NO_VALUE)

    def test_region_expire(self):
        reg = self._region(config_args={"expiration_time":.25})
        counter = itertools.count(1)
        def creator():
            return "some value %d" % next(counter)
        eq_(reg.get_or_create("some key", creator), "some value 1")
        time.sleep(.4)
        eq_(reg.get("some key"), "some value 1")
        eq_(reg.get_or_create("some key", creator), "some value 2")
        eq_(reg.get("some key"), "some value 2")

class _GenericMutexTest(_GenericBackendFixture, TestCase):
    def test_mutex(self):
        backend = self._backend()
        mutex = backend.get_mutex("foo")

        ac = mutex.acquire()
        assert ac
        ac2 = mutex.acquire(wait=False)
        assert not ac2
        mutex.release()
        ac3 = mutex.acquire()
        assert ac3
        mutex.release()

    def test_mutex_threaded(self):
        backend = self._backend()
        mutex = backend.get_mutex("foo")

        lock = Lock()
        canary = []
        def f():
            for x in xrange(5):
                mutex = backend.get_mutex("foo")
                mutex.acquire()
                for y in xrange(5):
                    ack = lock.acquire(False)
                    canary.append(ack)
                    time.sleep(.002)
                    if ack:
                        lock.release()
                mutex.release()
                time.sleep(.02)

        threads = [Thread(target=f) for i in xrange(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert False not in canary