Commits

zzzeek  committed d08712a

[svn] - simplified some unit tests that had platform-dependent assumptions
- fixed Synchronizer so that the Synchronizer itself can be shared among threads
in all cases
- removed thread-localness of NamespaceManager object as stored in ContainerContext

  • Participants
  • Parent commits bd778da

Comments (0)

Files changed (4)

File beaker/container.py

     def __init__(self):
         self.registry = {}
 
-
     def get_namespace_manager(self, namespace, container_class, **kwargs):
         """return a NamespaceManager corresponding to the given namespace name and container class.
         
                 additional keyword arguments will be used as constructor arguments for the
                 Namespace, in the case that one does not exist already.
         """
-        key = str(_thread.get_ident()) + "|" + container_class.__name__ + "|" + namespace
+        key = container_class.__name__ + "|" + namespace
         try:
             return self.registry[key]
         except KeyError:
             return self.registry.setdefault(key, container_class.create_namespace(namespace, **kwargs))
     
-
-
+    def clear(self):
+        self.registry.clear()
+        
 class Container(object):
     """represents a value, its stored time, and a value creation function corresponding to 
     a particular key in a particular namespace.
             self.release_write_lock()
         
 class CreationAbortedError(Exception):
-    """a special exception that allows a creation function to abort what its doing"""
+    """an exception that allows a creation function to abort what its doing"""
     
     pass
 

File beaker/synchronization.py

     except ImportError:
         has_flock = False
 
-from beaker.util import ThreadLocal, WeakValuedRegistry, encoded_path
+from beaker import util
 from beaker.exceptions import LockError
 
 class NameLock(object):
     the name alone, and synchronize operations related to that name.
     """
      
-    locks = WeakValuedRegistry()
+    locks = util.WeakValuedRegistry()
 
     class NLContainer:
         """cant put Lock as a weakref"""
 
 
 
-synchronizers = WeakValuedRegistry()
 
+class Synchronizer(object):
+    """a read-many/single-writer synchronizer which globally synchronizes on a given string name."""
     
-def Synchronizer(identifier = None, use_files = False, lock_dir = None, digest_filenames = True):
-    """
-    returns an object that synchronizes a block against many simultaneous 
-    read operations and several synchronized write operations. 
+    conditions = util.WeakValuedRegistry()
 
-    Write operations
-    are assumed to be much less frequent than read operations,
-    and receive precedence when they request a write lock.
+    def __init__(self, identifier = None, use_files = False, lock_dir = None, digest_filenames = True):
+        if not has_flock:
+            use_files = False
 
-    uses strategies to determine if locking is performed via threading objects
-    or file objects.
-    
-    the identifier identifies a name this Synchronizer is synchronizing against.
-    All synchronizers of the same identifier will lock against each other, within
-    the effective thread/process scope.
-    
-    use_files determines if this synchronizer will lock against thread mutexes
-    or file locks.  this sets the effective scope of the synchronizer, i.e. 
-    it will lock against other synchronizers in the same process, or against
-    other synchronizers referencing the same filesystem referenced by lock_dir.
-    
-    the acquire/relase methods support nested/reentrant operation within a single 
-    thread via a recursion counter, so that only the outermost call to 
-    acquire/release has any effect.  
-    """
+        if use_files:
+            syncs = util.ThreadLocal(creator=lambda: FileSynchronizer(identifier, lock_dir, digest_filenames))
+            self._get_impl = lambda:syncs.get()
+        else:
+            condition = Synchronizer.conditions.sync_get("condition_%s" % identifier, lambda: ConditionSynchronizer(identifier))
+            self._get_impl = lambda:condition
 
-    if not has_flock:
-        use_files = False
+    def release_read_lock(self):
+        self._get_impl().release_read_lock()
+        
+    def acquire_read_lock(self, wait=True):
+        return self._get_impl().acquire_read_lock(wait=wait)
 
-    if use_files:
-        # FileSynchronizer is one per thread
-        return synchronizers.sync_get("file_%s_%s" % (identifier, _thread.get_ident()), lambda: FileSynchronizer(identifier, lock_dir, digest_filenames))
-    else:
-        # ConditionSynchronizer is shared among threads
-        return synchronizers.sync_get("condition_%s" % identifier, lambda: ConditionSynchronizer(identifier))
-
-
+    def acquire_write_lock(self, wait=True):
+        return self._get_impl().acquire_write_lock(wait=wait)
+        
+    def release_write_lock(self):
+        self._get_impl().release_write_lock()
+        
 class SyncState(object):
-    """used to track the current thread's reading/writing state as well as reentrant block counting"""
+    """used to track the current thread's reading/writing state as well as reentrant block counting."""
     
     def __init__(self):
         self.reentrantcount = 0
         self.reading = False
 
 class SynchronizerImpl(object):
-    """base for the synchronizer implementations.  the acquire/release methods keep track of re-entrant
-    calls within the current thread, and delegate to the do_XXX methods when appropriate."""
+    """base class for synchronizers.  the release/acquire methods may or may not be threadsafe
+    depending on whether the 'state' accessor returns a thread-local instance."""
     
-    def __init__(self, *args, **params):
-        pass
-
     def release_read_lock(self):
         state = self.state
 
         raise NotImplementedError()
     
 class FileSynchronizer(SynchronizerImpl):
-    """a synchronizer using lock files.   as it relies upon flock(), which
-    is not safe to use with the same file descriptor among multiple threads (one file descriptor
-    per thread is OK), 
-    a separate FileSynchronizer must exist in each thread."""
+    """a synchronizer using lock files.   
+    
+    as it relies upon flock, its inherently not threadsafe.  The 
+    Synchronizer container will maintain a unique FileSynchronizer per Synchronizer instance per thread.
+    This works out since the synchronizers are all locking on a file on the filesystem.
+    """
     
     def __init__(self, identifier, lock_dir, digest_filenames):
         self.state = SyncState()
         else:
             lock_dir = lock_dir
 
-        self.filename = encoded_path(lock_dir, [identifier], extension = '.lock', digest = digest_filenames)
+        self.filename = util.encoded_path(lock_dir, [identifier], extension = '.lock', digest = digest_filenames)
 
         self.opened = False
         self.filedesc = None
 
 
 class ConditionSynchronizer(SynchronizerImpl):
-    """a synchronizer using a Condition.  this synchronizer is based on threading.Lock() objects and
-    therefore must be shared among threads."""
+    """a synchronizer using a Condition.  
+    
+    this synchronizer is based on threading.Lock() objects and
+    therefore must be shared among threads, so it is also threadsafe.
+    the "state" variable referenced by the base SynchronizerImpl class
+    is turned into a thread local, and all the do_XXXX methods are synchronized
+    on the condition object.
+    
+    The Synchronizer container will maintain a registry of ConditionSynchronizer
+    objects keyed to the name of the synchronizer.
+    """
     
     def __init__(self, identifier):
-        self.tlocalstate = ThreadLocal(creator = lambda: SyncState())
+        self.tlocalstate = util.ThreadLocal(creator = lambda: SyncState())
 
         # counts how many asynchronous methods are executing
         self.async = 0

File tests/test_container.py

     finally:
         baton = None
         
-def threadtest(cclass, id, statusdict, expiretime, delay, params):
+def threadtest(cclass, id, statusdict, expiretime, delay, threadlocal):
     print "create thread %d starting" % id
     statusdict[id] = True
 
 
     try:
-        container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params)
-
+        if threadlocal:
+            container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime)
+        else:
+            container = global_container
+            
         global running
         global totalgets
         try:
         statusdict[id] = False
     
 
-def runtest(cclass, totaltime, expiretime, delay, **params):
+def runtest(cclass, totaltime, expiretime, delay, threadlocal):
 
     statusdict = {}
     global totalcreates
     global totalgets
     totalgets = 0
 
-    container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params)
-    container.clear_value()
+    global global_container
+    global_container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime)
+    global_container.clear_value()
 
     global running
     running = True    
     for t in range(1, 20):
-        thread.start_new_thread(threadtest, (cclass, t, statusdict, expiretime, delay, params))
+        thread.start_new_thread(threadtest, (cclass, t, statusdict, expiretime, delay, threadlocal))
         
     time.sleep(totaltime)
     
     print "total object gets %d" % totalgets
 
 class ContainerTest(test_base.MyghtyTest):
-    def _runtest(self, cclass, totaltime, expiretime, delay, **params):
+    def _runtest(self, cclass, totaltime, expiretime, delay):
         print "\ntesting %s for %d secs with expiretime %s delay %d" % (
             cclass, totaltime, expiretime, delay)
         
-        runtest(cclass, totaltime, expiretime, delay, **params)
+        runtest(cclass, totaltime, expiretime, delay, threadlocal=False)
 
         if expiretime is None:
             self.assert_(totalcreates == 1)
 
     def testDbmContainer3(self):
         self.testDbmContainer(expiretime=5, delay=2)
+

File tests/test_syncdict.py

 
 assert(totalremoves + 1 == totalcreates)
 
-
-# the goofydict is designed to act like a weakvaluedictionary,
-# where its values are dereferenced and disposed of 
-# in between a has_key() and a 
-# __getitem__() operation 50% of the time.    
-# the number of creates should be about half of what the 
-# number of gets is.
-class goofydict(dict):
-    def has_key(self, key):
-        if dict.has_key(self, key):
-            if random.random() > 0.5:
-                del self[key]
-            return True
-        else:
-            return False
-
-print "\ntesting with goofy dict"
-runtest(SyncDict(thread.allocate_lock(), goofydict()))
-assert(float(totalcreates) / float(totalgets) < .52
-    and float(totalcreates) / float(totalgets) > .48)
-
-# the weakvaluedictionary test involves newly created items
-# that are instantly disposed since no strong reference exists to them.
-# the number of creates should be equal to the number of gets.
 print "\ntesting with weak dict"
 runtest(SyncDict(thread.allocate_lock(), weakref.WeakValueDictionary()))
-assert(totalcreates == totalgets)