Commits

Ivan Zakrevsky  committed 89ce487

Now supports NONREPEATABLE READS, so works with transaction isolation levels like "Repeatable read" and "Serializable"

  • Participants
  • Parent commits 56fb0a2

Comments (0)

Files changed (4)

File cache_tagging/django_cache_tagging/__init__.py

             self._caches.caches = {}
 
         if key not in self._caches.caches:
+            options = getattr(settings, 'CACHE_TAGGING', {}).get(backend, {})
+            delay = options.get('DELAY', None)
+            nonrepeatable_reads = options.get('NONREPEATABLE_READS', False)
             self._caches.caches['key'] = CacheTagging(
-                django_get_cache(backend, *args, **kwargs)
+                django_get_cache(backend, *args, **kwargs),
+                delay,
+                nonrepeatable_reads
             )
         return self._caches.caches['key']
 

File cache_tagging/django_cache_tagging/tests/__init__.py

     urls = 'cache_tagging.django_cache_tagging.tests.urls'
 
     def setUp(self):
+        cache.clear()
         self.obj1 = FirstTestModel.objects.create(title='title1')
         self.obj2 = SecondTestModel.objects.create(title='title2')
         if 'cache_tagging.middleware.TransactionMiddleware' in settings.MIDDLEWARE_CLASSES:
         self.assertEqual(cache.get('name1', None), None)
 
         # tests for cache.transaction.flush()
+        cache.clear()
         cache.transaction.begin()  # 1
         cache.transaction.begin()  # 2
         cache.set('name1', 'value1', ('tag1', ), 120)

File cache_tagging/tagging.py

 import threading
 import time
 import warnings
+from datetime import datetime, timedelta
 from functools import wraps
 from threading import local
 
     randrange = random.randrange
 
 TAG_TIMEOUT = 24 * 3600
+TAG_LOCKING_TIMEOUT = 5
 MAX_TAG_KEY = 18446744073709551616     # 2 << 63
 
 
 class CacheTagging(object):
     """Tags support for Django cache."""
 
-    def __init__(self, cache):
+    def __init__(self, cache, delay=None, nonrepeatable_reads=False):
         """Constructor of cache instance."""
         self.cache = cache
         self.ignore_descendants = False
         self.ctx = local()
-        self.transaction = Transaction(self)
+        self.transaction = Transaction(self, delay, nonrepeatable_reads)
 
     def get_or_set_callback(self, name, callback, tags=[], timeout=None,
                             version=None, args=None, kwargs=None):
 
         tag_versions = {}
         if len(tags):
+            tag_cache_names = list(map(tag_prepare_name, tags))
+            if self.transaction.is_locked_any_tag(tag_cache_names, version):
+                self.finish(name, tags, version=version)
+                return
             tag_caches = self.cache.get_many(
-                list(map(tag_prepare_name, tags))
+                tag_cache_names
             ) or {}
             tag_new_dict = {}
             for tag in tags:
 
 class Transaction(object):
 
-    def __init__(self, cache):
+    def __init__(self, cache, delay=None, nonrepeatable_reads=False):
         """Constructor of Transaction instance."""
         self.cache = cache
-        self.delay = None
+        self.delay = delay
+        self.nonrepeatable_reads = nonrepeatable_reads
         self.ctx = local()
 
     def __call__(self, f=None):
         self.finish()
         return False
 
+    def get_locked_tag_name(self, tag):
+        return 'lock_{0}'.format(tag)
+
+    def lock_tags(self, tags, version=None):
+        """Locks tags for concurrent transactions."""
+        date = datetime.now()
+        self.cache.set_many(
+            {self.get_locked_tag_name(tag): date for tag in tags},
+            TAG_LOCKING_TIMEOUT,
+            version
+        )
+
+    def is_locked_any_tag(self, tags, version=None):
+        """Returns True, if current transaction has been started earlier than
+
+        any tag has been invalidated by concurent process.
+        Actual for SERIALIZABLE and REPEATABLE READ transaction levels.
+        """
+        if self.nonrepeatable_reads and self.scopes:
+            top_scope = self.scopes[0]
+            thread_tags = top_scope['tags'].get(version, set())
+            concurrent_tags = list(set(tags) - thread_tags)
+            if concurrent_tags:
+                locked_tags = self.cache.get_many(
+                    list(map(self.get_locked_tag_name, concurrent_tags))
+                ) or {}
+                transaction_start_date = top_scope['date']
+                if self.delay:
+                    transaction_start_date -= timedelta(seconds=self.delay)
+                for tag_date in locked_tags.values():
+                    if transaction_start_date <= tag_date:
+                        return True
+        return False
+
     def begin(self):
         """Handles database transaction begin."""
-        self.scopes.append({})
+        self.scopes.append({'date': datetime.now(), 'tags': {}})
         return self
 
     def _finish_delayed(self, scope):
-        """Just helper for async"""
-        for version, tags in scope.items():
+        """Just helper for async. Actual for DB replication (slave delay)."""
+        for version, tags in scope['tags'].items():
             self.cache.delete_many(list(tags), version=version)
+            self.lock_tags(tags, version)
 
     def finish(self):
         """Handles database transaction commit or rollback.
         """
         scope = self.scopes.pop()
         self._finish_delayed(scope)
-        if self.delay:
+        if self.delay and not self.nonrepeatable_reads:
             threading.Timer(self.delay, self._finish_delayed, [scope]).start()
         return self
 
     def add_tags(self, tags, version=None):
         """Adds cache names to current scope."""
         for scope in self.scopes:
-            scope.setdefault(version, set()).update(tags)
+            scope['tags'].setdefault(version, set()).update(tags)
+            self.lock_tags(tags, version)
                 'NAME': ':memory:'
             }
         },
+        CACHE_TAGGING_ = {
+            'default': {
+                'NONREPEATABLE_READS': True,
+                'DELAY': 5,
+            }
+        },
         INSTALLED_APPS = [
             'cache_tagging.django_cache_tagging',
         ],