Commits

Anonymous committed c1509ca

* memcached backend uses pylibmc.ThreadMappedPool to ensure thread-local
usage of pylibmc when that library is installed.
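
For reference, this adopts pylibmc's ThreadMappedPool idiom: a master client is
wrapped in a pool and each thread reserves its own clone for the duration of an
operation, so a single libmemcached handle is never shared across threads. A
minimal sketch of that idiom (assumes pylibmc is installed and a memcached
server is listening on 127.0.0.1:11211; this shows the underlying pylibmc
pattern, not Beaker code):

    import pylibmc

    # One master client; the pool hands each thread its own clone of it.
    master = pylibmc.Client(['127.0.0.1:11211'])
    pool = pylibmc.ThreadMappedPool(master)

    # reserve() is a context manager yielding the calling thread's client.
    with pool.reserve() as mc:
        mc.set('key', 'value')
        assert mc.get('key') == 'value'

    # A thread that is completely done with memcached can drop its client.
    pool.relinquish()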

Comments (0)

Files changed (4)

CHANGELOG

   the session value in the environ vars when using Beaker with WebTest.
 * Defer running of pkg_resources to look for external cache modules
   until requested. #66
+* memcached backend uses pylibmc.ThreadMappedPool to ensure thread-local
+  usage of pylibmc when that library is installed.
 
 Release 1.5.4 (6/16/2010)
 =========================

beaker/container.py

 class NamespaceManager(object):
     """Handles dictionary operations and locking for a namespace of
     values.
-    
+
     The implementation for setting and retrieving the namespace data is
     handled by subclasses.
-    
+
     NamespaceManager may be used alone, or may be privately accessed by
     one or more Container objects.  Container objects provide per-key
     services like expiration times and automatic recreation of values.
-    
+
     Multiple NamespaceManagers created with a particular name will all
     share access to the same underlying datasource and will attempt to
     synchronize against a common mutex object.  The scope of this
     sharing may be within a single process or across multiple
     processes, depending on the type of NamespaceManager used.
-    
+
     The NamespaceManager itself is generally threadsafe, except in the
     case of the DBMNamespaceManager in conjunction with the gdbm dbm
     implementation.
 
     """
-    
+
     @classmethod
     def _init_dependencies(cls):
         pass
-        
+
     def __init__(self, namespace):
         self._init_dependencies()
         self.namespace = namespace
-        
+
     def get_creation_lock(self, key):
         raise NotImplementedError()
 
 
     def __getitem__(self, key):
         raise NotImplementedError()
-        
+
     def __setitem__(self, key, value):
         raise NotImplementedError()
-    
+
     def set_value(self, key, value, expiretime=None):
         """Optional set_value() method called by Value.
-        
+
         Allows an expiretime to be passed, for namespace
         implementations which can prune their collections
         using expiretime.
-        
+
         """
         self[key] = value
-        
+
     def __contains__(self, key):
         raise NotImplementedError()
 
     def __delitem__(self, key):
         raise NotImplementedError()
-    
+
     def keys(self):
         raise NotImplementedError()
-    
+
     def remove(self):
         self.do_remove()
-        
+
 
 class OpenResourceNamespaceManager(NamespaceManager):
     """A NamespaceManager where read/write operations require opening/
     closing of a resource which is possibly mutexed.
-    
+
     """
     def __init__(self, namespace):
         NamespaceManager.__init__(self, namespace)
         except:
             self.access_lock.release_read_lock()
             raise
-            
+
     def release_read_lock(self):
         try:
             self.close(checkcount = True)
         finally:
             self.access_lock.release_read_lock()
-        
+
     def acquire_write_lock(self, wait=True): 
         r = self.access_lock.acquire_write_lock(wait)
         try:
         except:
             self.access_lock.release_write_lock()
             raise
-            
+
     def release_write_lock(self): 
         try:
             self.close(checkcount=True)
 
         """
         self.namespace.acquire_read_lock()
-        try:    
+        try:
             return self.namespace.has_key(self.key)
         finally:
             self.namespace.release_read_lock()
 
     def can_have_value(self):
-        return self.has_current_value() or self.createfunc is not None  
+        return self.has_current_value() or self.createfunc is not None
 
     def has_current_value(self):
         self.namespace.acquire_read_lock()
-        try:    
+        try:
             has_value = self.namespace.has_key(self.key)
             if has_value:
                 try:
                 except KeyError:
                     # guard against un-mutexed backends raising KeyError
                     has_value = False
-                    
+
             if not self.createfunc:
                 raise KeyError(self.key)
         finally:
             # occurs when the value is None.  memcached 
             # may yank the rug from under us in which case 
             # that's the result
-            raise KeyError(self.key)            
+            raise KeyError(self.key)
         return stored, expired, value
 
     def set_value(self, value, storedtime=None):
 
 class AbstractDictionaryNSManager(NamespaceManager):
     """A subclassable NamespaceManager that places data in a dictionary.
-    
+
     Subclasses should provide a "dictionary" attribute or descriptor
     which returns a dict-like object.   The dictionary will store keys
     that are local to the "namespace" attribute of this manager, so
     ensure that the dictionary will not be used by any other namespace.
 
     e.g.::
-    
+
         import collections
         cached_data = collections.defaultdict(dict)
-        
+
         class MyDictionaryManager(AbstractDictionaryNSManager):
             def __init__(self, namespace):
                 AbstractDictionaryNSManager.__init__(self, namespace)
                 self.dictionary = cached_data[self.namespace]
-                
+
     The above stores data in a global dictionary called "cached_data",
     which is structured as a dictionary of dictionaries, keyed
     first on namespace name to a sub-dictionary, then on actual
     cache key to value.
-    
+
     """
-    
+
     def get_creation_lock(self, key):
         return NameLock(
             identifier="memorynamespace/funclock/%s/%s" % (self.namespace, key),
 
     def has_key(self, key): 
         return self.dictionary.__contains__(key)
-        
+
     def __setitem__(self, key, value):
         self.dictionary[key] = value
-    
+
     def __delitem__(self, key):
         del self.dictionary[key]
 
     def do_remove(self):
         self.dictionary.clear()
-        
+
     def keys(self):
         return self.dictionary.keys()
-    
+
 class MemoryNamespaceManager(AbstractDictionaryNSManager):
     namespaces = util.SyncDict()
 
     def __init__(self, namespace, dbmmodule=None, data_dir=None, 
             dbm_dir=None, lock_dir=None, digest_filenames=True, **kwargs):
         self.digest_filenames = digest_filenames
-        
+
         if not dbm_dir and not data_dir:
             raise MissingCacheParameter("data_dir or dbm_dir is required")
         elif dbm_dir:
         else:
             self.dbm_dir = data_dir + "/container_dbm"
         util.verify_directory(self.dbm_dir)
-        
+
         if not lock_dir and not data_dir:
             raise MissingCacheParameter("data_dir or lock_dir is required")
         elif lock_dir:
                                       identifiers=[self.namespace],
                                       extension='.dbm',
                                       digest_filenames=self.digest_filenames)
-        
+
         debug("data file %s", self.file)
         self._checkfile()
 
     def get_access_lock(self):
         return file_synchronizer(identifier=self.namespace,
                                  lock_dir=self.lock_dir)
-                                 
+
     def get_creation_lock(self, key):
         return file_synchronizer(
                     identifier = "dbmcontainer/funclock/%s" % self.namespace, 
             for ext in ('db', 'dat', 'pag', 'dir'):
                 if os.access(file + os.extsep + ext, os.F_OK):
                     return True
-                    
+
         return False
-    
+
     def _checkfile(self):
         if not self.file_exists(self.file):
             g = self.dbmmodule.open(self.file, 'c') 
             g.close()
-                
+
     def get_filenames(self):
         list = []
         if os.access(self.file, os.F_OK):
             list.append(self.file)
-            
+
         for ext in ('pag', 'dir', 'db', 'dat'):
             if os.access(self.file + os.extsep + ext, os.F_OK):
                 list.append(self.file + os.extsep + ext)
         if self.dbm is not None:
             debug("closing dbm file %s", self.file)
             self.dbm.close()
-        
+
     def do_remove(self):
         for f in self.get_filenames():
             os.remove(f)
-        
+
     def __getitem__(self, key): 
         return cPickle.loads(self.dbm[key])
 
     def __contains__(self, key): 
         return self.dbm.has_key(key)
-        
+
     def __setitem__(self, key, value):
         self.dbm[key] = cPickle.dumps(value)
 
     def __init__(self, namespace, data_dir=None, file_dir=None, lock_dir=None,
                  digest_filenames=True, **kwargs):
         self.digest_filenames = digest_filenames
-        
+
         if not file_dir and not data_dir:
             raise MissingCacheParameter("data_dir or file_dir is required")
         elif file_dir:
                                       extension='.cache',
                                       digest_filenames=self.digest_filenames)
         self.hash = {}
-        
+
         debug("data file %s", self.file)
 
     def get_access_lock(self):
         return file_synchronizer(identifier=self.namespace,
                                  lock_dir=self.lock_dir)
-                                 
+
     def get_creation_lock(self, key):
         return file_synchronizer(
                 identifier = "filecontainer/funclock/%s" % self.namespace, 
                 lock_dir = self.lock_dir
                 )
-        
+
     def file_exists(self, file):
         return os.access(file, os.F_OK)
 
             fh.close()
 
         self.flags = flags
-        
+
     def do_close(self):
         if self.flags == 'c' or self.flags == 'w':
             fh = open(self.file, 'wb')
 
         self.hash = {}
         self.flags = None
-                
+
     def do_remove(self):
         try:
             os.remove(self.file)
             # but client code has asked for a clear() operation...
             pass
         self.hash = {}
-        
+
     def __getitem__(self, key): 
         return self.hash[key]
 
     def __contains__(self, key): 
         return self.hash.has_key(key)
-        
+
     def __setitem__(self, key, value):
         self.hash[key] = value
 
 namespace_classes = {}
 
 ContainerContext = dict
-    
+
 class ContainerMeta(type):
     def __init__(cls, classname, bases, dict_):
         namespace_classes[cls] = cls.namespace_class

beaker/ext/memcached.py

 from beaker.util import verify_directory, SyncDict
 import warnings
 
-memcache = None
+memcache = pylibmc = None
 
 class MemcachedNamespaceManager(NamespaceManager):
     clients = SyncDict()
         global memcache
         if memcache is not None:
             return
+
         try:
-            import pylibmc as memcache
+            import pylibmc
+            memcache = pylibmc
         except ImportError:
             try:
                 import cmemcache as memcache
                 try:
                     import memcache
                 except ImportError:
-                    raise InvalidCacheBackendError("Memcached cache backend requires either "
-                                                        "the 'memcache' or 'cmemcache' library")
+                    raise InvalidCacheBackendError(
+                            "Memcached cache backend requires one "
+                            "of: 'pylibmc' or 'memcache' to be installed.")
+
+    def __new__(cls, *args, **kwargs):
+        cls._init_dependencies()
+        if pylibmc is not None:
+            return object.__new__(PyLibMCNamespaceManager)
+        else:
+            return object.__new__(MemcachedNamespaceManager)
 
     def __init__(self, namespace, url=None, data_dir=None, lock_dir=None, **params):
         NamespaceManager.__init__(self, namespace)
 
     def get_creation_lock(self, key):
         return file_synchronizer(
-            identifier="memcachedcontainer/funclock/%s" % self.namespace,lock_dir = self.lock_dir)
+            identifier="memcachedcontainer/funclock/%s" %
+                    self.namespace, lock_dir=self.lock_dir)
 
     def _format_key(self, key):
         return self.namespace + '_' + key.replace(' ', '\302\267')
     def keys(self):
         raise NotImplementedError("Memcache caching does not support iteration of all cache keys")
 
+class PyLibMCNamespaceManager(MemcachedNamespaceManager):
+    """Provide thread-local support for pylibmc."""
+
+    def __init__(self, *arg, **kw):
+        super(PyLibMCNamespaceManager, self).__init__(*arg, **kw)
+        self.pool = pylibmc.ThreadMappedPool(self.mc)
+
+    def __getitem__(self, key):
+        with self.pool.reserve() as mc:
+            return mc.get(self._format_key(key))
+
+    def __contains__(self, key):
+        with self.pool.reserve() as mc:
+            value = mc.get(self._format_key(key))
+            return value is not None
+
+    def has_key(self, key):
+        return key in self
+
+    def set_value(self, key, value, expiretime=None):
+        with self.pool.reserve() as mc:
+            if expiretime:
+                mc.set(self._format_key(key), value, time=expiretime)
+            else:
+                mc.set(self._format_key(key), value)
+
+    def __setitem__(self, key, value):
+        self.set_value(key, value)
+
+    def __delitem__(self, key):
+        with self.pool.reserve() as mc:
+            mc.delete(self._format_key(key))
+
+    def do_remove(self):
+        with self.pool.reserve() as mc:
+            mc.flush_all()
+
 class MemcachedContainer(Container):
     namespace_class = MemcachedNamespaceManager
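
The __new__ hook added above picks the concrete manager class at construction
time: when pylibmc imports successfully, instantiating MemcachedNamespaceManager
silently yields a PyLibMCNamespaceManager instead. A standalone sketch of that
dispatch pattern (class names here are illustrative, not Beaker's):

    class BaseBackend(object):
        def __new__(cls, *args, **kwargs):
            # Choose the subclass based on an optional dependency.
            try:
                import pylibmc  # noqa: F401
                return object.__new__(PooledBackend)
            except ImportError:
                return object.__new__(BaseBackend)

        def __init__(self, url):
            self.url = url

    class PooledBackend(BaseBackend):
        """Variant selected automatically when pylibmc is importable."""

    backend = BaseBackend('127.0.0.1:11211')
    print(type(backend).__name__)  # 'PooledBackend' if pylibmc is installed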

tests/test_memcached.py

 from beaker.exceptions import InvalidCacheBackendError
 from nose import SkipTest
 from webtest import TestApp
-
+import unittest
 
 try:
     clsmap['ext:memcached']._init_dependencies()
     assert cache.has_key('test')
     assert 'test' in cache
     assert cache.has_key('fred')
-    
+
     # Directly nuke the actual key, to simulate it being removed by memcached
     cache.namespace.mc.delete('test_test')
     assert not cache.has_key('test')
     assert cache.has_key('fred')
-    
+
     # Nuke the keys dict, it might die, who knows
     cache.namespace.mc.delete('test:keys')
     assert cache.has_key('fred')
-    
+
     # And we still need clear to work, even if it won't work well
     cache.clear()
 
 def test_deleting_keys():
     cache = Cache('test', data_dir='./cache', url=mc_url, type='ext:memcached')
     cache.set_value('test', 20)
-    
+
     # Nuke the keys dict, it might die, who knows
     cache.namespace.mc.delete('test:keys')
-    
+
     assert cache.has_key('test')
-    
+
     # make sure we can still delete keys even though our keys dict got nuked
     del cache['test']
-    
+
     assert not cache.has_key('test')
 
 def test_has_key_multicache():
     cache = Cache('test', data_dir='./cache', url=mc_url, type='ext:memcached')
     assert cache.has_key("test")
 
-def test_unicode_keys():    
+def test_unicode_keys():
     cache = Cache('test', data_dir='./cache', url=mc_url, type='ext:memcached')
     o = object()
     cache.set_value(u'hiŏ', o)
     cache.remove_value(u'hiŏ')
     assert u'hiŏ' not in cache
 
-def test_spaces_in_unicode_keys():    
+def test_spaces_in_unicode_keys():
     cache = Cache('test', data_dir='./cache', url=mc_url, type='ext:memcached')
     o = object()
     cache.set_value(u'hi ŏ', o)
     assert 'current value is: 2' in res
     res = app.get('/')
     assert 'current value is: 3' in res
-    
+
     app = TestApp(CacheMiddleware(simple_app))
     res = app.get('/', extra_environ={'beaker.clear':True})
     assert 'current value is: 1' in res
     assert 'current value is: 10' in res
     res = app.get('/')
     assert 'current value is: None' in res
+
+class TestPylibmcInit(unittest.TestCase):
+    def setUp(self):
+
+        from beaker.ext import memcached
+        try:
+            import pylibmc as memcache
+            memcached.pylibmc = memcache
+        except ImportError:
+            import memcache
+            memcached.pylibmc = memcache
+            from contextlib import contextmanager
+            class ThreadMappedPool(dict):
+                "a mock of pylibmc's ThreadMappedPool"
+
+                def __init__(self, master):
+                    self.master = master
+
+                @contextmanager
+                def reserve(self):
+                    yield self.master
+            memcached.pylibmc.ThreadMappedPool = ThreadMappedPool
+
+    def tearDown(self):
+        from beaker.ext import memcached
+        memcached.pylibmc = memcached.memcache = None
+
+    def test_uses_pylibmc_client(self):
+        from beaker.ext import memcached
+        cache = Cache('test', data_dir='./cache', url=mc_url, type="ext:memcached")
+        assert isinstance(cache.namespace, memcached.PyLibMCNamespaceManager)
+
+    def test_dont_use_pylibmc_client(self):
+        from beaker.ext import memcached
+        memcached.pylibmc = None
+        cache = Cache('test', data_dir='./cache', url=mc_url, type="ext:memcached")
+        assert not isinstance(cache.namespace, memcached.PyLibMCNamespaceManager)
+        assert isinstance(cache.namespace, memcached.MemcachedNamespaceManager)
+
+    def test_client(self):
+        cache = Cache('test', data_dir='./cache', url=mc_url, type="ext:memcached")
+        o = object()
+        cache.set_value("test", o)
+        assert cache.has_key("test")
+        assert "test" in cache
+        assert not cache.has_key("foo")
+        assert "foo" not in cache
+        cache.remove_value("test")
+        assert not cache.has_key("test")
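
With the change applied, no configuration changes are needed: a cache created
with the 'ext:memcached' backend transparently receives the pooled pylibmc
manager whenever pylibmc is importable, and falls back to the plain client
otherwise. A minimal usage sketch along the lines of the tests above (assumes
a memcached server at 127.0.0.1:11211):

    from beaker.cache import Cache

    # Same construction the tests use; with pylibmc installed, cache.namespace
    # is a PyLibMCNamespaceManager and every operation reserves a client from
    # the ThreadMappedPool.
    cache = Cache('demo', data_dir='./cache', url='127.0.0.1:11211',
                  type='ext:memcached')
    cache.set_value('greeting', 'hello')
    assert cache.has_key('greeting')
    assert cache.get_value('greeting') == 'hello'
    cache.remove_value('greeting')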