1. daviddavis
  2. sqlalchemy

Commits

Mike Bayer  committed 981448a Merge

merge default

  • Participants
  • Parent commits b0a77b7, 82d1db9
  • Branches rel_0_8

Comments (0)

Files changed (16)

File doc/build/changelog/changelog_07.rst

View file
     :released:
 
     .. change::
+        :tags: sql, bug
+        :tickets: 2631
+
+      Fixed bug where using server_onupdate=<FetchedValue|DefaultClause>
+      without passing the "for_update=True" flag would apply the default
+      object to the server_default, blowing away whatever was there.
+      The explicit for_update=True argument shouldn't be needed with this usage
+      (especially since the documentation shows an example without it being
+      used) so it is now arranged internally using a copy of the given default
+      object, if the flag isn't set to what corresponds to that argument.
+
+    .. change::
         :tags: oracle, bug
         :tickets: 2620
 

File doc/build/changelog/changelog_08.rst

View file
     :version: 0.8.0b2
 
     .. change::
+        :tags: sql, bug
+        :tickets: 2631
+
+      Fixed bug where using server_onupdate=<FetchedValue|DefaultClause>
+      without passing the "for_update=True" flag would apply the default
+      object to the server_default, blowing away whatever was there.
+      The explicit for_update=True argument shouldn't be needed with this usage
+      (especially since the documentation shows an example without it being
+      used) so it is now arranged internally using a copy of the given default
+      object, if the flag isn't set to what corresponds to that argument.
+      Also in 0.7.10.
+
+    .. change::
+        :tags: sql, bug
+        :tickets: 2610
+
+      Fixed bug whereby using a label_length on dialect that was smaller
+      than the size of actual column identifiers would fail to render
+      the columns correctly in a SELECT statement.
+
+    .. change::
         :tags: sql, feature
         :tickets: 2623
 

File doc/build/core/schema.rst

View file
         Column('def', String(20), server_onupdate=FetchedValue())
     )
 
+.. versionchanged:: 0.8.0b2,0.7.10
+    The ``for_update`` argument on :class:`.FetchedValue` is set automatically
+    when specified as the ``server_onupdate`` argument.  If using an older version,
+    specify the onupdate above as ``server_onupdate=FetchedValue(for_update=True)``.
+
 These markers do not emit a "default" clause when the table is created,
 however they do set the same internal flags as a static ``server_default``
 clause, providing hints to higher-level tools that a "post-fetch" of these

File lib/sqlalchemy/dialects/mysql/base.py

View file
 
     supports_sane_rowcount = True
     supports_sane_multi_rowcount = False
-    supports_multirow_insert = True
+    supports_multivalues_insert = True
 
     default_paramstyle = 'format'
     colspecs = colspecs

File lib/sqlalchemy/dialects/postgresql/base.py

View file
 
     supports_default_values = True
     supports_empty_insert = False
-    supports_multirow_insert = True
+    supports_multivalues_insert = True
     default_paramstyle = 'pyformat'
     ischema_names = ischema_names
     colspecs = colspecs

File lib/sqlalchemy/dialects/sqlite/base.py

View file
     supports_default_values = True
     supports_empty_insert = False
     supports_cast = True
-    supports_multirow_insert = True
+    supports_multivalues_insert = True
 
     default_paramstyle = 'qmark'
     execution_ctx_cls = SQLiteExecutionContext
                                 self.dbapi.sqlite_version_info >= (3, 3, 8)
             self.supports_cast = \
                                 self.dbapi.sqlite_version_info >= (3, 2, 3)
+            self.supports_multivalues_insert = \
+                                self.dbapi.sqlite_version_info >= (3, 7, 11)
+                                #  http://www.sqlite.org/releaselog/3_7_11.html
 
     _isolation_lookup = {
         'READ UNCOMMITTED': 1,

File lib/sqlalchemy/engine/default.py

View file
     default_paramstyle = 'named'
     supports_default_values = False
     supports_empty_insert = True
-    supports_multirow_insert = False
+    supports_multivalues_insert = False
 
     server_version_info = None
 

File lib/sqlalchemy/schema.py

View file
 
         if self.server_default is not None:
             if isinstance(self.server_default, FetchedValue):
-                args.append(self.server_default)
+                args.append(self.server_default._as_for_update(False))
             else:
                 args.append(DefaultClause(self.server_default))
 
 
         if self.server_onupdate is not None:
             if isinstance(self.server_onupdate, FetchedValue):
-                args.append(self.server_onupdate)
+                args.append(self.server_onupdate._as_for_update(True))
             else:
                 args.append(DefaultClause(self.server_onupdate,
                                             for_update=True))
         c.dispatch._update(self.dispatch)
         return c
 
-    def _make_proxy(self, selectable, name=None, key=None):
+    def _make_proxy(self, selectable, name=None, key=None,
+                            name_is_truncatable=False, **kw):
         """Create a *proxy* for this column.
 
         This is a copy of this ``Column`` referenced by a different parent
                     "been assigned.")
         try:
             c = self._constructor(
-                expression._as_truncated(name or self.name),
+                expression._as_truncated(name or self.name) if \
+                                name_is_truncatable else (name or self.name),
                 self.type,
                 key=key if key else name if name else self.key,
                 primary_key=self.primary_key,
     def __init__(self, for_update=False):
         self.for_update = for_update
 
+    def _as_for_update(self, for_update):
+        if for_update == self.for_update:
+            return self
+        else:
+            return self._clone(for_update)
+
+    def _clone(self, for_update):
+        n = self.__class__.__new__(self.__class__)
+        n.__dict__.update(self.__dict__)
+        n.__dict__.pop('column', None)
+        n.for_update = for_update
+        return n
+
     def _set_parent(self, column):
         self.column = column
         if self.for_update:

File lib/sqlalchemy/sql/compiler.py

View file
                                     self.dialect.name)
 
         if insert_stmt._has_multi_parameters:
-            if not self.dialect.supports_multirow_insert:
+            if not self.dialect.supports_multivalues_insert:
                 raise exc.CompileError("The '%s' dialect with current database "
                                     "version settings does not support "
                                     "in-place multirow inserts." %

File lib/sqlalchemy/sql/expression.py

View file
         return hasattr(other, 'name') and hasattr(self, 'name') and \
                 other.name == self.name
 
-    def _make_proxy(self, selectable, name=None, **kw):
+    def _make_proxy(self, selectable, name=None, name_is_truncatable=False, **kw):
         """Create a new :class:`.ColumnElement` representing this
         :class:`.ColumnElement` as it appears in the select list of a
         descending selectable.
             key = str(self)
         else:
             key = name
-        co = ColumnClause(_as_truncated(name),
+        co = ColumnClause(_as_truncated(name) if name_is_truncatable else name,
                             selectable,
                             type_=getattr(self,
                           'type', None))
                                 _compared_to_type=self.type,
                                 unique=True)
 
-    def _make_proxy(self, selectable, name=None, attach=True, **kw):
+    def _make_proxy(self, selectable, name=None, attach=True,
+                            name_is_truncatable=False, **kw):
         # propagate the "is_literal" flag only if we are keeping our name,
         # otherwise its considered to be a label
         is_literal = self.is_literal and (name is None or name == self.name)
         c = self._constructor(
-                    _as_truncated(name if name else self.name),
+                    _as_truncated(name or self.name) if \
+                                    name_is_truncatable else \
+                                    (name or self.name),
                     selectable=selectable,
                     type_=self.type,
                     is_literal=is_literal
             if hasattr(c, '_make_proxy'):
                 c._make_proxy(self,
                         name=c._label if self.use_labels else None,
-                        key=c._key_label if self.use_labels else None)
+                        key=c._key_label if self.use_labels else None,
+                        name_is_truncatable=True)
 
     def _refresh_for_new_column(self, column):
         for fromclause in self._froms:
                     if our_label not in self.c:
                         return col._make_proxy(self,
                             name=col._label if self.use_labels else None,
-                            key=col._key_label if self.use_labels else None)
+                            key=col._key_label if self.use_labels else None,
+                            name_is_truncatable=True)
                 return None
         return None
 

File lib/sqlalchemy/testing/requirements.py

View file
                 )
 
     @property
-    def multirow_inserts(self):
+    def multivalues_inserts(self):
         """target database must support multiple VALUES clauses in an
         INSERT statement."""
 
         return exclusions.skip_if(
-                    lambda: not self.db.dialect.supports_multirow_insert,
+                    lambda: not self.db.dialect.supports_multivalues_insert,
                     "Backend does not support multirow inserts."
                 )
 

File test/sql/test_compiler.py

View file
                     "INSERT INTO sometable (foo) VALUES (foobar())", params={})
 
     def test_empty_insert_default(self):
-        stmt = table1.insert().values()
+        stmt = table1.insert().values({})  # hide from 2to3
         self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()")
 
     def test_empty_insert_default_values(self):
-        stmt = table1.insert().values()
+        stmt = table1.insert().values({})  # hide from 2to3
         dialect = default.DefaultDialect()
         dialect.supports_empty_insert = dialect.supports_default_values = True
         self.assert_compile(stmt, "INSERT INTO mytable DEFAULT VALUES",
                         dialect=dialect)
 
     def test_empty_insert_not_supported(self):
-        stmt = table1.insert().values()
+        stmt = table1.insert().values({})  # hide from 2to3
         dialect = default.DefaultDialect()
         dialect.supports_empty_insert = dialect.supports_default_values = False
         assert_raises_message(
             stmt.compile, dialect=dialect
         )
 
-    def test_multirow_insert_not_supported(self):
+    def test_multivalues_insert_not_supported(self):
         stmt = table1.insert().values([{"myid": 1}, {"myid": 2}])
         dialect = default.DefaultDialect()
         assert_raises_message(
             stmt.compile, dialect=dialect
         )
 
-    def test_multirow_insert_named(self):
+    def test_multivalues_insert_named(self):
         stmt = table1.insert().\
                     values([{"myid": 1, "name": 'a', "description": 'b'},
                             {"myid": 2, "name": 'c', "description": 'd'},
                  "(:myid_2, :name_2, :description_2)"
 
         dialect = default.DefaultDialect()
-        dialect.supports_multirow_insert = True
+        dialect.supports_multivalues_insert = True
         self.assert_compile(stmt, result,
                 checkparams={
                     'description_2': 'f', 'name_2': 'e',
                 },
                 dialect=dialect)
 
-    def test_multirow_insert_positional(self):
+    def test_multivalues_insert_positional(self):
         stmt = table1.insert().\
                     values([{"myid": 1, "name": 'a', "description": 'b'},
                             {"myid": 2, "name": 'c', "description": 'd'},
                  "(%s, %s, %s)" \
 
         dialect = default.DefaultDialect()
-        dialect.supports_multirow_insert = True
+        dialect.supports_multivalues_insert = True
         dialect.paramstyle = "format"
         dialect.positional = True
         self.assert_compile(stmt, result,

File test/sql/test_labels.py

View file
-from sqlalchemy import *
+
 from sqlalchemy import exc as exceptions
 from sqlalchemy import testing
+from sqlalchemy.testing import engines
+from sqlalchemy import select, MetaData, Integer, or_
 from sqlalchemy.engine import default
 from sqlalchemy.sql import table, column
 from sqlalchemy.testing import assert_raises, eq_
 IDENT_LENGTH = 29
 
 
-class LabelTypeTest(fixtures.TestBase):
-    def test_type(self):
-        m = MetaData()
-        t = Table('sometable', m,
-            Column('col1', Integer),
-            Column('col2', Float))
-        assert isinstance(t.c.col1.label('hi').type, Integer)
-        assert isinstance(select([t.c.col2]).as_scalar().label('lala').type,
-                    Float)
+class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
 
-
-class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
-    run_inserts = 'once'
-    run_deletes = None
-
-    @classmethod
-    def define_tables(cls, metadata):
-        table1 = Table('some_large_named_table', metadata,
-            Column('this_is_the_primarykey_column', Integer,
-                            primary_key=True,
-                            test_needs_autoincrement=True),
-            Column('this_is_the_data_column', String(30))
-            )
-
-        table2 = Table('table_with_exactly_29_characs', metadata,
-            Column('this_is_the_primarykey_column', Integer,
-                            primary_key=True,
-                            test_needs_autoincrement=True),
-            Column('this_is_the_data_column', String(30))
-            )
-        cls.tables.table1 = table1
-        cls.tables.table2 = table2
-
-    @classmethod
-    def insert_data(cls):
-        table1 = cls.tables.table1
-        table2 = cls.tables.table2
-        for data in [
-            {'this_is_the_primarykey_column':1,
-                        'this_is_the_data_column':'data1'},
-            {'this_is_the_primarykey_column':2,
-                        'this_is_the_data_column':'data2'},
-            {'this_is_the_primarykey_column':3,
-                        'this_is_the_data_column':'data3'},
-            {'this_is_the_primarykey_column':4,
-                        'this_is_the_data_column':'data4'}
-        ]:
-            testing.db.execute(
-                table1.insert(),
-                **data
-            )
-        testing.db.execute(
-            table2.insert(),
-            {'this_is_the_primary_key_column': 1,
-            'this_is_the_data_column': 'data'}
+    table1 = table('some_large_named_table',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
         )
 
-    @classmethod
-    def setup_class(cls):
-        super(LongLabelsTest, cls).setup_class()
-        cls.maxlen = testing.db.dialect.max_identifier_length
-        testing.db.dialect.max_identifier_length = IDENT_LENGTH
+    table2 = table('table_with_exactly_29_characs',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
+        )
 
-    @classmethod
-    def teardown_class(cls):
-        testing.db.dialect.max_identifier_length = cls.maxlen
-        super(LongLabelsTest, cls).teardown_class()
+    __dialect__ = 'DefaultDialect'
 
-    def test_too_long_name_disallowed(self):
-        m = MetaData(testing.db)
-        t1 = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
-                        m, Column('foo', Integer))
-        assert_raises(exceptions.IdentifierError, m.create_all)
-        assert_raises(exceptions.IdentifierError, m.drop_all)
-        assert_raises(exceptions.IdentifierError, t1.create)
-        assert_raises(exceptions.IdentifierError, t1.drop)
 
-    def test_basic_result(self):
-        table1 = self.tables.table1
-        s = table1.select(use_labels=True,
-                        order_by=[table1.c.this_is_the_primarykey_column])
+    def _length_fixture(self, length=IDENT_LENGTH, positional=False):
+        dialect = default.DefaultDialect()
+        dialect.max_identifier_length = length
+        if positional:
+            dialect.paramstyle = 'format'
+            dialect.positional = True
+        return dialect
 
-        result = [
-            (row[table1.c.this_is_the_primarykey_column],
-            row[table1.c.this_is_the_data_column])
-            for row in testing.db.execute(s)
-        ]
-        eq_(result, [
-            (1, 'data1'),
-            (2, 'data2'),
-            (3, 'data3'),
-            (4, 'data4'),
-        ])
-
-    def test_result_limit(self):
-        table1 = self.tables.table1
-        # some dialects such as oracle (and possibly ms-sql in a future
-        # version) generate a subquery for limits/offsets. ensure that the
-        # generated result map corresponds to the selected table, not the
-        # select query
-        s = table1.select(use_labels=True,
-                        order_by=[table1.c.this_is_the_primarykey_column]).\
-                        limit(2)
-
-        result = [
-            (row[table1.c.this_is_the_primarykey_column],
-            row[table1.c.this_is_the_data_column])
-            for row in testing.db.execute(s)
-        ]
-        eq_(result, [
-            (1, 'data1'),
-            (2, 'data2'),
-        ])
-
-    @testing.requires.offset
-    def test_result_limit_offset(self):
-        table1 = self.tables.table1
-        s = table1.select(use_labels=True,
-                        order_by=[table1.c.this_is_the_primarykey_column]).\
-                        limit(2).offset(1)
-
-        result = [
-            (row[table1.c.this_is_the_primarykey_column],
-            row[table1.c.this_is_the_data_column])
-            for row in testing.db.execute(s)
-        ]
-        eq_(result, [
-            (2, 'data2'),
-            (3, 'data3'),
-        ])
+    def _engine_fixture(self, length=IDENT_LENGTH):
+        eng = engines.testing_engine()
+        eng.dialect.max_identifier_length = length
+        return eng
 
     def test_table_alias_1(self):
-        table2 = self.tables.table2
-        if testing.against('oracle'):
-            self.assert_compile(table2.alias().select(),
-                'SELECT '
-                    'table_with_exactly_29_c_1.'
-                    'this_is_the_primarykey_column, '
-                    'table_with_exactly_29_c_1.this_is_the_data_column '
-                'FROM '
-                    'table_with_exactly_29_characs '
-                'table_with_exactly_29_c_1'
-            )
-        else:
-            self.assert_compile(
-                table2.alias().select(),
-                'SELECT '
-                    'table_with_exactly_29_c_1.'
-                    'this_is_the_primarykey_column, '
-                    'table_with_exactly_29_c_1.this_is_the_data_column '
-                'FROM '
-                    'table_with_exactly_29_characs '
-                'AS table_with_exactly_29_c_1'
-            )
+        self.assert_compile(
+            self.table2.alias().select(),
+            'SELECT '
+                'table_with_exactly_29_c_1.'
+                'this_is_the_primarykey_column, '
+                'table_with_exactly_29_c_1.this_is_the_data_column '
+            'FROM '
+                'table_with_exactly_29_characs '
+            'AS table_with_exactly_29_c_1',
+            dialect=self._length_fixture()
+        )
 
     def test_table_alias_2(self):
-        table1 = self.tables.table1
-        table2 = self.tables.table2
+        table1 = self.table1
+        table2 = self.table2
         ta = table2.alias()
-        dialect = default.DefaultDialect()
-        dialect.max_identifier_length = IDENT_LENGTH
         on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column
         self.assert_compile(
             select([table1, ta]).select_from(table1.join(ta, on)).\
             'WHERE '
                 'table_with_exactly_29_c_1.this_is_the_data_column = '
                     ':this_is_the_data_column_1',
-            dialect=dialect
+            dialect=self._length_fixture()
         )
 
-    def test_table_alias_3(self):
-        table2 = self.tables.table2
-        eq_(
-            testing.db.execute(table2.alias().select()).first(),
-            (1, 'data')
-        )
+    def test_too_long_name_disallowed(self):
+        m = MetaData()
+        t = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
+                        m, Column('foo', Integer))
+        eng = self._engine_fixture()
+        for meth in (
+                    t.create,
+                    t.drop,
+                    m.create_all,
+                    m.drop_all
+                ):
+            assert_raises(
+                    exceptions.IdentifierError,
+                    meth, eng
+                )
 
-    def test_colbinds(self):
-        table1 = self.tables.table1
-        r = table1.select(table1.c.this_is_the_primarykey_column == 4).\
-                    execute()
-        assert r.fetchall() == [(4, 'data4')]
+    def _assert_labeled_table1_select(self, s):
+        table1 = self.table1
+        compiled = s.compile(dialect=self._length_fixture())
 
-        r = table1.select(or_(
-            table1.c.this_is_the_primarykey_column == 4,
-            table1.c.this_is_the_primarykey_column == 2
-        )).execute()
-        assert r.fetchall() == [(2, 'data2'), (4, 'data4')]
+        assert set(compiled.result_map['some_large_named_table__2'][1]).\
+                issuperset(
+                    [
+                        'some_large_named_table_this_is_the_data_column',
+                        'some_large_named_table__2',
+                        table1.c.this_is_the_data_column
+                    ]
+                )
 
-    @testing.provide_metadata
-    def test_insert_no_pk(self):
-        t = Table('some_other_large_named_table', self.metadata,
-            Column('this_is_the_primarykey_column', Integer,
-                            Sequence('this_is_some_large_seq'),
-                            primary_key=True),
-            Column('this_is_the_data_column', String(30))
-            )
-        t.create(testing.db, checkfirst=True)
-        testing.db.execute(t.insert(),
-                **{'this_is_the_data_column': 'data1'})
+        assert set(compiled.result_map['some_large_named_table__1'][1]).\
+                issuperset(
+                    [
+                        'some_large_named_table_this_is_the_primarykey_column',
+                        'some_large_named_table__1',
+                        table1.c.this_is_the_primarykey_column
+                    ]
+                )
 
-    @testing.requires.subqueries
-    def test_subquery(self):
-        table1 = self.tables.table1
-        q = table1.select(table1.c.this_is_the_primarykey_column == 4).\
-                        alias('foo')
-        eq_(
-            list(testing.db.execute(select([q]))),
-            [(4, u'data4')]
-        )
+    def test_result_map_use_labels(self):
+        table1 = self.table1
+        s = table1.select().apply_labels().\
+                order_by(table1.c.this_is_the_primarykey_column)
 
-    @testing.requires.subqueries
-    def test_anon_alias(self):
-        table1 = self.tables.table1
-        compile_dialect = default.DefaultDialect()
-        compile_dialect.max_identifier_length = IDENT_LENGTH
+        self._assert_labeled_table1_select(s)
+
+    def test_result_map_limit(self):
+        table1 = self.table1
+        # some dialects such as oracle (and possibly ms-sql in a future
+        # version) generate a subquery for limits/offsets. ensure that the
+        # generated result map corresponds to the selected table, not the
+        # select query
+        s = table1.select(use_labels=True,
+                        order_by=[table1.c.this_is_the_primarykey_column]).\
+                        limit(2)
+        self._assert_labeled_table1_select(s)
+
+    def test_result_map_subquery(self):
+        table1 = self.table1
+        s = table1.select(
+                    table1.c.this_is_the_primarykey_column == 4).\
+                    alias('foo')
+        s2 = select([s])
+        compiled = s2.compile(dialect=self._length_fixture())
+        assert \
+            set(compiled.result_map['this_is_the_data_column'][1]).\
+            issuperset(['this_is_the_data_column',
+                    s.c.this_is_the_data_column])
+        assert \
+            set(compiled.result_map['this_is_the_primarykey_column'][1]).\
+            issuperset(['this_is_the_primarykey_column',
+                    s.c.this_is_the_primarykey_column])
+
+    def test_result_map_anon_alias(self):
+        table1 = self.table1
+        dialect = self._length_fixture()
 
         q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
-        x = select([q], use_labels=True)
+        s = select([q]).apply_labels()
 
-        self.assert_compile(x,
+        self.assert_compile(s,
             'SELECT '
                 'anon_1.this_is_the_primarykey_column '
                     'AS anon_1_this_is_the_prim_1, '
                         '= :this_is_the_primarykey__1'
                 ') '
             'AS anon_1',
+            dialect=dialect)
+        compiled = s.compile(dialect=dialect)
+        assert set(compiled.result_map['anon_1_this_is_the_data_2'][1]).\
+                issuperset([
+                        'anon_1_this_is_the_data_2',
+                        q.corresponding_column(
+                                table1.c.this_is_the_data_column)
+                    ])
+
+        assert set(compiled.result_map['anon_1_this_is_the_prim_1'][1]).\
+                issuperset([
+                        'anon_1_this_is_the_prim_1',
+                        q.corresponding_column(
+                                table1.c.this_is_the_primarykey_column)
+                    ])
+
+    def test_column_bind_labels_1(self):
+        table1 = self.table1
+
+        s = table1.select(table1.c.this_is_the_primarykey_column == 4)
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            ":this_is_the_primarykey__1",
+            checkparams={'this_is_the_primarykey__1': 4},
+            dialect=self._length_fixture()
+        )
+
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            "%s",
+            checkpositional=(4, ),
+            checkparams={'this_is_the_primarykey__1': 4},
+            dialect=self._length_fixture(positional=True)
+        )
+
+    def test_column_bind_labels_2(self):
+        table1 = self.table1
+
+        s = table1.select(or_(
+            table1.c.this_is_the_primarykey_column == 4,
+            table1.c.this_is_the_primarykey_column == 2
+        ))
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            ":this_is_the_primarykey__1 OR "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            ":this_is_the_primarykey__2",
+            checkparams={
+                'this_is_the_primarykey__1': 4,
+                'this_is_the_primarykey__2': 2
+            },
+            dialect=self._length_fixture()
+        )
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            "%s OR "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            "%s",
+            checkparams={
+                'this_is_the_primarykey__1': 4,
+                'this_is_the_primarykey__2': 2
+            },
+            checkpositional=(4, 2),
+            dialect=self._length_fixture(positional=True)
+        )
+
+class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
+
+    table1 = table('some_large_named_table',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
+        )
+
+    table2 = table('table_with_exactly_29_characs',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
+        )
+
+    __dialect__ = 'DefaultDialect'
+
+    def test_adjustable_1(self):
+        table1 = self.table1
+        q = table1.select(
+            table1.c.this_is_the_primarykey_column == 4).alias('foo')
+        x = select([q])
+        compile_dialect = default.DefaultDialect(label_length=10)
+        self.assert_compile(x,
+            'SELECT '
+                'foo.this_1, foo.this_2 '
+            'FROM ('
+                'SELECT '
+                    'some_large_named_table.this_is_the_primarykey_column '
+                        'AS this_1, '
+                    'some_large_named_table.this_is_the_data_column '
+                        'AS this_2 '
+                'FROM '
+                    'some_large_named_table '
+                'WHERE '
+                    'some_large_named_table.this_is_the_primarykey_column '
+                        '= :this_1'
+                ') '
+            'AS foo',
             dialect=compile_dialect)
 
-        eq_(
-            list(testing.db.execute(x)),
-            [(4, u'data4')]
-        )
-
-    def test_adjustable(self):
-        table1 = self.tables.table1
+    def test_adjustable_2(self):
+        table1 = self.table1
 
         q = table1.select(
             table1.c.this_is_the_primarykey_column == 4).alias('foo')
             'AS foo',
             dialect=compile_dialect)
 
+    def test_adjustable_3(self):
+        table1 = self.table1
+
         compile_dialect = default.DefaultDialect(label_length=4)
+        q = table1.select(
+            table1.c.this_is_the_primarykey_column == 4).alias('foo')
+        x = select([q])
+
         self.assert_compile(x,
             'SELECT '
                 'foo._1, foo._2 '
             'AS foo',
         dialect=compile_dialect)
 
+    def test_adjustable_4(self):
+        table1 = self.table1
+
         q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
         x = select([q], use_labels=True)
 
             'AS anon_1',
             dialect=compile_dialect)
 
+    def test_adjustable_5(self):
+        table1 = self.table1
+        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+        x = select([q], use_labels=True)
+
         compile_dialect = default.DefaultDialect(label_length=4)
         self.assert_compile(x,
             'SELECT '
             'AS _1',
             dialect=compile_dialect)
 
-    def test_adjustable_result_schema_column(self):
-        table1 = self.tables.table1
+
+    def test_adjustable_result_schema_column_1(self):
+        table1 = self.table1
+        q = table1.select(
+            table1.c.this_is_the_primarykey_column == 4).apply_labels().\
+                alias('foo')
+        dialect = default.DefaultDialect(label_length=10)
+
+        compiled = q.compile(dialect=dialect)
+        assert set(compiled.result_map['some_2'][1]).issuperset([
+                    table1.c.this_is_the_data_column,
+                    'some_large_named_table_this_is_the_data_column',
+                    'some_2'
+
+                ])
+        assert set(compiled.result_map['some_1'][1]).issuperset([
+                    table1.c.this_is_the_primarykey_column,
+                    'some_large_named_table_this_is_the_primarykey_column',
+                    'some_1'
+
+                ])
+
+    def test_adjustable_result_schema_column_2(self):
+        table1 = self.table1
 
         q = table1.select(
             table1.c.this_is_the_primarykey_column == 4).alias('foo')
         x = select([q])
 
-        e = testing_engine(options={'label_length': 10})
-        e.pool = testing.db.pool
-        row = e.execute(x).first()
-        eq_(row.this_is_the_primarykey_column, 4)
-        eq_(row.this_1, 4)
-        eq_(row['this_1'], 4)
+        dialect = default.DefaultDialect(label_length=10)
 
-        q = table1.select(
-            table1.c.this_is_the_primarykey_column == 4).alias('foo')
-        row = e.execute(x).first()
-        eq_(row.this_is_the_primarykey_column, 4)
-        eq_(row.this_1, 4)
+        compiled = x.compile(dialect=dialect)
+        assert set(compiled.result_map['this_2'][1]).issuperset([
+                    q.corresponding_column(table1.c.this_is_the_data_column),
+                    'this_is_the_data_column',
+                    'this_2'
 
-    def test_adjustable_result_lightweight_column(self):
+                ])
+        assert set(compiled.result_map['this_1'][1]).issuperset([
+                    q.corresponding_column(table1.c.this_is_the_primarykey_column),
+                    'this_is_the_primarykey_column',
+                    'this_1'
 
-        table1 = table('some_large_named_table',
-            column('this_is_the_primarykey_column'),
-            column('this_is_the_data_column')
+                ])
+
+    def test_table_plus_column_exceeds_length(self):
+        """test that the truncation only occurs when tablename + colname are
+        concatenated, if they are individually under the label length.
+
+        """
+
+        compile_dialect = default.DefaultDialect(label_length=30)
+        a_table = table(
+            'thirty_characters_table_xxxxxx',
+            column('id')
         )
 
-        q = table1.select(
-            table1.c.this_is_the_primarykey_column == 4).alias('foo')
-        x = select([q])
-
-        e = testing_engine(options={'label_length': 10})
-        e.pool = testing.db.pool
-        row = e.execute(x).first()
-        eq_(row.this_is_the_primarykey_column, 4)
-        eq_(row.this_1, 4)
-
-    def test_table_plus_column_exceeds_length(self):
-        '''test that the truncation occurs if tablename / colname are only
-        greater than the max when concatenated.'''
-
-        compile_dialect = default.DefaultDialect(label_length=30)
-        m = MetaData()
-        a_table = Table(
-            'thirty_characters_table_xxxxxx',
-            m,
-            Column('id', Integer, primary_key=True)
-        )
-
-        other_table = Table(
+        other_table = table(
             'other_thirty_characters_table_',
-            m,
-            Column('id', Integer, primary_key=True),
-            Column('thirty_characters_table_id',
-                Integer,
-                ForeignKey('thirty_characters_table_xxxxxx.id'),
-                primary_key=True
-            )
+            column('id'),
+            column('thirty_characters_table_id')
         )
 
         anon = a_table.alias()
+
+        j1 = other_table.outerjoin(anon,
+                anon.c.id == other_table.c.thirty_characters_table_id)
+
         self.assert_compile(
             select([other_table, anon]).
-                    select_from(other_table.outerjoin(anon)).apply_labels(),
+                    select_from(j1).apply_labels(),
             'SELECT '
                 'other_thirty_characters_table_.id '
                     'AS other_thirty_characters__1, '
                 'other_thirty_characters_table_.thirty_characters_table_id',
             dialect=compile_dialect)
 
+
+    def test_colnames_longer_than_labels_lowercase(self):
+        t1 = table('a', column('abcde'))
+        self._test_colnames_longer_than_labels(t1)
+
+    def test_colnames_longer_than_labels_uppercase(self):
+        m = MetaData()
+        t1 = Table('a', m, Column('abcde', Integer))
+        self._test_colnames_longer_than_labels(t1)
+
+    def _test_colnames_longer_than_labels(self, t1):
+        dialect = default.DefaultDialect(label_length=4)
+        a1 = t1.alias(name='asdf')
+
+        # 'abcde' is longer than 4, but rendered as itself
+        # needs to have all characters
+        s = select([a1])
         self.assert_compile(
-            select([other_table, anon]).
-                select_from(other_table.outerjoin(anon)).apply_labels(),
-            'SELECT '
-                'other_thirty_characters_table_.id '
-                    'AS other_thirty_characters__1, '
-                'other_thirty_characters_table_.thirty_characters_table_id '
-                    'AS other_thirty_characters__2, '
-                'thirty_characters_table__1.id '
-                    'AS thirty_characters_table__3 '
-            'FROM '
-                'other_thirty_characters_table_ '
-            'LEFT OUTER JOIN '
-                'thirty_characters_table_xxxxxx AS thirty_characters_table__1 '
-            'ON thirty_characters_table__1.id = '
-                'other_thirty_characters_table_.thirty_characters_table_id',
-            dialect=compile_dialect
+            select([a1]),
+            "SELECT asdf.abcde FROM a AS asdf",
+            dialect=dialect
         )
+        compiled = s.compile(dialect=dialect)
+        assert set(compiled.result_map['abcde'][1]).issuperset([
+                    'abcde',
+                    a1.c.abcde,
+                    'abcde'
+                ])
+
+        # column still there, but short label
+        s = select([a1]).apply_labels()
+        self.assert_compile(
+            s,
+            "SELECT asdf.abcde AS _1 FROM a AS asdf",
+            dialect=dialect
+        )
+        compiled = s.compile(dialect=dialect)
+        assert set(compiled.result_map['_1'][1]).issuperset([
+                    'asdf_abcde',
+                    a1.c.abcde,
+                    '_1'
+                ])
+
+

File test/sql/test_metadata.py

View file
         assert c.server_default is target
         assert target.column is c
 
+    def test_onupdate_default_not_server_default_one(self):
+        target1 = schema.DefaultClause('y')
+        target2 = schema.DefaultClause('z')
+
+        c = self._fixture(server_default=target1, server_onupdate=target2)
+        eq_(c.server_default.arg, 'y')
+        eq_(c.server_onupdate.arg, 'z')
+
+    def test_onupdate_default_not_server_default_two(self):
+        target1 = schema.DefaultClause('y', for_update=True)
+        target2 = schema.DefaultClause('z', for_update=True)
+
+        c = self._fixture(server_default=target1, server_onupdate=target2)
+        eq_(c.server_default.arg, 'y')
+        eq_(c.server_onupdate.arg, 'z')
+
+    def test_onupdate_default_not_server_default_three(self):
+        target1 = schema.DefaultClause('y', for_update=False)
+        target2 = schema.DefaultClause('z', for_update=True)
+
+        c = self._fixture(target1, target2)
+        eq_(c.server_default.arg, 'y')
+        eq_(c.server_onupdate.arg, 'z')
+
+    def test_onupdate_default_not_server_default_four(self):
+        target1 = schema.DefaultClause('y', for_update=False)
+
+        c = self._fixture(server_onupdate=target1)
+        is_(c.server_default, None)
+        eq_(c.server_onupdate.arg, 'y')
+
     def test_server_default_keyword_as_schemaitem(self):
         target = schema.DefaultClause('y')
         c = self._fixture(server_default=target)

File test/sql/test_query.py

View file
     def teardown_class(cls):
         metadata.drop_all()
 
-    @testing.requires.multirow_inserts
-    def test_multirow_insert(self):
+    @testing.requires.multivalues_inserts
+    def test_multivalues_insert(self):
         users.insert(values=[{'user_id':7, 'user_name':'jack'},
             {'user_id':8, 'user_name':'ed'}]).execute()
         rows = users.select().execute().fetchall()

File test/sql/test_returning.py

View file
             eq_(result2.fetchall(), [(2, 2, False, None), (3, 3, True, None)])
 
 
-    @testing.requires.multirow_inserts
+    @testing.requires.multivalues_inserts
     def test_multirow_returning(self):
         ins = table.insert().returning(table.c.id, table.c.persons).values(
                             [