Commits

Mike Bayer committed ded95d7

- expand the check to determine if a selectable column is embedded
in the corresponding selectable to take into account clones
of the target column. fixes [ticket:2419]
- have _make_proxy() copy out the _is_clone_of attribute on the
new column so that even more corresponding_column() checks
work as expected for cloned elements.
- add a new test fixture so that mapped tests can be specified
using declarative.

Comments (0)

Files changed (6)

     @collection.internally_instrumented.  
     [ticket:2406]
 
+  - [bug] Fixed bug whereby SQL adaption mechanics
+    would fail in a very nested scenario involving
+    joined-inheritance, joinedload(), limit(), and a
+    derived function in the columns clause.  
+    [ticket:2419]
+
   - [feature] Added the ability to query for
     Table-bound column names when using 
     query(sometable).filter_by(colname=value).  

lib/sqlalchemy/schema.py

 
         c.table = selectable
         selectable._columns.add(c)
+        if selectable._is_clone_of is not None:
+            c._is_clone_of = selectable._is_clone_of.columns[c.name]
         if self.primary_key:
             selectable.primary_key.add(c)
         c.dispatch.after_parent_attach(c, selectable)

lib/sqlalchemy/sql/expression.py

     supports_execution = False
     _from_objects = []
     bind = None
+    _is_clone_of = None
 
     def _clone(self):
         """Create a shallow copy of this ClauseElement.
         f = self
         while f is not None:
             s.add(f)
-            f = getattr(f, '_is_clone_of', None)
+            f = f._is_clone_of
         return s
 
     def __getstate__(self):
                             type_=getattr(self,
                           'type', None))
         co.proxies = [self]
+        if selectable._is_clone_of is not None:
+            co._is_clone_of = \
+                selectable._is_clone_of.columns[key]
         selectable._columns[key] = co
         return co
 
 
         """
 
+        def embedded(expanded_proxy_set, target_set):
+            for t in target_set.difference(expanded_proxy_set):
+                if not set(_expand_cloned([t])
+                            ).intersection(expanded_proxy_set):
+                    return False
+            return True
+
         # dont dig around if the column is locally present
         if self.c.contains_column(column):
             return column
         target_set = column.proxy_set
         cols = self.c
         for c in cols:
-            i = target_set.intersection(itertools.chain(*[p._cloned_set
-                    for p in c.proxy_set]))
+            expanded_proxy_set = set(_expand_cloned(c.proxy_set))
+            i = target_set.intersection(expanded_proxy_set)
             if i and (not require_embedded
-                      or c.proxy_set.issuperset(target_set)):
+                      or embedded(expanded_proxy_set, target_set)):
                 if col is None:
 
                     # no corresponding column yet, pick this one.
                     is_literal=is_literal
                 )
         c.proxies = [self]
+        if selectable._is_clone_of is not None:
+            c._is_clone_of = \
+                selectable._is_clone_of.columns[c.name]
 
         if attach:
             selectable._columns[c.name] = c

test/lib/fixtures.py

 import sys
 import sqlalchemy as sa
 from test.lib.entities import BasicEntity, ComparableEntity
+from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
 
 class TestBase(object):
     # A sequence of database names to always run, regardless of the
 
     @classmethod
     def teardown_class(cls):
-        cls.classes.clear()
-        _ORMTest.teardown_class()
+        cls._teardown_once_class()
         cls._teardown_once_metadata_bind()
 
     def setup(self):
         self._teardown_each_tables()
 
     @classmethod
+    def _teardown_once_class(cls):
+        cls.classes.clear()
+        _ORMTest.teardown_class()
+
+
+    @classmethod
     def _setup_once_classes(cls):
         if cls.run_setup_classes == 'once':
             cls._with_register_classes(cls.setup_classes)
                 cls_registry[classname] = cls
                 return type.__init__(cls, classname, bases, dict_)
 
+
         class _Base(object):
             __metaclass__ = FindFixture
         class Basic(BasicEntity, _Base):
     def setup_mappers(cls):
         pass
 
+class DeclarativeMappedTest(MappedTest):
+    declarative_meta = None
+
+    @classmethod
+    def setup_class(cls):
+        if cls.declarative_meta is None:
+            cls.declarative_meta = sa.MetaData()
+
+        super(DeclarativeMappedTest, cls).setup_class()
+
+    @classmethod
+    def _teardown_once_class(cls):
+        if cls.declarative_meta.tables:
+            cls.declarative_meta.drop_all(testing.db)
+        super(DeclarativeMappedTest, cls)._teardown_once_class()
+
+    @classmethod
+    def _with_register_classes(cls, fn):
+        cls_registry = cls.classes
+        class FindFixtureDeclarative(DeclarativeMeta):
+            def __init__(cls, classname, bases, dict_):
+                cls_registry[classname] = cls
+                return DeclarativeMeta.__init__(
+                        cls, classname, bases, dict_)
+        _DeclBase = declarative_base(metadata=cls.declarative_meta, 
+                            metaclass=FindFixtureDeclarative)
+        class DeclarativeBasic(BasicEntity):
+            pass
+        cls.DeclarativeBasic = _DeclBase
+        fn()
+        if cls.declarative_meta.tables:
+            cls.declarative_meta.create_all(testing.db)

test/orm/inheritance/test_assorted_poly.py

-"""Very old inheritance-related tests.
-
+"""Miscellaneous inheritance-related tests, many very old.
+These are generally tests derived from specific user issues.
 
 """
 
             }
         )
         assert Dude.supervisor.property.direction is MANYTOONE
-        self._dude_roundtrip()
+        self._dude_roundtrip()
+
+
+class Ticket2419Test(fixtures.DeclarativeMappedTest):
+    """Test [ticket:2419]'s test case."""
+
+    @classmethod
+    def setup_classes(cls):
+        Base = cls.DeclarativeBasic
+        class A(Base):
+            __tablename__ = "a"
+
+            id = Column(Integer, primary_key=True)
+
+        class B(Base):
+            __tablename__ = "b"
+
+            id = Column(Integer, primary_key=True)
+            ds = relationship("D")
+            es = relationship("E")
+
+        class C(A):
+            __tablename__ = "c"
+
+            id = Column(Integer, ForeignKey('a.id'), primary_key=True)
+            b_id = Column(Integer, ForeignKey('b.id'))
+            b = relationship("B", primaryjoin=b_id==B.id)
+
+        class D(Base):
+            __tablename__ = "d"
+
+            id = Column(Integer, primary_key=True)
+            b_id = Column(Integer, ForeignKey('b.id'))
+
+        class E(Base):
+            __tablename__ = 'e'
+            id = Column(Integer, primary_key=True)
+            b_id = Column(Integer, ForeignKey('b.id'))
+
+    def test_join_w_eager_w_any(self):
+        A, B, C, D, E = self.classes.A, self.classes.B, \
+                        self.classes.C, self.classes.D, \
+                        self.classes.E
+        s = Session(testing.db)
+
+        b = B(ds=[D()])
+        s.add_all([
+            C(
+                b=b
+            )
+
+        ])
+
+        s.commit()
+
+        q = s.query(B, B.ds.any(D.id==1)).options(joinedload_all("es"))
+        q = q.join(C, C.b_id==B.id)
+        q = q.limit(5)
+        eq_(
+            q.all(),
+            [(b, True)]
+        )

test/sql/test_generative.py

 from sqlalchemy.sql.visitors import *
 from sqlalchemy import util, exc
 from sqlalchemy.sql import util as sql_util
-from test.lib.testing import eq_, assert_raises
+from test.lib.testing import eq_, ne_, assert_raises
 
 class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
     """test ClauseVisitor's traversal, particularly its 
 
     @classmethod
     def setup_class(cls):
-        global t1, t2
+        global t1, t2, t3
         t1 = table("table1",
             column("col1"),
             column("col2"),
             column("col2"),
             column("col3"),
             )
+        t3 = Table('table3', MetaData(), 
+            Column('col1', Integer),
+            Column('col2', Integer)
+        )
 
     def test_binary(self):
         clause = t1.c.col2 == t2.c.col2
             str(f)
         )
 
+    def test_aliased_cloned_column_adapt_inner(self):
+        clause = select([t1.c.col1, func.foo(t1.c.col2).label('foo')])
+
+        aliased1 = select([clause.c.col1, clause.c.foo])
+        aliased2 = clause
+        aliased2.c.col1, aliased2.c.foo
+        aliased3 = cloned_traverse(aliased2, {}, {})
+
+        # fixed by [ticket:2419].   the inside columns
+        # on aliased3 have _is_clone_of pointers to those of
+        # aliased2.  corresponding_column checks these 
+        # now.
+        adapter = sql_util.ColumnAdapter(aliased1)
+        f1 = select([
+            adapter.columns[c]
+            for c in aliased2._raw_columns
+        ])
+        f2 = select([
+            adapter.columns[c]
+            for c in aliased3._raw_columns
+        ])
+        eq_(
+            str(f1), str(f2)
+        )
+
+    def test_aliased_cloned_column_adapt_exported(self):
+        clause = select([t1.c.col1, func.foo(t1.c.col2).label('foo')])
+
+        aliased1 = select([clause.c.col1, clause.c.foo])
+        aliased2 = clause
+        aliased2.c.col1, aliased2.c.foo
+        aliased3 = cloned_traverse(aliased2, {}, {})
+
+        # also fixed by [ticket:2419].  When we look at the
+        # *outside* columns of aliased3, they previously did not 
+        # have an _is_clone_of pointer.   But we now modified _make_proxy
+        # to assign this.
+        adapter = sql_util.ColumnAdapter(aliased1)
+        f1 = select([
+            adapter.columns[c]
+            for c in aliased2.c
+        ])
+        f2 = select([
+            adapter.columns[c]
+            for c in aliased3.c
+        ])
+        eq_(
+            str(f1), str(f2)
+        )
+
+    def test_aliased_cloned_schema_column_adapt_exported(self):
+        clause = select([t3.c.col1, func.foo(t3.c.col2).label('foo')])
+
+        aliased1 = select([clause.c.col1, clause.c.foo])
+        aliased2 = clause
+        aliased2.c.col1, aliased2.c.foo
+        aliased3 = cloned_traverse(aliased2, {}, {})
+
+        # also fixed by [ticket:2419].  When we look at the
+        # *outside* columns of aliased3, they previously did not 
+        # have an _is_clone_of pointer.   But we now modified _make_proxy
+        # to assign this.
+        adapter = sql_util.ColumnAdapter(aliased1)
+        f1 = select([
+            adapter.columns[c]
+            for c in aliased2.c
+        ])
+        f2 = select([
+            adapter.columns[c]
+            for c in aliased3.c
+        ])
+        eq_(
+            str(f1), str(f2)
+        )
 
     def test_text(self):
         clause = text(
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.