Commits

Mike Bayer committed ddc6abc Merge

merge tip

Comments (0)

Files changed (12)

     ultimate name as a name inside the embedded
     UNION. [ticket:2552]
 
+  - [bug] Fixed a regression since 0.6 regarding
+    result-row targeting.   It should be possible
+    to use a select() statement with string
+    based columns in it, that is
+    select(['id', 'name']).select_from('mytable'),
+    and have this statement be targetable by
+    Column objects with those names; this is the
+    mechanism by which
+    query(MyClass).from_statement(some_statement)
+    works.  At some point the specific case of
+    using select(['id']), which is equivalent to
+    select([literal_column('id')]), stopped working
+    here, so this has been re-instated and of
+    course tested. [ticket:2558]
+
 - engine
   - [bug] Fixed bug whereby
     a disconnect detect + dispose that occurs

lib/sqlalchemy/dialects/mssql/base.py

                 self.root_connection._cursor_execute(self.cursor,
                     "SET IDENTITY_INSERT %s ON" %
                     self.dialect.identifier_preparer.format_table(tbl),
-                    ())
+                    (), self)
 
     def post_exec(self):
         """Disable IDENTITY_INSERT if enabled."""
         if self._select_lastrowid:
             if self.dialect.use_scope_identity:
                 conn._cursor_execute(self.cursor,
-                    "SELECT scope_identity() AS lastrowid", ())
+                    "SELECT scope_identity() AS lastrowid", (), self)
             else:
                 conn._cursor_execute(self.cursor,
-                    "SELECT @@identity AS lastrowid", ())
+                    "SELECT @@identity AS lastrowid", (), self)
             # fetchall() ensures the cursor is consumed without closing it
             row = self.cursor.fetchall()[0]
             self._lastrowid = int(row[0])
                         "SET IDENTITY_INSERT %s OFF" %
                             self.dialect.identifier_preparer.
                                 format_table(self.compiled.statement.table),
-                        ()
-                        )
+                        (), self)
 
     def get_lastrowid(self):
         return self._lastrowid

lib/sqlalchemy/engine/base.py

             for fn in self.dispatch.before_cursor_execute:
                 statement, parameters = \
                             fn(self, cursor, statement, parameters,
-                                        context, context.executemany)
+                                        context, 
+                                        context.executemany 
+                                           if context is not None else False)
 
         if self._echo:
             self.engine.logger.info(statement)

lib/sqlalchemy/exc.py

         return ' '.join((SQLAlchemyError.__str__(self),
                          repr(self.statement), repr(params_repr)))
 
+    def __unicode__(self):
+        return self.__str__()
 
 class DBAPIError(StatementError):
     """Raised when the execution of a database operation fails.

lib/sqlalchemy/sql/compiler.py

                         isinstance(column, sql.Function)):
             result_expr = _CompileLabel(col_expr, column.anon_label)
         elif col_expr is not column:
-            result_expr = _CompileLabel(col_expr, column.anon_label)
+            # TODO: are we sure "column" has a .name and .key here ?
+            # assert isinstance(column, sql.ColumnClause)
+            result_expr = _CompileLabel(col_expr,
+                            sql._as_truncated(column.name),
+                            alt_names=(column.key,))
         else:
             result_expr = col_expr
 
                             iswrapper=False, fromhints=None,
                             compound_index=0,
                             positional_names=None, **kwargs):
-
         entry = self.stack and self.stack[-1] or {}
 
         existingfroms = entry.get('from', None)

lib/sqlalchemy/sql/expression.py

         self.is_literal = is_literal
 
     def _compare_name_for_result(self, other):
-        # TODO: this still isn't 100% correct
-        if self.table is not None and hasattr(other, 'proxy_set'):
-            return self.proxy_set.intersection(other.proxy_set)
-        else:
+        if self.is_literal or \
+            self.table is None or \
+            not hasattr(other, 'proxy_set') or (
+            isinstance(other, ColumnClause) and other.is_literal
+        ):
             return super(ColumnClause, self).\
                     _compare_name_for_result(other)
+        else:
+            return other.proxy_set.intersection(self.proxy_set)
 
     def _get_table(self):
         return self.__dict__['table']

test/engine/test_ddlevents.py

+from __future__ import with_statement
 from test.lib.testing import assert_raises, assert_raises_message
 from sqlalchemy.schema import DDL, CheckConstraint, AddConstraint, \
     DropConstraint
         default_from = testing.db.dialect.statement_compiler(
                             testing.db.dialect, None).default_from()
 
-        eq_(
-            testing.db.execute(
-                text("select 'foo%something'" + default_from)
-            ).scalar(),
-            'foo%something'
-        )
+        # We're abusing the DDL()
+        # construct here by pushing a SELECT through it
+        # so that we can verify the round trip.        
+        # the DDL() will trigger autocommit, which prohibits
+        # some DBAPIs from returning results (pyodbc), so we
+        # run in an explicit transaction.   
+        with testing.db.begin() as conn:
+            eq_(
+                conn.execute(
+                    text("select 'foo%something'" + default_from)
+                ).scalar(),
+                'foo%something'
+            )
 
-        eq_(
-            testing.db.execute(
-                DDL("select 'foo%%something'" + default_from)
-            ).scalar(),
-            'foo%something'
-        )
+            eq_(
+                conn.execute(
+                    DDL("select 'foo%%something'" + default_from)
+                ).scalar(),
+                'foo%something'
+            )
 
 
 

test/engine/test_execute.py

 
         for engine in [
             engines.testing_engine(options=dict(implicit_returning=False)),
-            engines.testing_engine(options=dict(implicit_returning=False,
-                                   strategy='threadlocal')),
-            engines.testing_engine(options=dict(implicit_returning=False)).\
-                connect()
+            #engines.testing_engine(options=dict(implicit_returning=False,
+            #                       strategy='threadlocal')),
+            #engines.testing_engine(options=dict(implicit_returning=False)).\
+            #    connect()
             ]:
             event.listen(engine, 'before_execute', execute)
             event.listen(engine, 'before_cursor_execute', cursor_execute)

test/orm/test_query.py

     def test_fulltext(self):
         User = self.classes.User
 
-        assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users order by id").all()
-
-        assert User(id=7) == create_session().query(User).from_statement("select * from users order by id").first()
-        assert None == create_session().query(User).from_statement("select * from users where name='nonexistent'").first()
+        eq_(
+            create_session().query(User).
+            from_statement("select * from users order by id").all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)]
+        )
+
+        eq_(
+            create_session().query(User).
+                from_statement("select * from users order by id").first(),
+            User(id=7)
+        )
+        eq_(
+            create_session().query(User).
+                from_statement(
+                    "select * from users where name='nonexistent'").first(),
+            None
+        )
 
     def test_fragment(self):
         User = self.classes.User
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all()
-
-        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all()
-
-        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all()
+        eq_(
+            create_session().query(User).filter("id in (8, 9)").all(),
+            [User(id=8), User(id=9)]
+
+        )
+
+        eq_(
+            create_session().query(User).filter("name='fred'").
+                filter("id=9").all(),
+            [User(id=9)]
+        )
+        eq_(
+            create_session().query(User).filter("name='fred'").
+                filter(User.id == 9).all(),
+            [User(id=9)]
+        )
 
     def test_binds(self):
         User = self.classes.User
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()
+        eq_(
+            create_session().query(User).filter("id in (:id1, :id2)").\
+                params(id1=8, id2=9).all(),
+            [User(id=8), User(id=9)]
+        )
 
     def test_as_column(self):
         User = self.classes.User
 
         s = create_session()
-        assert_raises(sa_exc.InvalidRequestError, s.query, User.id, text("users.name"))
-
-        eq_(s.query(User.id, "name").order_by(User.id).all(), [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])
+        assert_raises(sa_exc.InvalidRequestError, s.query,
+                    User.id, text("users.name"))
+
+        eq_(s.query(User.id, "name").order_by(User.id).all(),
+                [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])
+
+    def test_via_select(self):
+        User = self.classes.User
+        s = create_session()
+        eq_(
+            s.query(User).from_statement(
+                select(['id', 'name']).select_from('users').order_by('id'),
+            ).all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)]
+        )
 
 class ParentTest(QueryTest, AssertsCompiledSQL):
     __dialect__ = 'default'

test/sql/test_query.py

 
         row = testing.db.execute(select([content.c.type.label("content_type")])).first()
         assert content.c.type in row
+
         assert bar.c.content_type not in row
+
         assert sql.column('content_type') in row
 
         row = testing.db.execute(select([func.now().label("content_type")])).first()
         assert content.c.type not in row
+
         assert bar.c.content_type not in row
+
         assert sql.column('content_type') in row
 
     def test_pickled_rows(self):
             dict(user_id=1, user_name='john'),
             dict(user_id=2, user_name='jack')
         )
-        r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
+        r = testing.db.execute(
+                text("select * from query_users where user_id=2")
+            ).first()
+        self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+        self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+
+    def test_column_accessor_textual_select(self):
+        users.insert().execute(
+            dict(user_id=1, user_name='john'),
+            dict(user_id=2, user_name='jack')
+        )
+        # this will create column() objects inside
+        # the select(), these need to match on name anyway
+        r = testing.db.execute(
+            select(['user_id', 'user_name']).select_from('query_users').
+                where('user_id=2')
+        ).first()
         self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
         self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
 
 
         # test a little sqlite weirdness - with the UNION,
         # cols come back as "query_users.user_id" in cursor.description
-        r = text("select query_users.user_id, query_users.user_name from query_users "
-            "UNION select query_users.user_id, query_users.user_name from query_users",
-            bind=testing.db).execute().first()
+        r = testing.db.execute(
+                text("select query_users.user_id, query_users.user_name from query_users "
+                "UNION select query_users.user_id, query_users.user_name from query_users"
+                )
+            ).first()
         eq_(r['user_id'], 1)
         eq_(r['user_name'], "john")
         eq_(r.keys(), ["user_id", "user_name"])

test/sql/test_type_expressions.py

-from sqlalchemy import Table, Column, String, func, MetaData, select, TypeDecorator
+from sqlalchemy import Table, Column, String, func, MetaData, select, TypeDecorator, cast
 from test.lib import fixtures, AssertsCompiledSQL, testing
 from test.lib.testing import eq_
 
 
         self.assert_compile(
             select([table]),
-            "SELECT test_table.x, lower(test_table.y) AS y_1 FROM test_table"
+            "SELECT test_table.x, lower(test_table.y) AS y FROM test_table"
+        )
+
+    def test_anonymous_expr(self):
+        table = self._fixture()
+        self.assert_compile(
+            select([cast(table.c.y, String)]),
+            "SELECT CAST(test_table.y AS VARCHAR) AS anon_1 FROM test_table"
         )
 
     def test_select_cols_use_labels(self):
         table = self._fixture()
         self.assert_compile(
             select([table]).where(table.c.y == "hi"),
-            "SELECT test_table.x, lower(test_table.y) AS y_1 FROM "
-            "test_table WHERE test_table.y = lower(:y_2)"
+            "SELECT test_table.x, lower(test_table.y) AS y FROM "
+            "test_table WHERE test_table.y = lower(:y_1)"
         )
 
 class RoundTripTestBase(object):
             "Y1"
         )
 
+    def test_targeting_by_string(self):
+        testing.db.execute(
+            self.tables.test_table.insert(),
+            {"x": "X1", "y": "Y1"},
+        )
+        row = testing.db.execute(select([self.tables.test_table])).first()
+        eq_(
+            row["y"],
+            "Y1"
+        )
+
     def test_targeting_apply_labels(self):
         testing.db.execute(
             self.tables.test_table.insert(),

test/sql/test_types.py

         eq_(uni(unicodedata), unicodedata.encode('utf-8'))
 
     # Py3K
-    #@testing.fails_if(
-    #                    lambda: testing.db_spec("postgresql+pg8000")(testing.db),
-    #                    "pg8000 appropriately does not accept 'bytes' for a VARCHAR column."
+    #@testing.skip_if(
+    #                    lambda: testing.db_spec("postgresql")(testing.db),
+    #                    "pg8000 and psycopg2 both have issues here in py3k"
     #                    )
     def test_ignoring_unicode_error(self):
         """checks String(unicode_error='ignore') is passed to underlying codec."""