Commits

Anonymous committed c5582d8

fixed foreignkey__isnull=True queries by rewriting them to not use a JOIN

Comments (0)

Files changed (2)

djangotoolbox/db/basecompiler.py

 from django.conf import settings
+from django.db import models
 from django.db.models.sql import aggregates as sqlaggregates
 from django.db.models.sql.compiler import SQLCompiler
-from django.db.models.sql.constants import LOOKUP_SEP, MULTI, SINGLE
-from django.db.models.sql.where import AND, OR
+from django.db.models.sql.constants import LOOKUP_SEP, MULTI, SINGLE, LHS_ALIAS
+from django.db.models.sql.where import AND, OR, Constraint
 from django.db.utils import DatabaseError, IntegrityError
 from django.utils.tree import Node
 import random
 
+def __repr__(self):
+    return '%s, %s, %s, %s' % (self.alias, self.col, self.field.name, self.field.model.__name__)
+Constraint.__repr__ = __repr__
+
 EMULATED_OPS = {
     'exact': lambda x, y: y in x if isinstance(x, (list,tuple)) else x == y,
     'iexact': lambda x, y: x.lower() == y.lower(),
         """
         Returns an iterator over the results from executing this query.
         """
+        self.fix_fk_null_filters(self.query.where)
         self.check_query()
         fields = self.get_fields()
         low_mark = self.query.low_mark
                                     'by non-relational DBs.')
         return fields
 
+    def fix_fk_null_filters(self, filters):
+        # Django doesn't generate correct code for ForeignKey__isnull.
+        # It becomes a JOIN with pk__isnull which won't work on nonrel DBs,
+        # so we rewrite the JOIN here.
+        for index, child in enumerate(filters.children[:]):
+            if isinstance(child, Node):
+                self.fix_fk_null_filters(child)
+                continue
+
+            constraint, lookup_type, annotation, value = child
+            if lookup_type == 'isnull' and \
+                    isinstance(constraint.field, models.ForeignKey) and \
+                    constraint.field.column != constraint.col:
+                constraint.col = constraint.field.column
+                alias = constraint.alias
+                while True:
+                    next_alias = self.query.alias_map[alias][LHS_ALIAS]
+                    if not next_alias:
+                        break
+                    self.query.unref_alias(alias)
+                    if self.query.alias_refcount[alias] < 1:
+                        del self.query.alias_refcount[alias]
+                        del self.query.alias_map[alias]
+                        del self.query.join_map[self.query.rev_join_map[alias]]
+                        del self.query.rev_join_map[alias]
+                        self.query.used_aliases -= set([alias])
+                    alias = next_alias
+                constraint.alias = alias
+ 
     def _get_ordering(self):
         if not self.query.default_ordering:
             ordering = self.query.order_by

djangotoolbox/tests.py

 #from django.db.utils import DatabaseError
 from django.test import TestCase
 
-
 class ListModel(models.Model):
     floating_point = models.FloatField()
     names = ListField(models.CharField(max_length=500))
     names_with_default = ListField(models.CharField(max_length=500), default=[])
     names_nullable = ListField(models.CharField(max_length=500), null=True)
 
+class FKModel(models.Model):
+    fk = models.ForeignKey(ListModel, null=True, blank=True)
+    fk2 = models.ForeignKey('self', null=True, blank=True)
+
 class FilterTest(TestCase):
     floats = [5.3, 2.6, 9.1, 1.58]
     names = [u'Kakashi', u'Naruto', u'Sasuke', u'Sakura',]
         for i, float in enumerate(FilterTest.floats):
             ListModel(floating_point=float, names=FilterTest.names[:i+1]).save()
 
+    def test_fk_null_rewrite(self):
+        # Django doesn't generate correct code for ForeignKey__isnull.
+        # It becomes a JOIN with pk__isnull which won't work on nonrel DBs,
+        # so we rewrite the JOIN in the base backend classes.
+        FKModel().save()
+        FKModel(fk=ListModel.objects.all()[0]).save()
+        self.assertEqual(len(FKModel.objects.filter(fk=None)), 1)
+        self.assertEqual(len(FKModel.objects.filter(fk2=None)), 2)
+        self.assertEqual(len(FKModel.objects.exclude(fk=None)), 1)
+        self.assertEqual(len(FKModel.objects.exclude(fk2=None)), 0)
+
     def test_equals_empty(self):
         self.assertEqual(ListModel.objects.filter(names=[]).count(), 0)