Commits

Mike Bayer committed 7db029c

- [bug] Fixed a regression since 0.6 regarding
result-row targeting. It should be possible
to use a select() statement with string
based columns in it, that is
select(['id', 'name']).select_from('mytable'),
and have this statement be targetable by
Column objects with those names; this is the
mechanism by which
query(MyClass).from_statement(some_statement)
works. At some point the specific case of
using select(['id']), which is equivalent to
select([literal_column('id')]), stopped working
here, so this has been re-instated and of
course tested. [ticket:2558]

Comments (0)

Files changed (4)

     ultimate name as a name inside the embedded
     UNION. [ticket:2552]
 
+  - [bug] Fixed a regression since 0.6 regarding
+    result-row targeting.   It should be possible
+    to use a select() statement with string
+    based columns in it, that is
+    select(['id', 'name']).select_from('mytable'),
+    and have this statement be targetable by
+    Column objects with those names; this is the
+    mechanism by which
+    query(MyClass).from_statement(some_statement)
+    works.  At some point the specific case of
+    using select(['id']), which is equivalent to
+    select([literal_column('id')]), stopped working
+    here, so this has been re-instated and of
+    course tested. [ticket:2558]
+
 - engine
   - [bug] Fixed bug whereby
     a disconnect detect + dispose that occurs

lib/sqlalchemy/sql/expression.py

         self.is_literal = is_literal
 
     def _compare_name_for_result(self, other):
-        # TODO: this still isn't 100% correct
-        if self.table is not None and hasattr(other, 'proxy_set'):
-            return self.proxy_set.intersection(other.proxy_set)
-        else:
+        if self.is_literal or \
+            self.table is None or \
+            not hasattr(other, 'proxy_set') or (
+            isinstance(other, ColumnClause) and other.is_literal
+        ):
             return super(ColumnClause, self).\
                     _compare_name_for_result(other)
+        else:
+            return other.proxy_set.intersection(self.proxy_set)
 
     def _get_table(self):
         return self.__dict__['table']

test/orm/test_query.py

     def test_fulltext(self):
         User = self.classes.User
 
-        assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users order by id").all()
-
-        assert User(id=7) == create_session().query(User).from_statement("select * from users order by id").first()
-        assert None == create_session().query(User).from_statement("select * from users where name='nonexistent'").first()
+        eq_(
+            create_session().query(User).
+            from_statement("select * from users order by id").all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)]
+        )
+
+        eq_(
+            create_session().query(User).
+                from_statement("select * from users order by id").first(),
+            User(id=7)
+        )
+        eq_(
+            create_session().query(User).
+                from_statement(
+                    "select * from users where name='nonexistent'").first(),
+            None
+        )
 
     def test_fragment(self):
         User = self.classes.User
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all()
-
-        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all()
-
-        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all()
+        eq_(
+            create_session().query(User).filter("id in (8, 9)").all(),
+            [User(id=8), User(id=9)]
+
+        )
+
+        eq_(
+            create_session().query(User).filter("name='fred'").
+                filter("id=9").all(),
+            [User(id=9)]
+        )
+        eq_(
+            create_session().query(User).filter("name='fred'").
+                filter(User.id == 9).all(),
+            [User(id=9)]
+        )
 
     def test_binds(self):
         User = self.classes.User
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()
+        eq_(
+            create_session().query(User).filter("id in (:id1, :id2)").\
+                params(id1=8, id2=9).all(),
+            [User(id=8), User(id=9)]
+        )
 
     def test_as_column(self):
         User = self.classes.User
 
         s = create_session()
-        assert_raises(sa_exc.InvalidRequestError, s.query, User.id, text("users.name"))
-
-        eq_(s.query(User.id, "name").order_by(User.id).all(), [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])
+        assert_raises(sa_exc.InvalidRequestError, s.query,
+                    User.id, text("users.name"))
+
+        eq_(s.query(User.id, "name").order_by(User.id).all(),
+                [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])
+
+    def test_via_select(self):
+        User = self.classes.User
+        s = create_session()
+        eq_(
+            s.query(User).from_statement(
+                select(['id', 'name']).select_from('users').order_by('id'),
+            ).all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)]
+        )
 
 class ParentTest(QueryTest, AssertsCompiledSQL):
     __dialect__ = 'default'

test/sql/test_query.py

 
         row = testing.db.execute(select([content.c.type.label("content_type")])).first()
         assert content.c.type in row
+
         assert bar.c.content_type not in row
+
         assert sql.column('content_type') in row
 
         row = testing.db.execute(select([func.now().label("content_type")])).first()
         assert content.c.type not in row
+
         assert bar.c.content_type not in row
+
         assert sql.column('content_type') in row
 
     def test_pickled_rows(self):
             dict(user_id=1, user_name='john'),
             dict(user_id=2, user_name='jack')
         )
-        r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
+        r = testing.db.execute(
+                text("select * from query_users where user_id=2")
+            ).first()
+        self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+        self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+
+    def test_column_accessor_textual_select(self):
+        users.insert().execute(
+            dict(user_id=1, user_name='john'),
+            dict(user_id=2, user_name='jack')
+        )
+        # this will create column() objects inside
+        # the select(), these need to match on name anyway
+        r = testing.db.execute(
+            select(['user_id', 'user_name']).select_from('query_users').
+                where('user_id=2')
+        ).first()
         self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
         self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
 
 
         # test a little sqlite weirdness - with the UNION,
         # cols come back as "query_users.user_id" in cursor.description
-        r = text("select query_users.user_id, query_users.user_name from query_users "
-            "UNION select query_users.user_id, query_users.user_name from query_users",
-            bind=testing.db).execute().first()
+        r = testing.db.execute(
+                text("select query_users.user_id, query_users.user_name from query_users "
+                "UNION select query_users.user_id, query_users.user_name from query_users"
+                )
+            ).first()
         eq_(r['user_id'], 1)
         eq_(r['user_name'], "john")
         eq_(r.keys(), ["user_id", "user_name"])