Mike Bayer avatar Mike Bayer committed f83f21a

- [bug] Fixed bug whereby usage of a UNION
or similar inside of an embedded subquery
would interfere with result-column targeting,
in the case that a result-column had the same
ultimate name as a name inside the embedded
UNION. [ticket:2552]

Comments (0)

Files changed (3)

     could be incorrect in certain "clone+replace"
     scenarios.  [ticket:2518]
 
+  - [bug] Fixed bug whereby usage of a UNION
+    or similar inside of an embedded subquery
+    would interfere with result-column targeting,
+    in the case that a result-column had the same
+    ultimate name as a name inside the embedded
+    UNION. [ticket:2552]
+
 - engine
   - [bug] Fixed bug whereby
     a disconnect detect + dispose that occurs

lib/sqlalchemy/sql/compiler.py

         return func.clause_expr._compiler_dispatch(self, **kwargs)
 
     def visit_compound_select(self, cs, asfrom=False,
-                            parens=True, compound_index=1, **kwargs):
+                            parens=True, compound_index=0, **kwargs):
         entry = self.stack and self.stack[-1] or {}
-        self.stack.append({'from':entry.get('from', None), 'iswrapper':True})
+        self.stack.append({'from': entry.get('from', None),
+                    'iswrapper': not entry})
 
         keyword = self.compound_keywords.get(cs.keyword)
 
                         self.limit_clause(cs) or ""
 
         if self.ctes and \
-            compound_index == 1 and not entry:
+            compound_index == 0 and not entry:
             text = self._render_cte_clause() + text
 
         self.stack.pop(-1)
 
     def visit_select(self, select, asfrom=False, parens=True,
                             iswrapper=False, fromhints=None,
-                            compound_index=1,
+                            compound_index=0,
                             positional_names=None, **kwargs):
 
         entry = self.stack and self.stack[-1] or {}
         # to outermost if existingfroms: correlate_froms =
         # correlate_froms.union(existingfroms)
 
+        populate_result_map = compound_index == 0 and (
+                                not entry or \
+                                entry.get('iswrapper', False)
+                            )
+
         self.stack.append({'from': correlate_froms,
                             'iswrapper': iswrapper})
 
-        populate_result_map = compound_index == 1 and not entry or \
-                                entry.get('iswrapper', False)
         column_clause_args = {'positional_names': positional_names}
 
         # the actual list of columns to print in the SELECT column list.
             text += self.for_update_clause(select)
 
         if self.ctes and \
-            compound_index == 1 and not entry:
+            compound_index == 0 and not entry:
             text = self._render_cte_clause() + text
 
         self.stack.pop(-1)

test/sql/test_compiler.py

 #! coding:utf-8
 
-from test.lib.testing import eq_, assert_raises, assert_raises_message
+from test.lib.testing import eq_, is_, assert_raises, assert_raises_message
 import datetime, re, operator, decimal
 from sqlalchemy import *
 from sqlalchemy import exc, sql, util, types, schema
         self.assert_compile(and_(t.c.id == 1, null()),
                             "foo.id = :id_1 AND NULL")
 
+
+class ResultMapTest(fixtures.TestBase):
+    """test the behavior of the 'entry stack' and the determination
+    when the result_map needs to be populated.
+
+    """
+    def test_compound_populates(self):
+        t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
+        stmt = select([t]).union(select([t]))
+        comp = stmt.compile()
+        eq_(
+            comp.result_map,
+             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
+             'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)}
+        )
+
+    def test_compound_not_toplevel_doesnt_populate(self):
+        t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
+        subq = select([t]).union(select([t]))
+        stmt = select([t.c.a]).select_from(t.join(subq, t.c.a == subq.c.a))
+        comp = stmt.compile()
+        eq_(
+            comp.result_map,
+             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}
+        )
+
+    def test_compound_only_top_populates(self):
+        t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
+        stmt = select([t.c.a]).union(select([t.c.b]))
+        comp = stmt.compile()
+        eq_(
+            comp.result_map,
+             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)},
+        )
+
+    def test_label_conflict_union(self):
+        t1 = Table('t1', MetaData(), Column('a', Integer), Column('b', Integer))
+        t2 = Table('t2', MetaData(), Column('t1_a', Integer))
+        union = select([t2]).union(select([t2])).alias()
+
+        t1_alias = t1.alias()
+        stmt = select([t1, t1_alias]).select_from(
+                        t1.join(union, t1.c.a == union.c.t1_a)).apply_labels()
+        comp = stmt.compile()
+        eq_(
+            set(comp.result_map),
+            set(['t1_1_b', 't1_1_a', 't1_a', 't1_b'])
+        )
+        is_(
+            comp.result_map['t1_a'][1][1], t1.c.a
+        )
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.