Commits

J.A. Roberts Tunney committed 88b07f0

added FOR UPDATE db row locking support

  • Participants
  • Parent commits 18ca4fc
  • Branches 1.0.2

Comments (0)

Files changed (13)

File django/db/backends/__init__.py

     # If True, don't use integer foreign keys referring to, e.g., positive
     # integer primary keys.
     related_fields_match_type = False
+    has_select_for_update = False
+    has_select_for_update_nowait = False
 
 class BaseDatabaseOperations(object):
     """
         """
         return '%s'
 
+    def for_update_sql(self, nowait=False):
+        """
+        Return FOR UPDATE SQL clause to lock row for update
+        """
+        if nowait:
+            nowaitstr = ' NOWAIT'
+        else:
+            nowaitstr = ''
+        return 'FOR UPDATE' + nowaitstr
+
     def fulltext_search_sql(self, field_name):
         """
         Returns the SQL WHERE clause to use in order to perform a full-text

File django/db/backends/mysql/base.py

     raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__)
 
 from MySQLdb.converters import conversions
-from MySQLdb.constants import FIELD_TYPE, FLAG
+from MySQLdb.constants import FIELD_TYPE, FLAG, ER
 
 from django.db.backends import *
 from django.db.backends.mysql.client import DatabaseClient
     empty_fetchmany_value = ()
     update_can_self_select = False
     related_fields_match_type = True
+    has_select_for_update = True
+    has_select_for_update_nowait = False
 
 class DatabaseOperations(BaseDatabaseOperations):
     def date_extract_sql(self, lookup_type, field_name):
         # MySQL doesn't support microseconds
         return unicode(value.replace(microsecond=0))
 
+    signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK
+
     def year_lookup_bounds(self, value):
         # Again, no microseconds
         first = '%s-01-01 00:00:00'

File django/db/backends/postgresql_psycopg2/base.py

 try:
     import psycopg2 as Database
     import psycopg2.extensions
+    from psycopg2 import errorcodes
 except ImportError, e:
     from django.core.exceptions import ImproperlyConfigured
     raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
 class DatabaseFeatures(BaseDatabaseFeatures):
     needs_datetime_string_cast = False
     uses_savepoints = True
+    has_select_for_update = True
+    has_select_for_update_nowait = True
 
 class DatabaseOperations(PostgresqlDatabaseOperations):
     def last_executed_query(self, cursor, sql, params):
         # http://www.initd.org/tracker/psycopg/wiki/psycopg2_documentation#postgresql-status-message-and-executed-query
         return cursor.query
 
+    signals_deadlock = lambda self, e: e.pgcode == errorcodes.DEADLOCK_DETECTED
+
+    signals_lock_not_available = lambda self, e: e.pgcode == errorcodes.LOCK_NOT_AVAILABLE
+    
+
 class DatabaseWrapper(BaseDatabaseWrapper):
     operators = {
         'exact': '= %s',

File django/db/models/__init__.py

 from django.db.models.fields.subclassing import SubfieldBase
 from django.db.models.fields.files import FileField, ImageField
 from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel
+from django.db.models.sql.query import LockNotAvailable
 from django.db.models import signals
 
 # Admin stages.

File django/db/models/manager.py

     def order_by(self, *args, **kwargs):
         return self.get_query_set().order_by(*args, **kwargs)
 
+    def select_for_update(self, *args, **kwargs):
+        return self.get_query_set().select_for_update(*args, **kwargs)
+        
     def select_related(self, *args, **kwargs):
         return self.get_query_set().select_related(*args, **kwargs)
 

File django/db/models/query.py

         del_query = self._clone()
 
         # Disable non-supported fields.
+        del_query.query.select_for_update = False
         del_query.query.select_related = False
         del_query.query.clear_ordering()
 
         else:
             return self._filter_or_exclude(None, **filter_obj)
 
+    def select_for_update(self, **kwargs):
+        """
+        Returns a new QuerySet instance that will select objects with a
+        FOR UPDATE lock.
+        """
+        # Default to false for nowait
+        nowait = kwargs.pop('nowait', False)
+        obj = self._clone()
+        obj.query.select_for_update = True
+        obj.query.select_for_update_nowait = nowait
+        return obj
+
     def select_related(self, *fields, **kwargs):
         """
         Returns a new QuerySet instance that will select related objects.

File django/db/models/sql/query.py

 from django.utils.tree import Node
 from django.utils.datastructures import SortedDict
 from django.utils.encoding import force_unicode
-from django.db import connection
+from django.db import connection, DatabaseError
 from django.db.models import signals
 from django.db.models.fields import FieldDoesNotExist
 from django.db.models.query_utils import select_related_descend
 except NameError:
     from sets import Set as set     # Python 2.3 fallback
 
-__all__ = ['Query', 'BaseQuery']
+__all__ = ['Query', 'BaseQuery', 'LockNotAvailable']
+
+class LockNotAvailable(DatabaseError):
+    '''
+    Raised when a query fails because a lock was not available.
+    '''
+    pass
 
 class BaseQuery(object):
     """
         self.order_by = []
         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
         self.distinct = False
+        self.select_for_update = False
+        self.select_for_update_nowait = False
         self.select_related = False
         self.related_select_cols = []
 
         obj.order_by = self.order_by[:]
         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
         obj.distinct = self.distinct
+        obj.select_for_update = self.select_for_update
+        obj.select_for_update_nowait = self.select_for_update_nowait
         obj.select_related = self.select_related
         obj.related_select_cols = []
         obj.max_depth = self.max_depth
         obj = self.clone()
         obj.clear_ordering(True)
         obj.clear_limits()
+        obj.select_for_update = False
         obj.select_related = False
         obj.related_select_cols = []
         obj.related_select_fields = []
                         result.append('LIMIT %d' % val)
                 result.append('OFFSET %d' % self.low_mark)
 
+        if self.select_for_update and self.connection.features.has_select_for_update:
+            nowait = self.select_for_update_nowait and self.connection.features.has_select_for_update
+            result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait))
+
         params.extend(self.extra_params)
         return ' '.join(result), tuple(params)
 
                 return
 
         cursor = self.connection.cursor()
-        cursor.execute(sql, params)
+        try:
+            cursor.execute(sql, params)
+        except DatabaseError, e:
+            if self.connection.features.has_select_for_update_nowait and self.connection.ops.signals_lock_not_available(e):
+                raise LockNotAvailable(*e.args)
+            raise
 
         if not result_type:
             return cursor

File django/views/decorators/deadlock.py

+"""
+Decorators for deadlock handling.
+"""
+import sys
+try:
+    from functools import wraps
+except ImportError:
+    from django.utils.functional import wraps  # Python 2.3, 2.4 fallback.
+
+from django.db import transaction, connection, DatabaseError
+
+class DeadlockError(Exception):
+    """
+    Thrown by a view decorated by handle_deadlock(max_retries) when a deadlock 
+    has been detected and the view won't be called again to retry the aborted 
+    transaction.
+    """
+    pass
+
+def handle_deadlocks(max_retries=2):
+    """
+    Decorator to retry a view when a database deadlock is detected.
+
+    When there are no retries left, raises DeadlockError with the traceback of 
+    the original, database backend-specific exception.
+
+    Views using querysets constructed with select_for_update() should use this 
+    decorator. If the backend does not support locking and/or deadlock 
+    handling, this doesn't do anything.
+    """
+    if connection.features.has_select_for_update:
+        signals_deadlock = connection.ops.signals_deadlock
+    else:
+        return lambda f: f
+    def decorator(func):
+        def inner(*args, **kwargs):
+            retries = 0
+            while 1:
+                try:
+                    return func(*args, **kwargs)
+                except DatabaseError, e:
+                    if signals_deadlock(e):
+                        if retries == max_retries:
+                            raise DeadlockError, 'Deadlock detected', sys.exc_info()[2]
+                        retries += 1
+                        # Rollback needed by PostgreSQL
+                        transaction.rollback()
+                        transaction.set_clean()
+                        continue
+                    raise
+        return wraps(func)(inner)
+    return decorator
+

File docs/ref/databases.txt

 matter unless you're printing out the field values and are expecting to see
 ``True`` and ``False.``.
 
+Row locking with ``QuerySet.select_for_update()``
+-------------------------------------------------
+
+MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 
+statement. However, you may call the ``select_for_update()`` method of a 
+queryset with ``nowait=True``. In that case, the argument will be silently 
+discarded and the generated query will block until the requested lock can be 
+acquired.
+
 .. _sqlite-notes:
 
 SQLite notes 

File docs/ref/models/querysets.txt

 
         Entry.objects.extra(where=['headline=%s'], params=['Lennon'])
 
+``select_for_update(nowait=False)``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Returns a queryset that will lock rows until the end of the transaction, 
+generating a SELECT ... FOR UPDATE statement on supported databases.
+
+For example::
+
+    entries = Entry.objects.select_for_update().filter(author=request.user)
+
+All matched entries will be locked until the end of the transaction block, 
+meaning that other transactions will be prevented from changing or acquiring 
+locks on them.
+
+Usually, if another transaction has already acquired a lock on one of the 
+selected rows, the query will block until the lock is released. If this is 
+not the behaviour you want, call ``select_for_update(nowait=True)``. This will 
+make the call non-blocking. If a conflicting lock is already acquired by 
+another transaction, ``django.db.models.LockNotAvailable`` will be raised when 
+the queryset is evaluated.
+
+Using blocking locks on a database can lead to deadlocks. This occurs when two 
+concurrent transactions are both waiting on a lock the other transaction 
+already holds. To deal with deadlocks, wrap your views that use 
+``select_for_update(nowait=False)`` with the 
+``django.views.decorators.deadlock.handle_deadlocks`` decorator. 
+
+For example::
+
+    from django.db import transaction
+    from django.views.decorators.deadlock import handle_deadlocks
+
+    @handle_deadlocks(max_retries=2)
+    @transaction.commit_on_success
+    def my_view(request):
+        ...
+
+If the database engine detects a deadlock involving ``my_view`` and decides 
+to abort its transaction, it will be automatically retried. If deadlocks keep 
+occurring after two repeated attempts, 
+``django.views.decorators.DeadlockError`` will be raised, which can be 
+propagated to the user or handled in a middleware.
+
+Currently the ``postgresql_psycopg2`` and ``mysql`` database backends 
+support ``select_for_update()`` but MySQL has no support for the ``nowait`` 
+argument. Other backends will simply generate queries as if 
+``select_for_update()`` had not been used.
+
 QuerySet methods that do not return QuerySets
 ---------------------------------------------
 

File tests/regressiontests/select_for_update/__init__.py

+
+

File tests/regressiontests/select_for_update/models.py

+from django.db import models
+
+class Tag(models.Model):
+    name = models.CharField(max_length=10)
+    parent = models.ForeignKey('self', blank=True, null=True,
+            related_name='children')
+
+    class Meta:
+        ordering = ['name']
+
+    def __unicode__(self):
+        return self.name
+

File tests/regressiontests/select_for_update/tests.py

+import time
+import threading
+from unittest import TestCase
+
+from django.conf import settings
+from django.db import transaction, connection
+from django.db.models import LockNotAvailable
+from django.views.decorators.deadlock import DeadlockError, handle_deadlocks
+
+from regressiontests.select_for_update.models import Tag
+
+class SelectForUpdateTests(TestCase):
+
+    def setUp(self):
+        Tag.objects.create(name='1')
+        Tag.objects.create(name='2')
+
+    def test_basics(self):
+        def test():
+            t = Tag(name='update')
+            t.save()
+            transaction.commit()
+            tfound = Tag.objects.select_for_update().get(pk=t.id)
+            tfound.name = 'update2'
+            tfound.save()
+            transaction.commit()
+            tfound = Tag.objects.select_for_update().get(pk=t.id)
+            tfound.delete()
+            transaction.commit()
+        test = transaction.commit_manually(test)
+        test()
+
+    def test_backend_features(self):
+        if settings.DATABASE_ENGINE == 'postgresql_psycopg2':
+            self.failUnless(hasattr(connection.ops, 'signals_deadlock'))
+            self.failUnless(hasattr(connection.ops, 'signals_lock_not_available'))
+        elif settings.DATABASE_ENGINE == 'mysql':
+            self.failUnless(hasattr(connection.ops, 'signals_deadlock'))
+    
+    def test_deadlock(self):
+        '''
+        This test will fail on MySQL if the storage engine is not InnoDB.
+        '''
+        # Don't look for deadlocks if the backend doesn't support SELECT FOR UPDATE
+        if not connection.features.has_select_for_update:
+            return
+        def test(max_retries):
+            vars = {0: None, 1: None}
+            def view0():
+                t1 = Tag.objects.select_for_update().get(pk=1)
+                time.sleep(1)
+                t2 = Tag.objects.select_for_update().get(pk=2)
+                transaction.commit()
+            view0 = handle_deadlocks(max_retries=max_retries)(transaction.commit_manually(view0))
+            def view1():
+                t2 = Tag.objects.select_for_update().get(pk=2)
+                time.sleep(1)
+                t1 = Tag.objects.select_for_update().get(pk=1)
+                transaction.commit()
+            view1 = handle_deadlocks(max_retries=max_retries)(transaction.commit_manually(view1))
+            def thread0(vars):
+                try:
+                    view0()
+                except Exception, e:
+                    vars[0] = e
+            def thread1(vars):
+                try:
+                    view1()
+                except Exception, e:
+                    vars[1] = e
+            t0 = threading.Thread(target=thread0, args=(vars,))
+            t1 = threading.Thread(target=thread1, args=(vars,))
+            t0.start()
+            t1.start()
+            t0.join()
+            t1.join()
+            return vars[0], vars[1]
+        # Make a deadlock and don't retry the aborted transaction
+        # We are expecting a DeadlockError
+        e0, e1 = test(0)
+        self.assertEqual(e0 or e1, e1 or e0)
+        self.assert_(isinstance(e0 or e1, DeadlockError))
+        # Make a deadlock and retry the aborted transaction
+        # We expect no errors
+        e0, e1 = test(1)
+        self.assertEqual(e0 or e1, None)
+
+    def test_nowait(self):
+        if not connection.features.has_select_for_update_nowait:
+            return
+        def view():
+            try:
+                t1 = Tag.objects.select_for_update(nowait=True).get(pk=1)
+                time.sleep(1)
+            finally:
+                transaction.rollback()
+        view = transaction.commit_manually(view)
+        t = threading.Thread(target=view)
+        t.start()
+        time.sleep(.25)
+        try:
+            view()
+        except LockNotAvailable:
+            pass
+        else:
+            self.fail('Expected view to raise LockNotAvailable')
+