result_map targeting when an embedded subquery has cols conflicting with top level cols

Issue #2552 resolved
Mike Bayer repo owner created an issue
from sqlalchemy import *

metadata = MetaData()

caso = Table('caso', metadata,
    Column('id', Integer, primary_key=True),

)

caso_vinculo = Table('caso_vinculo', metadata,
        Column('caso_1_id', Integer, ForeignKey('caso.id')),
        Column('caso_2_id', Integer, ForeignKey('caso.id'))
    )

subq1 = select([               caso_vinculo.c.caso_2_id.label('id2')](caso_vinculo.c.caso_1_id.label('id1'),
)).union(
                    select([caso_vinculo.c.caso_1_id](caso_vinculo.c.caso_2_id,)))
subq2 = subq1.alias()

ca = caso.alias()

stmt = select([ca.c.id](caso.c.id,))

#assert ca.c.id in stmt.apply_labels().compile().result_map['caso_1_id']('caso_1_id')[1](1)

stmt = stmt.select_from(
        caso.outerjoin(subq2, caso.c.id == subq2.c.id1).\
            outerjoin(ca, ca.c.id == subq2.c.id2)
    )

# fails
#assert ca.c.id in stmt.apply_labels().compile().result_map['caso_1_id']('caso_1_id')[1](1)


e = create_engine("sqlite://", echo='debug')
metadata.create_all(e)

e.execute(caso.insert(), {'id': 1}, {'id':2})
e.execute(caso_vinculo.insert(), {'caso_11_id': 1, 'caso_22_id':2})


if False:
    row = e.execute(stmt).first()
    # should give the "ambiguous column" error, 
    # right now gives TypeError: expected string or Unicode object, Column found (c extensions only ! yikes)
    row[caso.c.id](caso.c.id)

stmt = stmt.apply_labels()

result = e.execute(stmt)
row = result.first()
import pdb
pdb.set_trace()
print row[caso.c.id](caso.c.id)
print row[ca.c.id](ca.c.id)

Comments (3)

  1. Mike Bayer reporter

    patch, needs to be adapted for 0.7:

    diff -r 0c101f764d5ba0e86fa35cc2f8cea0f13e8fbc90 lib/sqlalchemy/sql/compiler.py
    --- a/lib/sqlalchemy/sql/compiler.py    Mon Aug 20 18:28:32 2012 -0400
    +++ b/lib/sqlalchemy/sql/compiler.py    Wed Aug 22 02:53:59 2012 -0400
    @@ -584,9 +584,10 @@
             return func.clause_expr._compiler_dispatch(self, **kwargs)
    
         def visit_compound_select(self, cs, asfrom=False,
    -                            parens=True, compound_index=1, **kwargs):
    +                            parens=True, compound_index=0, **kwargs):
             entry = self.stack and self.stack[-1](-1) or {}
    -        self.stack.append({'from':entry.get('from', None), 'iswrapper':True})
    +        self.stack.append({'from': entry.get('from', None),
    +                    'iswrapper': not entry})
    
             keyword = self.compound_keywords.get(cs.keyword)
    
    @@ -607,7 +608,7 @@
                             self.limit_clause(cs) or ""
    
             if self.ctes and \
    -            compound_index == 1 and not entry:
    +            compound_index == 0 and not entry:
                 text = self._render_cte_clause() + text
    
             self.stack.pop(-1)
    @@ -1011,7 +1012,7 @@
    
         def visit_select(self, select, asfrom=False, parens=True,
                                 iswrapper=False, fromhints=None,
    -                            compound_index=1,
    +                            compound_index=0,
                                 positional_names=None, **kwargs):
    
             entry = self.stack and self.stack[-1](-1) or {}
    @@ -1027,11 +1028,14 @@
             # to outermost if existingfroms: correlate_froms =
             # correlate_froms.union(existingfroms)
    
    +        populate_result_map = compound_index == 0 and (
    +                                not entry or \
    +                                entry.get('iswrapper', False)
    +                            )
    +
             self.stack.append({'from': correlate_froms,
                                 'iswrapper': iswrapper})
    
    -        populate_result_map = compound_index == 1 and not entry or \
    -                                entry.get('iswrapper', False)
             column_clause_args = {'positional_names': positional_names}
    
             # the actual list of columns to print in the SELECT column list.
    @@ -1106,7 +1110,7 @@
                 text += self.for_update_clause(select)
    
             if self.ctes and \
    -            compound_index == 1 and not entry:
    +            compound_index == 0 and not entry:
                 text = self._render_cte_clause() + text
    
             self.stack.pop(-1)
    diff -r 0c101f764d5ba0e86fa35cc2f8cea0f13e8fbc90 test/sql/test_compiler.py
    --- a/test/sql/test_compiler.py Mon Aug 20 18:28:32 2012 -0400
    +++ b/test/sql/test_compiler.py Wed Aug 22 02:53:59 2012 -0400
    @@ -1,6 +1,6 @@
     #! coding:utf-8
    
    -from test.lib.testing import eq_, assert_raises, assert_raises_message
    +from test.lib.testing import eq_, is_, assert_raises, assert_raises_message
     import datetime, re, operator, decimal
     from sqlalchemy import *
     from sqlalchemy import exc, sql, util, types, schema
    @@ -3205,3 +3205,54 @@
             self.assert_compile(and_(t.c.id == 1, null()),
                                 "foo.id = :id_1 AND NULL")
    
    +
    +class ResultMapTest(fixtures.TestBase):
    +    """test the behavior of the 'entry stack' and the determination
    +    when the result_map needs to be populated.
    +
    +    """
    +    def test_compound_populates(self):
    +        t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
    +        stmt = select([t](t)).union(select([t](t)))
    +        comp = stmt.compile()
    +        eq_(
    +            comp.result_map,
    +             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
    +             'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)}
    +        )
    +
    +    def test_compound_not_toplevel_doesnt_populate(self):
    +        t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
    +        subq = select([t](t)).union(select([t](t)))
    +        stmt = select([t.c.a](t.c.a)).select_from(t.join(subq, t.c.a == subq.c.a))
    +        comp = stmt.compile()
    +        eq_(
    +            comp.result_map,
    +             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}
    +        )
    +
    +    def test_compound_only_top_populates(self):
    +        t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
    +        stmt = select([t.c.a](t.c.a)).union(select([t.c.b](t.c.b)))
    +        comp = stmt.compile()
    +        eq_(
    +            comp.result_map,
    +             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)},
    +        )
    +
    +    def test_label_conflict_union(self):
    +        t1 = Table('t1', MetaData(), Column('a', Integer), Column('b', Integer))
    +        t2 = Table('t2', MetaData(), Column('t1_a', Integer))
    +        union = select([t2](t2)).union(select([t2](t2))).alias()
    +
    +        t1_alias = t1.alias()
    +        stmt = select([t1_alias](t1,)).select_from(
    +                        t1.join(union, t1.c.a == union.c.t1_a)).apply_labels()
    +        comp = stmt.compile()
    +        eq_(
    +            set(comp.result_map),
    +            set(['t1_1_a', 't1_a', 't1_b']('t1_1_b',))
    +        )
    +        is_(
    +            comp.result_map['t1_a']('t1_a')[1](1)[1](1), t1.c.a
    +        )
    \ No newline at end of file
    
  2. Log in to comment