Commits

adamv committed 73d1f84

Preliminary fixes for issue 26, with tests.

  • Participants
  • Parent commits 630dc5c

Comments (0)

Files changed (5)

File source/sqlserver_ado/query.py

                 meta = self.get_meta()
                 qn = self.connection.ops.quote_name
                 order = '%s.%s ASC' % (qn(meta.db_table), qn(meta.pk.attname))
+            
+            where_row_num = "%s < _row_num" % (self.low_mark)
+            if self.high_mark:
+                where_row_num += " and _row_num <= %s" % (self.high_mark)
                 
-            where = "%s <= _row_num" % (self.low_mark)
-            if self.high_mark:
-                where += " and _row_num < %s" % (self.high_mark)
-
-            sql = "SELECT * FROM ( SELECT ROW_NUMBER() OVER ( ORDER BY %s) as _row_num, %s) as QQQ where %s"\
-                 % (order, inner_select, where)
+            outer_select, inner_select = self._alias_columns(inner_select)
+            
+            sql = "SELECT _row_num, %s FROM ( SELECT ROW_NUMBER() OVER ( ORDER BY %s) as _row_num, %s) as QQQ where %s"\
+                 % (outer_select, order, inner_select, where_row_num)
             
             return sql, fields
 
+        def _alias_columns(self, sql):
+            """Return tuple of SELECT and FROM clauses, aliasing duplicate column names."""
+            qn = self.connection.ops.quote_name
+
+            outer = list()
+            inner = list()
+            
+            names_seen = list()
+            original_names = sql[0:sql.find(' FROM [')].split(',')
+            for col in original_names:
+                # Col looks like: "[app_table].[column]"; strip out just "column"
+                col_name = col.split('].[')[1][:-1]
+                
+                # If column name was already seen, alias it.
+                if col_name in names_seen:
+                    unique_col_name = qn('%s___%s' % (col_name, names_seen.count(col_name)))
+
+                    outer.append(unique_col_name)
+                    inner.append("%s as %s" % (col, unique_col_name))
+                else:
+                    outer.append(qn(col_name))
+                    inner.append(col)
+
+                names_seen.append(col_name)
+
+            # Add FROM clause back to inner select
+            return ', '.join(outer), ', '.join(inner) + sql[sql.find(' FROM ['):]
+
         def _insert_as_sql(self, *args, **kwargs):
             sql, params = self._parent_as_sql(*args,**kwargs)
             meta = self.get_meta()

File tests/test_main/paging/__init__.py

Empty file added.

File tests/test_main/paging/models.py

+import unittest
+from django.db import models
+from django.core.paginator import QuerySetPaginator
+
+class FirstTable(models.Model):
+    b = models.CharField(max_length=100)
+    # Add a reserved word column; this will get quoted correctly
+    # in queries, but need to make sure paging doesn't break:
+    # * Paging should re-quote alias names correctly
+    # * The string splitting on 'FROM' shouldn't break either
+    c = models.CharField(default=u'test', max_length=10, db_column=u'FROM')
+    
+class SecondTable(models.Model):
+    a = models.ForeignKey(FirstTable)
+    b = models.CharField(max_length=100)
+
+
+class PagingTestCase(unittest.TestCase):
+    def setupPagingData(self):
+        a1 = FirstTable(b='A1')
+        a1.save()
+        
+        a2 = FirstTable(b='A2')
+        a2.save()
+        
+        b1 = SecondTable(a=a1, b='B1')
+        b1.save()
+
+        b2 = SecondTable(a=a1, b='B2')
+        b2.save()
+
+        b3 = SecondTable(a=a1, b='B3')
+        b3.save()
+        
+        return a1.pk
+    
+    def testPagingWithDuplicateColumnNames(self):
+        a1_pk = self.setupPagingData()
+        
+        # Select related data so we get two 'b' columns per row
+        # (and two id columns, too)
+        data = SecondTable.objects.filter(a=a1_pk).order_by('b').select_related(depth=1)
+        
+        # Use a single item per page, to get multiple pages
+        paged_data = QuerySetPaginator(data, 1)
+
+        on_this_page = list(paged_data.page(1).object_list)
+        self.assertEquals(len(on_this_page), 1, 'Too many results on this page.')
+        self.assertEquals(on_this_page[0].b, 'B1')
+        
+        on_this_page = list(paged_data.page(2).object_list)
+        self.assertEquals(len(on_this_page), 1, 'Too many results on this page.')
+        self.assertEquals(on_this_page[0].b, 'B2')
+
+        on_this_page = list(paged_data.page(3).object_list)
+        self.assertEquals(len(on_this_page), 1, 'Too many results on this page.')
+        self.assertEquals(on_this_page[0].b, 'B3')

File tests/test_main/paging/views.py

+# Create your views here.

File tests/test_main/settings.py

 
 INSTALLED_APPS = (
 	'myapp',
+	'paging',
 )