Commits

Mike Bayer committed 709cd69

- a "having" clause would be copied from the
inside to the outside query if from_self()
were used; in particular this would break
an 0.7 style count() query [ticket:2130].
(also in 0.6.7)

Comments (0)

Files changed (3)

     distinguish between a None set and no actual 
     change, affects [ticket:2127] as well.
 
+  - a "having" clause would be copied from the
+    inside to the outside query if from_self()
+    were used; in particular this would break
+    an 0.7 style count() query [ticket:2130].
+    (also in 0.6.7)
+
 - sql
   - Restored the "catchall" constructor on the base
     TypeEngine class, with a deprecation warning.

lib/sqlalchemy/orm/query.py

 
     @_generative()
     def _from_selectable(self, fromclause):
-        for attr in ('_statement', '_criterion', '_order_by', '_group_by',
-                '_limit', '_offset', '_joinpath', '_joinpoint', 
-                '_distinct'
+        for attr in (
+                '_statement', '_criterion', 
+                '_order_by', '_group_by',
+                '_limit', '_offset', 
+                '_joinpath', '_joinpoint', 
+                '_distinct', '_having'
         ):
             self.__dict__.pop(attr, None)
         self._set_select_from(fromclause)

test/orm/test_froms.py

 
 
 class FromSelfTest(QueryTest, AssertsCompiledSQL):
+    __dialect__ = 'default'
     def test_filter(self):
         User = self.classes.User
 
+        eq_(
+            [User(id=8), User(id=9)],
+            create_session().
+                query(User).
+                filter(User.id.in_([8,9])).
+                from_self().all()
+        )
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.id.in_([8,9])).from_self().all()
-
-        assert [User(id=8), User(id=9)] == create_session().query(User).order_by(User.id).slice(1,3).from_self().all()
-        assert [User(id=8)] == list(create_session().query(User).filter(User.id.in_([8,9])).from_self().order_by(User.id)[0:1])
+        eq_(
+            [User(id=8), User(id=9)],
+            create_session().query(User).
+                order_by(User.id).slice(1,3).
+                from_self().all()
+        )
+        eq_(
+            [User(id=8)],
+            list(
+                create_session().
+                query(User).
+                filter(User.id.in_([8,9])).
+                from_self().order_by(User.id)[0:1]
+            )
+        )
 
     def test_join(self):
         User, Address = self.classes.User, self.classes.Address
 
-        assert [
+        eq_(
+        [
             (User(id=8), Address(id=2)),
             (User(id=8), Address(id=3)),
             (User(id=8), Address(id=4)),
             (User(id=9), Address(id=5))
-        ] == create_session().query(User).filter(User.id.in_([8,9])).from_self().\
-            join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
+        ],
+            create_session().
+                query(User).
+                filter(User.id.in_([8,9])).
+                from_self().
+                join('addresses').
+                add_entity(Address).
+                order_by(User.id, Address.id).all()
+        )
 
     def test_group_by(self):
         Address = self.classes.Address
 
         eq_(
-            create_session().query(Address.user_id, func.count(Address.id).label('count')).\
-                            group_by(Address.user_id).order_by(Address.user_id).all(),
+            create_session().query(Address.user_id, 
+                            func.count(Address.id).label('count')).\
+                            group_by(Address.user_id).
+                            order_by(Address.user_id).all(),
             [(7, 1), (8, 3), (9, 1)]
         )
 
         eq_(
             create_session().query(Address.user_id, Address.id).\
-                            from_self(Address.user_id, func.count(Address.id)).\
-                            group_by(Address.user_id).order_by(Address.user_id).all(),
+                            from_self(Address.user_id, 
+                                func.count(Address.id)).\
+                            group_by(Address.user_id).
+                            order_by(Address.user_id).all(),
             [(7, 1), (8, 3), (9, 1)]
         )
 
+    def test_having(self):
+        User = self.classes.User
+
+        s = create_session()
+
+        self.assert_compile(
+            s.query(User.id).group_by(User.id).having(User.id>5).
+                    from_self(),
+            "SELECT anon_1.users_id AS anon_1_users_id FROM "
+            "(SELECT users.id AS users_id FROM users GROUP "
+            "BY users.id HAVING users.id > :id_1) AS anon_1"
+        )
+
     def test_no_joinedload(self):
-        """test that joinedloads are pushed outwards and not rendered in subqueries."""
+        """test that joinedloads are pushed outwards and not rendered in
+        subqueries."""
 
         User = self.classes.User
 
 
         s = create_session()
 
-        oracle_as = not testing.against('oracle') and "AS " or ""
-
         self.assert_compile(
-            s.query(User).options(joinedload(User.addresses)).from_self().statement,
-            "SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, addresses_1.user_id, "\
-            "addresses_1.email_address FROM (SELECT users.id AS users_id, users.name AS users_name FROM users) %(oracle_as)sanon_1 "\
-            "LEFT OUTER JOIN addresses %(oracle_as)saddresses_1 ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id" % {
-                'oracle_as':oracle_as
-            }
+            s.query(User).options(joinedload(User.addresses)).
+                from_self().statement,
+            "SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, "
+            "addresses_1.user_id, addresses_1.email_address FROM "
+            "(SELECT users.id AS users_id, users.name AS "
+            "users_name FROM users) AS anon_1 LEFT OUTER JOIN "
+            "addresses AS addresses_1 ON anon_1.users_id = "
+            "addresses_1.user_id ORDER BY addresses_1.id"
         )
 
     def test_aliases(self):
 
         ualias = aliased(User)
         eq_(
-            s.query(User, ualias).filter(User.id > ualias.id).from_self(User.name, ualias.name).
+            s.query(User, ualias).filter(User.id > ualias.id).
+                    from_self(User.name, ualias.name).
                     order_by(User.name, ualias.name).all(),
             [
                 (u'chuck', u'ed'), 
         )
 
         eq_(
-            s.query(User, ualias).filter(User.id > ualias.id).from_self(User.name, ualias.name).filter(ualias.name=='ed')\
+            s.query(User, ualias).
+                    filter(User.id > ualias.id).
+                    from_self(User.name, ualias.name).
+                    filter(ualias.name=='ed')\
                 .order_by(User.name, ualias.name).all(),
             [(u'chuck', u'ed'), (u'fred', u'ed')]
         )
 
         eq_(
-            s.query(User, ualias).filter(User.id > ualias.id).from_self(ualias.name, Address.email_address).
-                    join(ualias.addresses).order_by(ualias.name, Address.email_address).all(),
+            s.query(User, ualias).
+                    filter(User.id > ualias.id).
+                    from_self(ualias.name, Address.email_address).
+                    join(ualias.addresses).
+                    order_by(ualias.name, Address.email_address).all(),
             [
                 (u'ed', u'fred@fred.com'), 
                 (u'jack', u'ed@bettyboop.com'),