Commits

Mike Bayer committed 3daebff

- rework of Query._adapt_clause to support [ticket:2155],
be clearer in its intent.
- Fine tuning of Query clause adaptation when
from_self(), union(), or other "select from
myself" operation, such that plain SQL expression
elements added to filter(), order_by() etc.
which are present in the nested "from myself"
query *will* be adapted in the same way an ORM
expression element will, since these
elements are otherwise not easily accessible.
[ticket:2155]

  • Participants
  • Parent commits 3251e30

Comments (0)

Files changed (4)

     fixed up some of the error messages tailored
     in [ticket:2069]
 
+  - Fine tuning of Query clause adaptation when
+    from_self(), union(), or other "select from
+    myself" operation, such that plain SQL expression
+    elements added to filter(), order_by() etc.
+    which are present in the nested "from myself" 
+    query *will* be adapted in the same way an ORM
+    expression element will, since these
+    elements are otherwise not easily accessible.
+    [ticket:2155]
+
   - It is an error to call query.get() when the
     given entity is not a single, full class 
     entity or mapper (i.e. a column).  This is

File lib/sqlalchemy/orm/query.py

 from sqlalchemy.orm.util import (
     AliasedClass, ORMAdapter, _entity_descriptor, _entity_info,
     _is_aliased_class, _is_mapped_class, _orm_columns, _orm_selectable,
-    join as orm_join,with_parent, _attr_as_key
+    join as orm_join,with_parent, _attr_as_key, aliased
     )
 
 
 __all__ = ['Query', 'QueryContext', 'aliased']
 
 
-aliased = AliasedClass
-
 def _generative(*assertions):
     """Mark a method as generative."""
 
             if alias:
                 return alias.adapt_clause(element)
 
-    def __replace_element(self, adapters):
-        def replace(elem):
-            if '_halt_adapt' in elem._annotations:
-                return elem
-
-            for adapter in adapters:
-                e = adapter(elem)
-                if e is not None:
-                    return e
-        return replace
-
-    def __replace_orm_element(self, adapters):
-        def replace(elem):
-            if '_halt_adapt' in elem._annotations:
-                return elem
-
-            if "_orm_adapt" in elem._annotations \
-                    or "parententity" in elem._annotations:
-                for adapter in adapters:
-                    e = adapter(elem)
-                    if e is not None:
-                        return e
-        return replace
-
-    @_generative()
-    def _adapt_all_clauses(self):
-        self._disable_orm_filtering = True
-
     def _adapt_col_list(self, cols):
         return [
                     self._adapt_clause(
                     for o in cols
                 ]
 
+    @_generative()
+    def _adapt_all_clauses(self):
+        self._orm_only_adapt = False
+
     def _adapt_clause(self, clause, as_filter, orm_only):
+        """Adapt incoming clauses to transformations which have been applied 
+        within this query."""
+
         adapters = []
+
+        # do we adapt all expression elements or only those
+        # tagged as 'ORM' constructs ?
+        orm_only = getattr(self, '_orm_only_adapt', orm_only)
+
         if as_filter and self._filter_aliases:
             for fa in self._filter_aliases._visitor_iterator:
-                adapters.append(fa.replace)
+                adapters.append(
+                    (
+                        orm_only, fa.replace
+                    )
+                )
 
         if self._from_obj_alias:
-            adapters.append(self._from_obj_alias.replace)
+            # for the "from obj" alias, apply extra rule to the
+            # 'ORM only' check, if this query were generated from a 
+            # subquery of itself, i.e. _from_selectable(), apply adaption
+            # to all SQL constructs.
+            adapters.append(
+                (
+                    getattr(self, '_orm_only_from_obj_alias', orm_only), 
+                    self._from_obj_alias.replace
+                )
+            )
 
         if self._polymorphic_adapters:
-            adapters.append(self.__adapt_polymorphic_element)
+            adapters.append(
+                (
+                    orm_only, self.__adapt_polymorphic_element
+                )
+            )
 
         if not adapters:
             return clause
 
-        if getattr(self, '_disable_orm_filtering', not orm_only):
-            return visitors.replacement_traverse(
-                                clause, 
-                                {'column_collections':False}, 
-                                self.__replace_element(adapters)
-                            )
-        else:
-            return visitors.replacement_traverse(
-                                clause, 
-                                {'column_collections':False}, 
-                                self.__replace_orm_element(adapters)
-                            )
+        def replace(elem):
+            if '_halt_adapt' in elem._annotations:
+                return elem
+
+            for _orm_only, adapter in adapters:
+                # if 'orm only', look for ORM annotations
+                # in the element before adapting.
+                if not _orm_only or \
+                    '_orm_adapt' in elem._annotations or \
+                    "parententity" in elem._annotations:
+
+                    e = adapter(elem)
+                    if e is not None:
+                        return e
+
+        return visitors.replacement_traverse(
+                            clause, 
+                            {'column_collections':False}, 
+                            replace
+                        )
 
     def _entity_zero(self):
         return self._entities[0]
         ):
             self.__dict__.pop(attr, None)
         self._set_select_from(fromclause)
+
+        # this enables clause adaptation for non-ORM
+        # expressions.
+        self._orm_only_from_obj_alias = False
+
         old_entities = self._entities
         self._entities = []
         for e in old_entities:
         # until reset_joinpoint() is called.
         if need_adapter:
             self._filter_aliases = ORMAdapter(right,
-                        equivalents=right_mapper._equivalent_columns,
+                        equivalents=right_mapper and right_mapper._equivalent_columns or {},
                         chain_to=self._filter_aliases)
 
         # if the onclause is a ClauseElement, adapt it with any 

File test/orm/test_froms.py

 
         )
 
+class ColumnAccessTest(QueryTest, AssertsCompiledSQL):
+    """test access of columns after _from_selectable has been applied"""
+
+    __dialect__ = 'default'
+
+    def test_from_self(self):
+        User = self.classes.User
+        sess = create_session()
+
+        q = sess.query(User).from_self()
+        self.assert_compile(
+            q.filter(User.name=='ed'),
+            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+            "anon_1_users_name FROM (SELECT users.id AS users_id, users.name "
+            "AS users_name FROM users) AS anon_1 WHERE anon_1.users_name = "
+            ":name_1"
+        )
+
+    def test_from_self_twice(self):
+        User = self.classes.User
+        sess = create_session()
+
+        q = sess.query(User).from_self(User.id, User.name).from_self()
+        self.assert_compile(
+            q.filter(User.name=='ed'),
+            "SELECT anon_1.anon_2_users_id AS anon_1_anon_2_users_id, "
+            "anon_1.anon_2_users_name AS anon_1_anon_2_users_name FROM "
+            "(SELECT anon_2.users_id AS anon_2_users_id, anon_2.users_name "
+            "AS anon_2_users_name FROM (SELECT users.id AS users_id, "
+            "users.name AS users_name FROM users) AS anon_2) AS anon_1 "
+            "WHERE anon_1.anon_2_users_name = :name_1"
+        )
+
+    def test_select_from(self):
+        User = self.classes.User
+        sess = create_session()
+
+        q = sess.query(User)
+        q = sess.query(User).select_from(q.statement)
+        self.assert_compile(
+            q.filter(User.name=='ed'),
+            "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name "
+            "FROM (SELECT users.id AS id, users.name AS name FROM "
+            "users) AS anon_1 WHERE anon_1.name = :name_1"
+        )
+
+    def test_anonymous_expression(self):
+        from sqlalchemy.sql import column
+
+        sess = create_session()
+        c1, c2 = column('c1'), column('c2')
+        q1 = sess.query(c1, c2).filter(c1 == 'dog')
+        q2 = sess.query(c1, c2).filter(c1 == 'cat')
+        q3 = q1.union(q2)
+        self.assert_compile(
+            q3.order_by(c1),
+            "SELECT anon_1.c1 AS anon_1_c1, anon_1.c2 "
+            "AS anon_1_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE "
+            "c1 = :c1_1 UNION SELECT c1 AS c1, c2 AS c2 "
+            "WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1"
+        )
+
+    def test_anonymous_expression_from_self_twice(self):
+        from sqlalchemy.sql import column
+
+        sess = create_session()
+        c1, c2 = column('c1'), column('c2')
+        q1 = sess.query(c1, c2).filter(c1 == 'dog')
+        q1 = q1.from_self().from_self()
+        self.assert_compile(
+            q1.order_by(c1),
+            "SELECT anon_1.anon_2_c1 AS anon_1_anon_2_c1, anon_1.anon_2_c2 AS "
+            "anon_1_anon_2_c2 FROM (SELECT anon_2.c1 AS anon_2_c1, anon_2.c2 "
+            "AS anon_2_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE c1 = :c1_1) AS "
+            "anon_2) AS anon_1 ORDER BY anon_1.anon_2_c1"
+        )
+
+    def test_anonymous_expression_union(self):
+        from sqlalchemy.sql import column
+
+        sess = create_session()
+        c1, c2 = column('c1'), column('c2')
+        q1 = sess.query(c1, c2).filter(c1 == 'dog')
+        q2 = sess.query(c1, c2).filter(c1 == 'cat')
+        q3 = q1.union(q2)
+        self.assert_compile(
+            q3.order_by(c1),
+            "SELECT anon_1.c1 AS anon_1_c1, anon_1.c2 "
+            "AS anon_1_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE "
+            "c1 = :c1_1 UNION SELECT c1 AS c1, c2 AS c2 "
+            "WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1"
+        )
+
+    def test_table_anonymous_expression_from_self_twice(self):
+        from sqlalchemy.sql import column, table
+
+        sess = create_session()
+        t1 = table('t1', column('c1'), column('c2'))
+        q1 = sess.query(t1.c.c1, t1.c.c2).filter(t1.c.c1 == 'dog')
+        q1 = q1.from_self().from_self()
+        self.assert_compile(
+            q1.order_by(t1.c.c1),
+            "SELECT anon_1.anon_2_t1_c1 AS anon_1_anon_2_t1_c1, anon_1.anon_2_t1_c2 "
+            "AS anon_1_anon_2_t1_c2 FROM (SELECT anon_2.t1_c1 AS anon_2_t1_c1, "
+            "anon_2.t1_c2 AS anon_2_t1_c2 FROM (SELECT t1.c1 AS t1_c1, t1.c2 "
+            "AS t1_c2 FROM t1 WHERE t1.c1 = :c1_1) AS anon_2) AS anon_1 ORDER BY "
+            "anon_1.anon_2_t1_c1"
+        )
+
+    def test_anonymous_labeled_expression(self):
+        from sqlalchemy.sql import column
+
+        sess = create_session()
+        c1, c2 = column('c1'), column('c2')
+        q1 = sess.query(c1.label('foo'), c2.label('bar')).filter(c1 == 'dog')
+        q2 = sess.query(c1.label('foo'), c2.label('bar')).filter(c1 == 'cat')
+        q3 = q1.union(q2)
+        self.assert_compile(
+            q3.order_by(c1),
+            "SELECT anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar FROM "
+            "(SELECT c1 AS foo, c2 AS bar WHERE c1 = :c1_1 UNION SELECT "
+            "c1 AS foo, c2 AS bar WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.foo"
+        )
+
+    def test_anonymous_expression_plus_aliased_join(self):
+        """test that the 'dont alias non-ORM' rule remains for other 
+        kinds of aliasing when _from_selectable() is used."""
+
+        User = self.classes.User
+        Address = self.classes.Address
+        addresses = self.tables.addresses
+
+        sess = create_session()
+        q1 = sess.query(User.id).filter(User.id > 5)
+        q1 = q1.from_self()
+        q1 = q1.join(User.addresses, aliased=True).\
+                order_by(User.id, Address.id, addresses.c.id)
+        self.assert_compile(
+            q1,
+            "SELECT anon_1.users_id AS anon_1_users_id "
+            "FROM (SELECT users.id AS users_id FROM users "
+            "WHERE users.id > :id_1) AS anon_1 JOIN addresses AS addresses_1 "
+            "ON anon_1.users_id = addresses_1.user_id "
+            "ORDER BY anon_1.users_id, addresses_1.id, addresses.id"
+        )
+
 class AddEntityEquivalenceTest(fixtures.MappedTest, AssertsCompiledSQL):
     run_setup_mappers = 'once'
 

File test/orm/test_query.py

             use_default_dialect=True
         )
 
+    def test_order_by_anonymous_col(self):
+        User = self.classes.User
+
+        s = Session()
+
+        c1, c2 = column('c1'), column('c2')
+        f = c1.label('foo')
+        q1 = s.query(User, f, c2.label('bar'))
+        q2 = s.query(User, c1.label('foo'), c2.label('bar'))
+        q3 = q1.union(q2)
+
+        self.assert_compile(
+            q3.order_by(c1),
+            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+            "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
+            "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
+            "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id "
+            "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
+            "FROM users) AS anon_1 ORDER BY anon_1.foo"
+        )
+
+        self.assert_compile(
+            q3.order_by(f),
+            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+            "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
+            "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
+            "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id "
+            "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
+            "FROM users) AS anon_1 ORDER BY anon_1.foo"
+        )
+
     def test_union_mapped_colnames_preserved_across_subquery(self):
         User = self.classes.User