Commits

Mike Bayer committed e485bf8

- refactor test_labels into all compiler/defaultdialect tests

Comments (0)

Files changed (1)

test/sql/test_labels.py

-from sqlalchemy import *
+
 from sqlalchemy import exc as exceptions
 from sqlalchemy import testing
+from sqlalchemy.testing import engines
+from sqlalchemy import select, MetaData, Integer, or_
 from sqlalchemy.engine import default
 from sqlalchemy.sql import table, column
 from sqlalchemy.testing import assert_raises, eq_
 IDENT_LENGTH = 29
 
 
-class LabelTypeTest(fixtures.TestBase):
-    def test_type(self):
-        m = MetaData()
-        t = Table('sometable', m,
-            Column('col1', Integer),
-            Column('col2', Float))
-        assert isinstance(t.c.col1.label('hi').type, Integer)
-        assert isinstance(select([t.c.col2]).as_scalar().label('lala').type,
-                    Float)
+class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
 
-
-class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
-    run_inserts = 'once'
-    run_deletes = None
-
-    @classmethod
-    def define_tables(cls, metadata):
-        table1 = Table('some_large_named_table', metadata,
-            Column('this_is_the_primarykey_column', Integer,
-                            primary_key=True,
-                            test_needs_autoincrement=True),
-            Column('this_is_the_data_column', String(30))
-            )
-
-        table2 = Table('table_with_exactly_29_characs', metadata,
-            Column('this_is_the_primarykey_column', Integer,
-                            primary_key=True,
-                            test_needs_autoincrement=True),
-            Column('this_is_the_data_column', String(30))
-            )
-        cls.tables.table1 = table1
-        cls.tables.table2 = table2
-
-    @classmethod
-    def insert_data(cls):
-        table1 = cls.tables.table1
-        table2 = cls.tables.table2
-        for data in [
-            {'this_is_the_primarykey_column':1,
-                        'this_is_the_data_column':'data1'},
-            {'this_is_the_primarykey_column':2,
-                        'this_is_the_data_column':'data2'},
-            {'this_is_the_primarykey_column':3,
-                        'this_is_the_data_column':'data3'},
-            {'this_is_the_primarykey_column':4,
-                        'this_is_the_data_column':'data4'}
-        ]:
-            testing.db.execute(
-                table1.insert(),
-                **data
-            )
-        testing.db.execute(
-            table2.insert(),
-            {'this_is_the_primary_key_column': 1,
-            'this_is_the_data_column': 'data'}
+    table1 = table('some_large_named_table',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
         )
 
-    @classmethod
-    def setup_class(cls):
-        super(LongLabelsTest, cls).setup_class()
-        cls.maxlen = testing.db.dialect.max_identifier_length
-        testing.db.dialect.max_identifier_length = IDENT_LENGTH
+    table2 = table('table_with_exactly_29_characs',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
+        )
 
-    @classmethod
-    def teardown_class(cls):
-        testing.db.dialect.max_identifier_length = cls.maxlen
-        super(LongLabelsTest, cls).teardown_class()
+    __dialect__ = 'DefaultDialect'
 
-    def test_too_long_name_disallowed(self):
-        m = MetaData(testing.db)
-        t1 = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
-                        m, Column('foo', Integer))
-        assert_raises(exceptions.IdentifierError, m.create_all)
-        assert_raises(exceptions.IdentifierError, m.drop_all)
-        assert_raises(exceptions.IdentifierError, t1.create)
-        assert_raises(exceptions.IdentifierError, t1.drop)
 
-    def test_basic_result(self):
-        table1 = self.tables.table1
-        s = table1.select(use_labels=True,
-                        order_by=[table1.c.this_is_the_primarykey_column])
+    def _length_fixture(self, length=IDENT_LENGTH, positional=False):
+        dialect = default.DefaultDialect()
+        dialect.max_identifier_length = length
+        if positional:
+            dialect.paramstyle = 'format'
+            dialect.positional = True
+        return dialect
 
-        result = [
-            (row[table1.c.this_is_the_primarykey_column],
-            row[table1.c.this_is_the_data_column])
-            for row in testing.db.execute(s)
-        ]
-        eq_(result, [
-            (1, 'data1'),
-            (2, 'data2'),
-            (3, 'data3'),
-            (4, 'data4'),
-        ])
-
-    def test_result_limit(self):
-        table1 = self.tables.table1
-        # some dialects such as oracle (and possibly ms-sql in a future
-        # version) generate a subquery for limits/offsets. ensure that the
-        # generated result map corresponds to the selected table, not the
-        # select query
-        s = table1.select(use_labels=True,
-                        order_by=[table1.c.this_is_the_primarykey_column]).\
-                        limit(2)
-
-        result = [
-            (row[table1.c.this_is_the_primarykey_column],
-            row[table1.c.this_is_the_data_column])
-            for row in testing.db.execute(s)
-        ]
-        eq_(result, [
-            (1, 'data1'),
-            (2, 'data2'),
-        ])
-
-    @testing.requires.offset
-    def test_result_limit_offset(self):
-        table1 = self.tables.table1
-        s = table1.select(use_labels=True,
-                        order_by=[table1.c.this_is_the_primarykey_column]).\
-                        limit(2).offset(1)
-
-        result = [
-            (row[table1.c.this_is_the_primarykey_column],
-            row[table1.c.this_is_the_data_column])
-            for row in testing.db.execute(s)
-        ]
-        eq_(result, [
-            (2, 'data2'),
-            (3, 'data3'),
-        ])
+    def _engine_fixture(self, length=IDENT_LENGTH):
+        eng = engines.testing_engine()
+        eng.dialect.max_identifier_length = length
+        return eng
 
     def test_table_alias_1(self):
-        table2 = self.tables.table2
-        if testing.against('oracle'):
-            self.assert_compile(table2.alias().select(),
-                'SELECT '
-                    'table_with_exactly_29_c_1.'
-                    'this_is_the_primarykey_column, '
-                    'table_with_exactly_29_c_1.this_is_the_data_column '
-                'FROM '
-                    'table_with_exactly_29_characs '
-                'table_with_exactly_29_c_1'
-            )
-        else:
-            self.assert_compile(
-                table2.alias().select(),
-                'SELECT '
-                    'table_with_exactly_29_c_1.'
-                    'this_is_the_primarykey_column, '
-                    'table_with_exactly_29_c_1.this_is_the_data_column '
-                'FROM '
-                    'table_with_exactly_29_characs '
-                'AS table_with_exactly_29_c_1'
-            )
+        self.assert_compile(
+            self.table2.alias().select(),
+            'SELECT '
+                'table_with_exactly_29_c_1.'
+                'this_is_the_primarykey_column, '
+                'table_with_exactly_29_c_1.this_is_the_data_column '
+            'FROM '
+                'table_with_exactly_29_characs '
+            'AS table_with_exactly_29_c_1',
+            dialect=self._length_fixture()
+        )
 
     def test_table_alias_2(self):
-        table1 = self.tables.table1
-        table2 = self.tables.table2
+        table1 = self.table1
+        table2 = self.table2
         ta = table2.alias()
-        dialect = default.DefaultDialect()
-        dialect.max_identifier_length = IDENT_LENGTH
         on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column
         self.assert_compile(
             select([table1, ta]).select_from(table1.join(ta, on)).\
             'WHERE '
                 'table_with_exactly_29_c_1.this_is_the_data_column = '
                     ':this_is_the_data_column_1',
-            dialect=dialect
+            dialect=self._length_fixture()
         )
 
-    def test_table_alias_3(self):
-        table2 = self.tables.table2
-        eq_(
-            testing.db.execute(table2.alias().select()).first(),
-            (1, 'data')
-        )
+    def test_too_long_name_disallowed(self):
+        m = MetaData()
+        t = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
+                        m, Column('foo', Integer))
+        eng = self._engine_fixture()
+        for meth in (
+                    t.create,
+                    t.drop,
+                    m.create_all,
+                    m.drop_all
+                ):
+            assert_raises(
+                    exceptions.IdentifierError,
+                    meth, eng
+                )
 
-    def test_colbinds(self):
-        table1 = self.tables.table1
-        r = table1.select(table1.c.this_is_the_primarykey_column == 4).\
-                    execute()
-        assert r.fetchall() == [(4, 'data4')]
+    def _assert_labeled_table1_select(self, s):
+        table1 = self.table1
+        compiled = s.compile(dialect=self._length_fixture())
 
-        r = table1.select(or_(
-            table1.c.this_is_the_primarykey_column == 4,
-            table1.c.this_is_the_primarykey_column == 2
-        )).execute()
-        assert r.fetchall() == [(2, 'data2'), (4, 'data4')]
+        assert set(compiled.result_map['some_large_named_table__2'][1]).\
+                issuperset(
+                    [
+                        'some_large_named_table_this_is_the_data_column',
+                        'some_large_named_table__2',
+                        table1.c.this_is_the_data_column
+                    ]
+                )
 
-    @testing.provide_metadata
-    def test_insert_no_pk(self):
-        t = Table('some_other_large_named_table', self.metadata,
-            Column('this_is_the_primarykey_column', Integer,
-                            Sequence('this_is_some_large_seq'),
-                            primary_key=True),
-            Column('this_is_the_data_column', String(30))
-            )
-        t.create(testing.db, checkfirst=True)
-        testing.db.execute(t.insert(),
-                **{'this_is_the_data_column': 'data1'})
+        assert set(compiled.result_map['some_large_named_table__1'][1]).\
+                issuperset(
+                    [
+                        'some_large_named_table_this_is_the_primarykey_column',
+                        'some_large_named_table__1',
+                        table1.c.this_is_the_primarykey_column
+                    ]
+                )
 
-    @testing.requires.subqueries
-    def test_subquery(self):
-        table1 = self.tables.table1
-        q = table1.select(table1.c.this_is_the_primarykey_column == 4).\
-                        alias('foo')
-        eq_(
-            list(testing.db.execute(select([q]))),
-            [(4, u'data4')]
-        )
+    def test_result_map_use_labels(self):
+        table1 = self.table1
+        s = table1.select().apply_labels().\
+                order_by(table1.c.this_is_the_primarykey_column)
 
-    @testing.requires.subqueries
-    def test_anon_alias(self):
-        table1 = self.tables.table1
-        compile_dialect = default.DefaultDialect()
-        compile_dialect.max_identifier_length = IDENT_LENGTH
+        self._assert_labeled_table1_select(s)
+
+    def test_result_map_limit(self):
+        table1 = self.table1
+        # some dialects such as oracle (and possibly ms-sql in a future
+        # version) generate a subquery for limits/offsets. ensure that the
+        # generated result map corresponds to the selected table, not the
+        # select query
+        s = table1.select(use_labels=True,
+                        order_by=[table1.c.this_is_the_primarykey_column]).\
+                        limit(2)
+        self._assert_labeled_table1_select(s)
+
+    def test_result_map_subquery(self):
+        table1 = self.table1
+        s = table1.select(
+                    table1.c.this_is_the_primarykey_column == 4).\
+                    alias('foo')
+        s2 = select([s])
+        compiled = s2.compile(dialect=self._length_fixture())
+        assert \
+            set(compiled.result_map['this_is_the_data_column'][1]).\
+            issuperset(['this_is_the_data_column',
+                    s.c.this_is_the_data_column])
+        assert \
+            set(compiled.result_map['this_is_the_primarykey_column'][1]).\
+            issuperset(['this_is_the_primarykey_column',
+                    s.c.this_is_the_primarykey_column])
+
+    def test_result_map_anon_alias(self):
+        table1 = self.table1
+        dialect = self._length_fixture()
 
         q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
-        x = select([q], use_labels=True)
+        s = select([q]).apply_labels()
 
-        self.assert_compile(x,
+        self.assert_compile(s,
             'SELECT '
                 'anon_1.this_is_the_primarykey_column '
                     'AS anon_1_this_is_the_prim_1, '
                         '= :this_is_the_primarykey__1'
                 ') '
             'AS anon_1',
+            dialect=dialect)
+        compiled = s.compile(dialect=dialect)
+        assert set(compiled.result_map['anon_1_this_is_the_data_2'][1]).\
+                issuperset([
+                        'anon_1_this_is_the_data_2',
+                        q.corresponding_column(
+                                table1.c.this_is_the_data_column)
+                    ])
+
+        assert set(compiled.result_map['anon_1_this_is_the_prim_1'][1]).\
+                issuperset([
+                        'anon_1_this_is_the_prim_1',
+                        q.corresponding_column(
+                                table1.c.this_is_the_primarykey_column)
+                    ])
+
+    def test_column_bind_labels_1(self):
+        table1 = self.table1
+
+        s = table1.select(table1.c.this_is_the_primarykey_column == 4)
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            ":this_is_the_primarykey__1",
+            checkparams={'this_is_the_primarykey__1': 4},
+            dialect=self._length_fixture()
+        )
+
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            "%s",
+            checkpositional=(4, ),
+            checkparams={'this_is_the_primarykey__1': 4},
+            dialect=self._length_fixture(positional=True)
+        )
+
+    def test_column_bind_labels_2(self):
+        table1 = self.table1
+
+        s = table1.select(or_(
+            table1.c.this_is_the_primarykey_column == 4,
+            table1.c.this_is_the_primarykey_column == 2
+        ))
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            ":this_is_the_primarykey__1 OR "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            ":this_is_the_primarykey__2",
+            checkparams={
+                'this_is_the_primarykey__1': 4,
+                'this_is_the_primarykey__2': 2
+            },
+            dialect=self._length_fixture()
+        )
+        self.assert_compile(
+            s,
+            "SELECT some_large_named_table.this_is_the_primarykey_column, "
+            "some_large_named_table.this_is_the_data_column "
+            "FROM some_large_named_table WHERE "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            "%s OR "
+            "some_large_named_table.this_is_the_primarykey_column = "
+            "%s",
+            checkparams={
+                'this_is_the_primarykey__1': 4,
+                'this_is_the_primarykey__2': 2
+            },
+            checkpositional=(4, 2),
+            dialect=self._length_fixture(positional=True)
+        )
+
+class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
+
+    table1 = table('some_large_named_table',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
+        )
+
+    table2 = table('table_with_exactly_29_characs',
+            column('this_is_the_primarykey_column'),
+            column('this_is_the_data_column')
+        )
+
+    __dialect__ = 'DefaultDialect'
+
+    def test_adjustable_1(self):
+        table1 = self.table1
+        q = table1.select(
+            table1.c.this_is_the_primarykey_column == 4).alias('foo')
+        x = select([q])
+        compile_dialect = default.DefaultDialect(label_length=10)
+        self.assert_compile(x,
+            'SELECT '
+                'foo.this_1, foo.this_2 '
+            'FROM ('
+                'SELECT '
+                    'some_large_named_table.this_is_the_primarykey_column '
+                        'AS this_1, '
+                    'some_large_named_table.this_is_the_data_column '
+                        'AS this_2 '
+                'FROM '
+                    'some_large_named_table '
+                'WHERE '
+                    'some_large_named_table.this_is_the_primarykey_column '
+                        '= :this_1'
+                ') '
+            'AS foo',
             dialect=compile_dialect)
 
-        eq_(
-            list(testing.db.execute(x)),
-            [(4, u'data4')]
-        )
-
-    def test_adjustable(self):
-        table1 = self.tables.table1
+    def test_adjustable_2(self):
+        table1 = self.table1
 
         q = table1.select(
             table1.c.this_is_the_primarykey_column == 4).alias('foo')
             'AS foo',
             dialect=compile_dialect)
 
+    def test_adjustable_3(self):
+        table1 = self.table1
+
         compile_dialect = default.DefaultDialect(label_length=4)
+        q = table1.select(
+            table1.c.this_is_the_primarykey_column == 4).alias('foo')
+        x = select([q])
+
         self.assert_compile(x,
             'SELECT '
                 'foo._1, foo._2 '
             'AS foo',
         dialect=compile_dialect)
 
+    def test_adjustable_4(self):
+        table1 = self.table1
+
         q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
         x = select([q], use_labels=True)
 
             'AS anon_1',
             dialect=compile_dialect)
 
+    def test_adjustable_5(self):
+        table1 = self.table1
+        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+        x = select([q], use_labels=True)
+
         compile_dialect = default.DefaultDialect(label_length=4)
         self.assert_compile(x,
             'SELECT '
             'AS _1',
             dialect=compile_dialect)
 
-    def test_adjustable_result_schema_column(self):
-        table1 = self.tables.table1
+
+    def test_adjustable_result_schema_column_1(self):
+        table1 = self.table1
+        q = table1.select(
+            table1.c.this_is_the_primarykey_column == 4).apply_labels().\
+                alias('foo')
+        dialect = default.DefaultDialect(label_length=10)
+
+        compiled = q.compile(dialect=dialect)
+        assert set(compiled.result_map['some_2'][1]).issuperset([
+                    table1.c.this_is_the_data_column,
+                    'some_large_named_table_this_is_the_data_column',
+                    'some_2'
+
+                ])
+        assert set(compiled.result_map['some_1'][1]).issuperset([
+                    table1.c.this_is_the_primarykey_column,
+                    'some_large_named_table_this_is_the_primarykey_column',
+                    'some_1'
+
+                ])
+
+    def test_adjustable_result_schema_column_2(self):
+        table1 = self.table1
 
         q = table1.select(
             table1.c.this_is_the_primarykey_column == 4).alias('foo')
         x = select([q])
 
-        e = testing_engine(options={'label_length': 10})
-        e.pool = testing.db.pool
-        row = e.execute(x).first()
-        eq_(row.this_is_the_primarykey_column, 4)
-        eq_(row.this_1, 4)
-        eq_(row['this_1'], 4)
+        dialect = default.DefaultDialect(label_length=10)
 
-        q = table1.select(
-            table1.c.this_is_the_primarykey_column == 4).alias('foo')
-        row = e.execute(x).first()
-        eq_(row.this_is_the_primarykey_column, 4)
-        eq_(row.this_1, 4)
+        compiled = x.compile(dialect=dialect)
+        assert set(compiled.result_map['this_2'][1]).issuperset([
+                    q.corresponding_column(table1.c.this_is_the_data_column),
+                    'this_is_the_data_column',
+                    'this_2'
 
-    def test_adjustable_result_lightweight_column(self):
+                ])
+        assert set(compiled.result_map['this_1'][1]).issuperset([
+                    q.corresponding_column(table1.c.this_is_the_primarykey_column),
+                    'this_is_the_primarykey_column',
+                    'this_1'
 
-        table1 = table('some_large_named_table',
-            column('this_is_the_primarykey_column'),
-            column('this_is_the_data_column')
+                ])
+
+    def test_table_plus_column_exceeds_length(self):
+        """test that the truncation only occurs when tablename + colname are
+        concatenated, if they are individually under the label length.
+
+        """
+
+        compile_dialect = default.DefaultDialect(label_length=30)
+        a_table = table(
+            'thirty_characters_table_xxxxxx',
+            column('id')
         )
 
-        q = table1.select(
-            table1.c.this_is_the_primarykey_column == 4).alias('foo')
-        x = select([q])
-
-        e = testing_engine(options={'label_length': 10})
-        e.pool = testing.db.pool
-        row = e.execute(x).first()
-        eq_(row.this_is_the_primarykey_column, 4)
-        eq_(row.this_1, 4)
-
-    def test_table_plus_column_exceeds_length(self):
-        '''test that the truncation occurs if tablename / colname are only
-        greater than the max when concatenated.'''
-
-        compile_dialect = default.DefaultDialect(label_length=30)
-        m = MetaData()
-        a_table = Table(
-            'thirty_characters_table_xxxxxx',
-            m,
-            Column('id', Integer, primary_key=True)
-        )
-
-        other_table = Table(
+        other_table = table(
             'other_thirty_characters_table_',
-            m,
-            Column('id', Integer, primary_key=True),
-            Column('thirty_characters_table_id',
-                Integer,
-                ForeignKey('thirty_characters_table_xxxxxx.id'),
-                primary_key=True
-            )
+            column('id'),
+            column('thirty_characters_table_id')
         )
 
         anon = a_table.alias()
+
+        j1 = other_table.outerjoin(anon,
+                anon.c.id == other_table.c.thirty_characters_table_id)
+
         self.assert_compile(
             select([other_table, anon]).
-                    select_from(other_table.outerjoin(anon)).apply_labels(),
+                    select_from(j1).apply_labels(),
             'SELECT '
                 'other_thirty_characters_table_.id '
                     'AS other_thirty_characters__1, '
                 'other_thirty_characters_table_.thirty_characters_table_id',
             dialect=compile_dialect)
 
-        self.assert_compile(
-            select([other_table, anon]).
-                select_from(other_table.outerjoin(anon)).apply_labels(),
-            'SELECT '
-                'other_thirty_characters_table_.id '
-                    'AS other_thirty_characters__1, '
-                'other_thirty_characters_table_.thirty_characters_table_id '
-                    'AS other_thirty_characters__2, '
-                'thirty_characters_table__1.id '
-                    'AS thirty_characters_table__3 '
-            'FROM '
-                'other_thirty_characters_table_ '
-            'LEFT OUTER JOIN '
-                'thirty_characters_table_xxxxxx AS thirty_characters_table__1 '
-            'ON thirty_characters_table__1.id = '
-                'other_thirty_characters_table_.thirty_characters_table_id',
-            dialect=compile_dialect
-        )
+