Commits

Ivan Zakrevsky committed 4bc4d96

Improved transactions support

Comments (0)

Files changed (1)

cache_tagging/tagging.py

 import os
 import hashlib
 import random
+import socket
 import threading
 import time
 import warnings
                 or 'value' not in data:
             return data  # Returns native API
 
+        # Just idea, and, maybe, not good idea.
+        # Cache can be many times rewrited on highload with
+        # high overlap of transactions.
+        # if self.transaction.scopes:
+        #     transaction_start_time = self.transaction.scopes[0]['time']
+        #     if self.transaction.delay:
+        #         transaction_start_time -= self.transaction.delay
+        #     if transaction_start_time <= data['time'] and data['thread_id'] != get_thread_id():
+        #         return default
+
         if len(data['tag_versions']):
             tag_caches = self.cache.get_many(
                 list(map(tag_prepare_name, list(data['tag_versions'].keys()))),
         data = {
             'tag_versions': tag_versions,
             'value': value,
+            # 'time': time.time(),
+            # 'thread_id': get_thread_id(),
         }
 
         self.finish(name, tags, version=version)
         return getattr(self.cache, name)
 
 
+def get_thread_id():
+    """Returs id for current thread."""
+    return '{0}.{1}.{2}'.format(
+        socket.gethostname(), os.getpid(), _thread.get_ident()
+    )
+
+
 def tag_prepare_name(name):
     """Adds prefixed namespace for tag name"""
     version = str(__version__).replace('.', '')
 
 def tag_generate_version():
     """ Generates a new unique identifier for tag version."""
-    pid = os.getpid()
-    tid = _thread.get_ident()
-    hash = hashlib.md5("{0}{1}{2}{3}".format(
-        randrange(0, MAX_TAG_KEY),
-        pid,
-        tid,
-        time.time()
+    hash = hashlib.md5("{0}{1}{2}".format(
+        randrange(0, MAX_TAG_KEY), get_thread_id(), time.time()
     )).hexdigest()
     return hash
 
 class Transaction(object):
 
     LOCK_PREFIX = "lock"
+    STATUS_INVALIDATION = 0
+    STATUS_COMMIT = 1
 
     def __init__(self, cache, delay=None, nonrepeatable_reads=False):
         """Constructor of Transaction instance."""
     def get_locked_tag_name(self, tag):
         return '{0}_{1}'.format(self.LOCK_PREFIX, tag)
 
-    def lock_tags(self, tags, version=None):
+    def lock_tags(self, tags, status, version=None):
         """Locks tags for concurrent transactions."""
         if self.nonrepeatable_reads:
-            now = time.time()
             timeout = TAG_LOCKING_TIMEOUT
             if self.delay:
                 timeout += self.delay
+            data = (time.time(), status, get_thread_id())
             self.cache.set_many(
-                {self.get_locked_tag_name(tag): now for tag in tags},
+                {self.get_locked_tag_name(tag): data for tag in tags},
                 timeout, version
             )
 
         if self.nonrepeatable_reads and self.scopes:
             top_scope = self.scopes[0]
             top_scope_tags = top_scope['tags'].get(version, set())
-            concurrent_tags = list(set(tags) - top_scope_tags)
-            cache_names += list(map(self.get_locked_tag_name, concurrent_tags))
+            cache_names += list(map(self.get_locked_tag_name, top_scope_tags))
 
         caches = self.cache.get_many(cache_names, version) or {}
         tag_caches = {k: v for k, v in caches.items() if k in tags}
         locked_tag_caches = {k: v for k, v in caches.items() if k not in tags}
+
         if locked_tag_caches:
             transaction_start_time = top_scope['time']
             if self.delay:
                 transaction_start_time -= self.delay
-            for tag_time in locked_tag_caches.values():
-                if transaction_start_time <= tag_time:
+            current_tread_id = get_thread_id()
+            for tag_time, tag_status, tag_thread_id in locked_tag_caches.values():
+                if (current_tread_id != tag_thread_id
+                    and (transaction_start_time <= tag_time
+                         or tag_status == self.STATUS_INVALIDATION)):
                     raise TagLocked
         return tag_caches
 
         """Just helper for async. Actual for DB replication (slave delay)."""
         for version, tags in scope['tags'].items():
             self.cache.delete_many(list(tags), version=version)
-            self.lock_tags(tags, version)
+            self.lock_tags(tags, self.STATUS_COMMIT, version)
 
     def finish(self):
         """Handles database transaction commit or rollback.
         """Adds cache names to current scope."""
         for scope in self.scopes:
             scope['tags'].setdefault(version, set()).update(tags)
-        self.lock_tags(tags, version)
+        self.lock_tags(tags, self.STATUS_INVALIDATION, version)