Mike Bayer avatar Mike Bayer committed d666f1c

- Fixed some deep "column correspondence" issues which could
impact a Query made against a selectable containing
multiple versions of the same table, as well as
unions and similar which contained the same table columns
in different column positions at different levels.

Comments (0)

Files changed (5)

       next compile() call.  This issue occurs frequently
       when using declarative.
+    - Fixed some deep "column correspondence" issues which could
+      impact a Query made against a selectable containing
+      multiple versions of the same table, as well as 
+      unions and similar which contained the same table columns
+      in different column positions at different levels.
+      [ticket:1268]
     - Custom comparator classes used in conjunction with 
       column_property(), relation() etc. can define 
       new comparison methods on the Comparator, which will


         result = util.column_dict()
         def visit_binary(binary):
             if binary.operator == operators.eq:


 from sqlalchemy.sql import operators
 from sqlalchemy.sql.visitors import Visitable, cloned_traverse
 from sqlalchemy import types as sqltypes
+import operator
 functions, schema, sql_util = None, None, None
 DefaultDialect, ClauseAdapter, Annotated = None, None, None
         for c in cols:
             i = c.proxy_set.intersection(target_set)
             if i and \
-                (not require_embedded or c.proxy_set.issuperset(target_set)) and \
-                (intersect is None or len(i) > len(intersect)):
-                col, intersect = c, i
+                (not require_embedded or c.proxy_set.issuperset(target_set)):
+                if col is None:
+                    # no corresponding column yet, pick this one.
+                    col, intersect = c, i
+                elif len(i) > len(intersect):
+                    # 'c' has a larger field of correspondence than 'col'.
+                    # i.e. selectable.c.a1_x->a1.c.x->table.c.x matches a1.c.x->table.c.x better than 
+                    # selectable.c.x->table.c.x does.
+                    col, intersect = c, i
+                elif i == intersect:
+                    # they have the same field of correspondence.
+                    # see which proxy_set has fewer columns in it, which indicates a
+                    # closer relationship with the root column.  Also take into account the 
+                    # "weight" attribute which CompoundSelect() uses to give higher precedence to
+                    # columns based on vertical position in the compound statement, and discard columns
+                    # that have no reference to the target column (also occurs with CompoundSelect)
+                    col_distance = util.reduce(operator.add, 
+                                        [sc._annotations.get('weight', 1) for sc in col.proxy_set if sc.shares_lineage(column)]
+                                    )
+                    c_distance = util.reduce(operator.add, 
+                                        [sc._annotations.get('weight', 1) for sc in c.proxy_set if sc.shares_lineage(column)]
+                                    )
+                    if \
+                        c_distance < col_distance:
+                        col, intersect = c, i
         return col
     def _populate_column_collection(self):
         for cols in zip(*[s.c for s in self.selects]):
             proxy = cols[0]._make_proxy(self, name=self.use_labels and cols[0]._label or None)
-            proxy.proxies = cols
+            # place a 'weight' annotation corresponding to how low in the list of select()s
+            # the column occurs, so that the corresponding_column() operation
+            # can resolve conflicts
+            proxy.proxies = [c._annotate({'weight':i + 1}) for i, c in enumerate(cols)]
     def _copy_internals(self, clone=_clone):
         self.selects = [clone(s) for s in self.selects]


         sess = create_session()
         oalias = aliased(Order)
         for q in [
             sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
             sess.query(Order, oalias)._from_self().filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
+            # same thing, but reversed.  
+            sess.query(oalias, Order)._from_self().filter(oalias.user_id==Order.user_id).filter(oalias.user_id==7).filter(Order.id<oalias.id).order_by(oalias.id, Order.id),
             # here we go....two layers of aliasing
             sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
             (Node(data='n122'), Node(data='n12'), Node(data='n1'))
+        # same, change order around
+        self.assertEquals(
+            sess.query(parent, grandparent, Node).\
+                join((Node.parent, parent), (parent.parent, grandparent)).\
+                    filter(Node.data=='n122').filter(parent.data=='n12').\
+                    filter(grandparent.data=='n1')._from_self().first(),
+            (Node(data='n12'), Node(data='n1'), Node(data='n122'))
+        )
             sess.query(Node, parent, grandparent).\
                 join((Node.parent, parent), (parent.parent, grandparent)).\


-"""tests that various From objects properly export their columns, as well as
-useable primary keys and foreign keys.  Full relational algebra depends on
-every selectable unit behaving nicely with others.."""
+"""Test various algorithmic properties of selectables."""
 import testenv; testenv.configure_for_tests()
 from sqlalchemy import *
 class SelectableTest(TestBase, AssertsExecutionResults):
-    def test_distance(self):
+    def test_distance_on_labels(self):
         # same column three times
         s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')])
         assert s.corresponding_column(s.c.col1) is s.c.col1
         assert s.corresponding_column(s.c.c1) is s.c.c1
+    def test_distance_on_aliases(self):
+        a1 = table1.alias('a1')
+        for s in (
+            select([a1, table1], use_labels=True),
+            select([table1, a1], use_labels=True)
+        ):
+            assert s.corresponding_column(table1.c.col1) is s.c.table1_col1
+            assert s.corresponding_column(a1.c.col1) is s.c.a1_col1
     def test_join_against_self(self):
         jj = select([table1.c.col1.label('bar_col1')])
         jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
         assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
-        # test alias of the join, targets the column with the least
-        # "distance" between the requested column and the returned column
-        # (i.e. there is less indirection between j2.c.table1_col1 and table1.c.col1, than
-        # there is from j2.c.bar_col1 to table1.c.col1)
+        # test alias of the join
         j2 = jjj.alias('foo')
         assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1
         assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
         j2 = jjj.alias('foo')
-        print j2.corresponding_column(jjj.c.table1_col1)
         assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
         assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
         s1 = table1.select(use_labels=True)
         s2 = table2.select(use_labels=True)
-        print ["%d %s" % (id(c),c.key) for c in u.c]
         c = u.corresponding_column(s1.c.table1_col2)
-        print "%d %s" % (id(c), c.key)
-        print id(u.corresponding_column(s1.c.table1_col2).table)
-        print id(u.c.col2.table)
         assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
         assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+    def test_union_precedence(self):
+        # conflicting column correspondence should be resolved based on 
+        # the order of the select()s in the union
+        s1 = select([table1.c.col1, table1.c.col2])
+        s2 = select([table1.c.col2, table1.c.col1])
+        s3 = select([table1.c.col3, table1.c.colx])
+        s4 = select([table1.c.colx, table1.c.col3])
+        u1 = union(s1, s2)
+        assert u1.corresponding_column(table1.c.col1) is u1.c.col1
+        assert u1.corresponding_column(table1.c.col2) is u1.c.col2
+        u1 = union(s1, s2, s3, s4)
+        assert u1.corresponding_column(table1.c.col1) is u1.c.col1
+        assert u1.corresponding_column(table1.c.col2) is u1.c.col2
+        assert u1.corresponding_column(table1.c.colx) is u1.c.col2
+        assert u1.corresponding_column(table1.c.col3) is u1.c.col1
     def test_singular_union(self):
         u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3]))
     def test_select_labels(self):
         a = table1.select(use_labels=True)
-        print str(a.select())
         j = join(a, table2)
         criterion = a.c.table1_col1 == table2.c.col2
         j3 = a.join(j2, j2.c.aid==a.c.id)
         j4 = select([j3]).alias('foo')
-        print j4
-        print j4.corresponding_column(j2.c.aid)
-        print j4.c.aid
         assert j4.corresponding_column(j2.c.aid) is j4.c.aid
         assert j4.corresponding_column(a.c.id) is j4.c.id
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.