Commits

Tony Locke committed fe66ddb

PEP8 tidy of test/sql/test_types.py

  • Participants
  • Parent commits a88169c

Comments (0)

Files changed (1)

test/sql/test_types.py

 import decimal
 import datetime
 import os
-from sqlalchemy import *
+from sqlalchemy import (
+    Unicode, MetaData, PickleType, Boolean, TypeDecorator, Integer,
+    Interval, Float, Numeric, Text, CHAR, String, distinct, select, bindparam,
+    and_, func, Date, LargeBinary, literal, cast, text, Enum,
+    type_coerce, VARCHAR, Time, DateTime, BigInteger, SmallInteger, BOOLEAN,
+    BLOB, NCHAR, NVARCHAR, CLOB, TIME, DATE, DATETIME, TIMESTAMP, SMALLINT,
+    INTEGER, DECIMAL, NUMERIC, FLOAT, REAL)
 from sqlalchemy import exc, types, util, dialects
 for name in dialects.__all__:
     __import__("sqlalchemy.dialects.%s" % name)
 from sqlalchemy.testing.util import round_decimal
 from sqlalchemy.testing import fixtures
 
+
 class AdaptTest(fixtures.TestBase):
     def _all_dialect_modules(self):
         return [
     def _types_for_mod(self, mod):
         for key in dir(mod):
             typ = getattr(mod, key)
-            if not isinstance(typ, type) or not issubclass(typ, types.TypeEngine):
+            if not isinstance(typ, type) or \
+                    not issubclass(typ, types.TypeEngine):
                 continue
             yield typ
 
                 (TIME, ("TIME", "TIME WITHOUT TIME ZONE")),
                 (CLOB, "CLOB"),
                 (VARCHAR(10), ("VARCHAR(10)", "VARCHAR(10 CHAR)")),
-                (NVARCHAR(10), ("NVARCHAR(10)", "NATIONAL VARCHAR(10)",
-                                    "NVARCHAR2(10)")),
+                (NVARCHAR(10), (
+                    "NVARCHAR(10)", "NATIONAL VARCHAR(10)", "NVARCHAR2(10)")),
                 (CHAR, "CHAR"),
                 (NCHAR, ("NCHAR", "NATIONAL CHAR")),
                 (BLOB, ("BLOB", "BLOB SUB_TYPE 0")),
 
                 try:
                     compiled = types.to_instance(type_).\
-                            compile(dialect=dialect)
+                        compile(dialect=dialect)
                 except NotImplementedError:
                     continue
 
                 up_adaptions = [typ] + typ.__subclasses__()
                 yield False, typ, up_adaptions
                 for subcl in typ.__subclasses__():
-                    if subcl is not typ and \
-                        typ is not TypeDecorator and \
-                        "sqlalchemy" in subcl.__module__:
+                    if subcl is not typ and typ is not TypeDecorator and \
+                            "sqlalchemy" in subcl.__module__:
                         yield True, subcl, [typ]
 
         for is_down_adaption, typ, target_adaptions in adaptions():
         assert t1._type_affinity is String
         assert t1.dialect_impl(d)._type_affinity is postgresql.UUID
 
+
 class PickleMetadataTest(fixtures.TestBase):
     def testmeta(self):
         for loads, dumps in picklers():
 
         l = users.select().order_by(users.c.user_id).execute().fetchall()
         for assertstr, assertint, assertint2, row in zip(
-            ["BIND_INjackBIND_OUT", "BIND_INlalaBIND_OUT", "BIND_INfredBIND_OUT"],
+            [
+                "BIND_INjackBIND_OUT", "BIND_INlalaBIND_OUT",
+                "BIND_INfredBIND_OUT"],
             [1200, 1500, 900],
             [1800, 2250, 1350],
             l
             (Float(2), "FLOAT(2)", {'precision': 4}),
             (Numeric(19, 2), "NUMERIC(19, 2)", {}),
         ]:
-            for dialect_ in (dialects.postgresql, dialects.mssql, dialects.mysql):
+            for dialect_ in (
+                    dialects.postgresql, dialects.mssql, dialects.mysql):
                 dialect_ = dialect_.dialect()
 
                 raw_impl = types.to_instance(impl_, **kw)
                 raw_dialect_impl = raw_impl.dialect_impl(dialect_)
                 dec_dialect_impl = dec_type.dialect_impl(dialect_)
                 eq_(dec_dialect_impl.__class__, MyType)
-                eq_(raw_dialect_impl.__class__, dec_dialect_impl.impl.__class__)
+                eq_(
+                    raw_dialect_impl.__class__,
+                    dec_dialect_impl.impl.__class__)
 
                 self.assert_compile(
                     MyType(**kw),
             String().dialect_impl(dialect=sl).__class__
         )
         eq_(
-                t.dialect_impl(dialect=pg).impl.__class__,
-                Float().dialect_impl(pg).__class__
+            t.dialect_impl(dialect=pg).impl.__class__,
+            Float().dialect_impl(pg).__class__
         )
 
     def test_type_decorator_repr(self):
                 super(MyType, self).__init__()
                 self.foo = foo
                 self.dialect_specific_args = kwargs
+
             def adapt(self, cls):
                 return cls(foo=self.foo, **self.dialect_specific_args)
         t = MyType(bar='bar')
         eq_(a.foo, 'foo')
         eq_(a.dialect_specific_args['bar'], 'bar')
 
-
     @classmethod
     def define_tables(cls, metadata):
         class MyType(types.UserDefinedType):
             def get_col_spec(self):
                 return "VARCHAR(100)"
+
             def bind_processor(self, dialect):
                 def process(value):
                     return "BIND_IN" + value
                 return process
+
             def result_processor(self, dialect, coltype):
                 def process(value):
                     return value + "BIND_OUT"
                 return process
+
             def adapt(self, typeobj):
                 return typeobj()
 
         class MyDecoratedType(types.TypeDecorator):
             impl = String
+
             def bind_processor(self, dialect):
-                impl_processor = super(MyDecoratedType, self).bind_processor(dialect)\
-                                        or (lambda value: value)
+                impl_processor = super(MyDecoratedType, self).\
+                    bind_processor(dialect) or (lambda value: value)
+
                 def process(value):
                     return "BIND_IN" + impl_processor(value)
                 return process
+
             def result_processor(self, dialect, coltype):
-                impl_processor = super(MyDecoratedType, self).result_processor(dialect, coltype)\
-                                        or (lambda value: value)
+                impl_processor = super(MyDecoratedType, self).\
+                    result_processor(dialect, coltype) or (lambda value: value)
+
                 def process(value):
                     return impl_processor(value) + "BIND_OUT"
                 return process
+
             def copy(self):
                 return MyDecoratedType()
 
             impl = Unicode
 
             def bind_processor(self, dialect):
-                impl_processor = super(MyUnicodeType, self).bind_processor(dialect)\
-                                        or (lambda value: value)
+                impl_processor = super(MyUnicodeType, self).\
+                    bind_processor(dialect) or (lambda value: value)
 
                 def process(value):
                     return "BIND_IN" + impl_processor(value)
                 return process
 
             def result_processor(self, dialect, coltype):
-                impl_processor = super(MyUnicodeType, self).result_processor(dialect, coltype)\
-                                        or (lambda value: value)
+                impl_processor = super(MyUnicodeType, self).\
+                    result_processor(dialect, coltype) or (lambda value: value)
+
                 def process(value):
                     return impl_processor(value) + "BIND_OUT"
                 return process
             def copy(self):
                 return MyUnicodeType(self.impl.length)
 
-        Table('users', metadata,
+        Table(
+            'users', metadata,
             Column('user_id', Integer, primary_key=True),
             # totall custom type
             Column('goofy', MyType, nullable=False),
             Column('goofy9', MyNewIntSubClass, nullable=False),
         )
 
+
 class TypeCoerceCastTest(fixtures.TablesTest):
 
     @classmethod
 
         cls.MyType = MyType
 
-        Table('t', metadata,
-                    Column('data', String(50))
-                )
+        Table('t', metadata, Column('data', String(50)))
 
-    @testing.fails_on("oracle",
-                "oracle doesn't like CAST in the VALUES of an INSERT")
+    @testing.fails_on(
+        "oracle", "oracle doesn't like CAST in the VALUES of an INSERT")
     def test_insert_round_trip_cast(self):
         self._test_insert_round_trip(cast)
 
             [('BIND_INd1BIND_OUT', )]
         )
 
-    @testing.fails_on("oracle",
-                "ORA-00906: missing left parenthesis - "
-                "seems to be CAST(:param AS type)")
+    @testing.fails_on(
+        "oracle", "ORA-00906: missing left parenthesis - "
+        "seems to be CAST(:param AS type)")
     def test_coerce_from_nulltype_cast(self):
         self._test_coerce_from_nulltype(cast)
 
             [('BIND_INTHISISMYOBJBIND_OUT',)]
         )
 
-    @testing.fails_on("oracle",
-                "oracle doesn't like CAST in the VALUES of an INSERT")
+    @testing.fails_on(
+        "oracle", "oracle doesn't like CAST in the VALUES of an INSERT")
     def test_vs_non_coerced_cast(self):
         self._test_vs_non_coerced(cast)
 
         t.insert().values(data=coerce_fn('d1', MyType)).execute()
 
         eq_(
-            select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(),
+            select(
+                [t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(),
             [('BIND_INd1', 'BIND_INd1BIND_OUT')]
         )
 
-    @testing.fails_on("oracle",
-                "oracle doesn't like CAST in the VALUES of an INSERT")
+    @testing.fails_on(
+        "oracle", "oracle doesn't like CAST in the VALUES of an INSERT")
     def test_vs_non_coerced_alias_cast(self):
         self._test_vs_non_coerced_alias(cast)
 
 
         eq_(
             select([t.c.data, coerce_fn(t.c.data, MyType)]).
-                    alias().select().execute().fetchall(),
+            alias().select().execute().fetchall(),
             [('BIND_INd1', 'BIND_INd1BIND_OUT')]
         )
 
-    @testing.fails_on("oracle",
-                "oracle doesn't like CAST in the VALUES of an INSERT")
+    @testing.fails_on(
+        "oracle", "oracle doesn't like CAST in the VALUES of an INSERT")
     def test_vs_non_coerced_where_cast(self):
         self._test_vs_non_coerced_where(cast)
 
 
         # coerce on left side
         eq_(
-            select([t.c.data, coerce_fn(t.c.data, MyType)]).\
-                        where(coerce_fn(t.c.data, MyType) == 'd1').\
-                        execute().fetchall(),
+            select([t.c.data, coerce_fn(t.c.data, MyType)]).
+            where(coerce_fn(t.c.data, MyType) == 'd1').execute().fetchall(),
             [('BIND_INd1', 'BIND_INd1BIND_OUT')]
         )
 
         # coerce on right side
         eq_(
-            select([t.c.data, coerce_fn(t.c.data, MyType)]).\
-                        where(t.c.data == coerce_fn('d1', MyType)).\
-                        execute().fetchall(),
+            select([t.c.data, coerce_fn(t.c.data, MyType)]).
+            where(t.c.data == coerce_fn('d1', MyType)).execute().fetchall(),
             [('BIND_INd1', 'BIND_INd1BIND_OUT')]
         )
 
-    @testing.fails_on("oracle",
-                "oracle doesn't like CAST in the VALUES of an INSERT")
+    @testing.fails_on(
+        "oracle", "oracle doesn't like CAST in the VALUES of an INSERT")
     def test_coerce_none_cast(self):
         self._test_coerce_none(cast)
 
         t = self.tables.t
         t.insert().values(data=coerce_fn('d1', MyType)).execute()
         eq_(
-            select([t.c.data, coerce_fn(t.c.data, MyType)]).\
-                        where(t.c.data == coerce_fn(None, MyType)).\
-                        execute().fetchall(),
+            select([t.c.data, coerce_fn(t.c.data, MyType)]).
+            where(t.c.data == coerce_fn(None, MyType)).execute().fetchall(),
             []
         )
 
         eq_(
-            select([t.c.data, coerce_fn(t.c.data, MyType)]).\
-                        where(coerce_fn(t.c.data, MyType) == None).\
-                        execute().fetchall(),
+            select([t.c.data, coerce_fn(t.c.data, MyType)]).
+            where(coerce_fn(t.c.data, MyType) == None).  # noqa
+            execute().fetchall(),
             []
         )
 
-    @testing.fails_on("oracle",
-                "oracle doesn't like CAST in the VALUES of an INSERT")
+    @testing.fails_on(
+        "oracle", "oracle doesn't like CAST in the VALUES of an INSERT")
     def test_resolve_clause_element_cast(self):
         self._test_resolve_clause_element(cast)
 
             [('BIND_INd1', 'BIND_INd1BIND_OUT')]
         )
 
-    @testing.fails_on("oracle",
-                "ORA-00906: missing left parenthesis - "
-                "seems to be CAST(:param AS type)")
+    @testing.fails_on(
+        "oracle", "ORA-00906: missing left parenthesis - "
+        "seems to be CAST(:param AS type)")
     def test_cast_existing_typed(self):
         MyType = self.MyType
         coerce_fn = cast
             select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
             [('BIND_INd1BIND_OUT', )])
 
+
 class VariantTest(fixtures.TestBase, AssertsCompiledSQL):
     def setup(self):
         class UTypeOne(types.UserDefinedType):
             def get_col_spec(self):
                 return "UTYPEONE"
+
             def bind_processor(self, dialect):
                 def process(value):
                     return value + "UONE"
         class UTypeTwo(types.UserDefinedType):
             def get_col_spec(self):
                 return "UTYPETWO"
+
             def bind_processor(self, dialect):
                 def process(value):
                     return value + "UTWO"
         self.UTypeTwo = UTypeTwo
         self.UTypeThree = UTypeThree
         self.variant = self.UTypeOne().with_variant(
-                            self.UTypeTwo(), 'postgresql')
-        self.composite = self.variant.with_variant(
-                            self.UTypeThree(), 'mysql')
+            self.UTypeTwo(), 'postgresql')
+        self.composite = self.variant.with_variant(self.UTypeThree(), 'mysql')
 
     def test_illegal_dupe(self):
         v = self.UTypeOne().with_variant(
             "in the mapping for this Variant",
             lambda: v.with_variant(self.UTypeThree(), 'postgresql')
         )
+
     def test_compile(self):
         self.assert_compile(
             self.variant,
     def test_bind_process(self):
         eq_(
             self.variant._cached_bind_processor(
-                    dialects.mysql.dialect())('foo'),
+                dialects.mysql.dialect())('foo'),
             'fooUONE'
         )
         eq_(
             self.variant._cached_bind_processor(
-                    default.DefaultDialect())('foo'),
+                default.DefaultDialect())('foo'),
             'fooUONE'
         )
         eq_(
             self.variant._cached_bind_processor(
-                    dialects.postgresql.dialect())('foo'),
+                dialects.postgresql.dialect())('foo'),
             'fooUTWO'
         )
 
     def test_bind_process_composite(self):
         assert self.composite._cached_bind_processor(
-                    dialects.mysql.dialect()) is None
+            dialects.mysql.dialect()) is None
         eq_(
             self.composite._cached_bind_processor(
-                    default.DefaultDialect())('foo'),
+                default.DefaultDialect())('foo'),
             'fooUONE'
         )
         eq_(
             self.composite._cached_bind_processor(
-                    dialects.postgresql.dialect())('foo'),
+                dialects.postgresql.dialect())('foo'),
             'fooUTWO'
         )
 
+
 class UnicodeTest(fixtures.TestBase):
     """Exercise the Unicode and related types.
 
                 expected
             )
 
-    data = util.u("Alors vous imaginez ma surprise, au lever du jour, quand "\
-            "une drôle de petite voix m’a réveillé. "\
-            "Elle disait: « S’il vous plaît… dessine-moi un mouton! »")
+    data = util.u(
+        "Alors vous imaginez ma surprise, au lever du jour, quand "
+        "une drôle de petite voix m’a réveillé. "
+        "Elle disait: « S’il vous plaît… dessine-moi un mouton! »")
 
     def test_unicode_warnings_typelevel_native_unicode(self):
 
             assert isinstance(uni(unicodedata), str)
         else:
             assert_raises(exc.SAWarning, uni, 'x')
-            assert isinstance(uni(unicodedata), unicode)
+            assert isinstance(uni(unicodedata), unicode)  # noqa
 
     def test_unicode_warnings_typelevel_sqla_unicode(self):
         unicodedata = self.data
             unicodedata.encode('ascii', 'ignore').decode()
         )
 
+enum_table = non_native_enum_table = metadata = None
+
 
 class EnumTest(AssertsCompiledSQL, fixtures.TestBase):
     @classmethod
     def setup_class(cls):
         global enum_table, non_native_enum_table, metadata
         metadata = MetaData(testing.db)
-        enum_table = Table('enum_table', metadata,
-            Column("id", Integer, primary_key=True),
+        enum_table = Table(
+            'enum_table', metadata, Column("id", Integer, primary_key=True),
             Column('someenum', Enum('one', 'two', 'three', name='myenum'))
         )
 
-        non_native_enum_table = Table('non_native_enum_table', metadata,
+        non_native_enum_table = Table(
+            'non_native_enum_table', metadata,
             Column("id", Integer, primary_key=True),
             Column('someenum', Enum('one', 'two', 'three', native_enum=False)),
         )
     def teardown_class(cls):
         metadata.drop_all()
 
-    @testing.fails_on('postgresql+zxjdbc',
-                        'zxjdbc fails on ENUM: column "XXX" is of type XXX '
-                        'but expression is of type character varying')
+    @testing.fails_on(
+        'postgresql+zxjdbc',
+        'zxjdbc fails on ENUM: column "XXX" is of type XXX '
+        'but expression is of type character varying')
     def test_round_trip(self):
         enum_table.insert().execute([
             {'id': 1, 'someenum': 'two'},
 
         eq_(
             non_native_enum_table.select().
-                    order_by(non_native_enum_table.c.id).execute().fetchall(),
+            order_by(non_native_enum_table.c.id).execute().fetchall(),
             [
                 (1, 'two'),
                 (2, 'two'),
         eq_(e1.adapt(ENUM).name, 'foo')
         eq_(e1.adapt(ENUM).schema, 'bar')
 
-    @testing.crashes('mysql',
-                    'Inconsistent behavior across various OS/drivers'
-                )
+    @testing.crashes(
+        'mysql', 'Inconsistent behavior across various OS/drivers')
     def test_constraint(self):
-        assert_raises(exc.DBAPIError,
-            enum_table.insert().execute,
-            {'id': 4, 'someenum': 'four'}
-        )
+        assert_raises(
+            exc.DBAPIError, enum_table.insert().execute,
+            {'id': 4, 'someenum': 'four'})
 
     def test_non_native_constraint_custom_type(self):
         class Foob(object):
         class MyEnum(types.SchemaType, TypeDecorator):
             def __init__(self, values):
                 self.impl = Enum(
-                                *[v.name for v in values],
-                                name="myenum",
-                                native_enum=False
-                            )
-
+                    *[v.name for v in values], name="myenum",
+                    native_enum=False)
 
             def _set_table(self, table, column):
                 self.impl._set_table(table, column)
 
         m = MetaData()
         t1 = Table('t', m, Column('x', MyEnum([Foob('a'), Foob('b')])))
-        const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
+        const = [
+            c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
 
         self.assert_compile(
             AddConstraint(const),
             dialect="default"
         )
 
-
-
-    @testing.fails_on('mysql',
-                    "the CHECK constraint doesn't raise an exception for unknown reason")
+    @testing.fails_on(
+        'mysql',
+        "the CHECK constraint doesn't raise an exception for unknown reason")
     def test_non_native_constraint(self):
-        assert_raises(exc.DBAPIError,
-            non_native_enum_table.insert().execute,
+        assert_raises(
+            exc.DBAPIError, non_native_enum_table.insert().execute,
             {'id': 4, 'someenum': 'four'}
         )
 
         are created with checkfirst=False"""
 
         e = engines.mock_engine()
-        t = Table('t1', MetaData(),
-            Column('x', Enum("x", "y", name="pge"))
-        )
+        t = Table('t1', MetaData(), Column('x', Enum("x", "y", name="pge")))
         t.create(e, checkfirst=False)
         # basically looking for the start of
         # the constraint, or the ENUM def itself,
         assert "('x'," in e.print_sql()
 
     def test_repr(self):
-        e = Enum("x", "y", name="somename", convert_unicode=True,
-                        quote=True, inherit_schema=True)
-        eq_(
-            repr(e),
-            "Enum('x', 'y', name='somename', inherit_schema=True)"
-        )
+        e = Enum(
+            "x", "y", name="somename", convert_unicode=True, quote=True,
+            inherit_schema=True)
+        eq_(repr(e), "Enum('x', 'y', name='somename', inherit_schema=True)")
+
+binary_table = MyPickleType = metadata = None
+
 
 class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
     __excluded_on__ = (
                 return value
 
         metadata = MetaData(testing.db)
-        binary_table = Table('binary_table', metadata,
-            Column('primary_id', Integer, primary_key=True, test_needs_autoincrement=True),
+        binary_table = Table(
+            'binary_table', metadata,
+            Column(
+                'primary_id', Integer, primary_key=True,
+                test_needs_autoincrement=True),
             Column('data', LargeBinary),
             Column('data_slice', LargeBinary(100)),
             Column('misc', String(30)),
         stream1 = self.load_stream('binary_data_one.dat')
         stream2 = self.load_stream('binary_data_two.dat')
         binary_table.insert().execute(
-                            primary_id=1,
-                            misc='binary_data_one.dat',
-                            data=stream1,
-                            data_slice=stream1[0:100],
-                            pickled=testobj1,
-                            mypickle=testobj3)
+            primary_id=1, misc='binary_data_one.dat', data=stream1,
+            data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
         binary_table.insert().execute(
-                            primary_id=2,
-                            misc='binary_data_two.dat',
-                            data=stream2,
-                            data_slice=stream2[0:99],
-                            pickled=testobj2)
+            primary_id=2, misc='binary_data_two.dat', data=stream2,
+            data_slice=stream2[0:99], pickled=testobj2)
         binary_table.insert().execute(
-                            primary_id=3,
-                            misc='binary_data_two.dat',
-                            data=None,
-                            data_slice=stream2[0:99],
-                            pickled=None)
+            primary_id=3, misc='binary_data_two.dat', data=None,
+            data_slice=stream2[0:99], pickled=None)
 
         for stmt in (
             binary_table.select(order_by=binary_table.c.primary_id),
             text(
                 "select * from binary_table order by binary_table.primary_id",
-                typemap={'pickled': PickleType,
-                        'mypickle': MyPickleType,
-                        'data': LargeBinary, 'data_slice': LargeBinary},
+                typemap={
+                    'pickled': PickleType, 'mypickle': MyPickleType,
+                    'data': LargeBinary, 'data_slice': LargeBinary},
                 bind=testing.db)
         ):
             l = stmt.execute().fetchall()
 
         data = os.urandom(32)
         binary_table.insert().execute(data=data)
-        eq_(binary_table.select().
-                    where(binary_table.c.data == data).alias().
-                    count().scalar(), 1)
-
+        eq_(
+            binary_table.select().where(binary_table.c.data == data).alias().
+            count().scalar(), 1)
 
     @testing.requires.binary_literals
     def test_literal_roundtrip(self):
         compiled = select([cast(literal(util.b("foo")), LargeBinary)]).compile(
-                            dialect=testing.db.dialect,
-                            compile_kwargs={"literal_binds": True})
+            dialect=testing.db.dialect, compile_kwargs={"literal_binds": True})
         result = testing.db.execute(compiled)
         eq_(result.scalar(), util.b("foo"))
 
         with open(f, mode='rb') as o:
             return o.read()
 
-class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+test_table = meta = MyCustomType = MyTypeDec = None
+
+
+class ExpressionTest(
+        fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
     __dialect__ = 'default'
 
     @classmethod
         class MyCustomType(types.UserDefinedType):
             def get_col_spec(self):
                 return "INT"
+
             def bind_processor(self, dialect):
                 def process(value):
                     return value * 10
                 return process
+
             def result_processor(self, dialect, coltype):
                 def process(value):
                     return value / 10
 
         class MyOldCustomType(MyCustomType):
             def adapt_operator(self, op):
-                return {operators.add: operators.sub,
+                return {
+                    operators.add: operators.sub,
                     operators.sub: operators.add}.get(op, op)
 
         class MyTypeDec(types.TypeDecorator):
                 return value + "BIND_OUT"
 
         meta = MetaData(testing.db)
-        test_table = Table('test', meta,
+        test_table = Table(
+            'test', meta,
             Column('id', Integer, primary_key=True),
             Column('data', String(30)),
             Column('atimestamp', Date),
             Column('avalue', MyCustomType),
             Column('bvalue', MyTypeDec(50)),
-            )
+        )
 
         meta.create_all()
 
         test_table.insert().execute({
-                                'id': 1,
-                                'data': 'somedata',
-                                'atimestamp': datetime.date(2007, 10, 15),
-                                'avalue': 25, 'bvalue': 'foo'})
+            'id': 1, 'data': 'somedata',
+            'atimestamp': datetime.date(2007, 10, 15), 'avalue': 25,
+            'bvalue': 'foo'})
 
     @classmethod
     def teardown_class(cls):
 
         eq_(
             testing.db.execute(
-                    select([test_table.c.id, test_table.c.data, test_table.c.atimestamp])
-                    .where(expr),
-                    {"thedate": datetime.date(2007, 10, 15)}).fetchall(),
-            [(1, 'somedata', datetime.date(2007, 10, 15))]
+                select([
+                    test_table.c.id, test_table.c.data,
+                    test_table.c.atimestamp]).where(expr),
+                {"thedate": datetime.date(2007, 10, 15)}).fetchall(), [
+                    (1, 'somedata', datetime.date(2007, 10, 15))]
         )
 
         expr = test_table.c.avalue == bindparam("somevalue")
         eq_(expr.right.type._type_affinity, MyCustomType)
 
         eq_(
-            testing.db.execute(test_table.select().where(expr),
-             {'somevalue': 25}).fetchall(),
-            [(1, 'somedata', datetime.date(2007, 10, 15), 25,
-             'BIND_INfooBIND_OUT')]
+            testing.db.execute(
+                test_table.select().where(expr), {'somevalue': 25}
+            ).fetchall(), [(
+                1, 'somedata', datetime.date(2007, 10, 15), 25,
+                'BIND_INfooBIND_OUT')]
         )
 
         expr = test_table.c.bvalue == bindparam("somevalue")
         eq_(expr.right.type._type_affinity, String)
 
         eq_(
-            testing.db.execute(test_table.select().where(expr),
-                {"somevalue": "foo"}).fetchall(),
-            [(1, 'somedata',
-                datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')]
+            testing.db.execute(
+                test_table.select().where(expr), {"somevalue": "foo"}
+            ).fetchall(), [(
+                1, 'somedata', datetime.date(2007, 10, 15), 25,
+                'BIND_INfooBIND_OUT')]
         )
 
     def test_bind_adapt_update(self):
         expr = column('foo', CHAR) == "asdf"
         eq_(expr.right.type.__class__, CHAR)
 
-
     def test_typedec_operator_adapt(self):
         expr = test_table.c.bvalue + "hi"
 
         class CoerceNothing(TypeDecorator):
             coerce_to_is_types = ()
             impl = Integer
+
         class CoerceBool(TypeDecorator):
             coerce_to_is_types = (bool, )
             impl = Boolean
+
         class CoerceNone(TypeDecorator):
             coerce_to_is_types = (type(None),)
             impl = Integer
         c3 = column('x', CoerceNone())
 
         self.assert_compile(
-            and_(c1 == None, c2 == None, c3 == None),
+            and_(c1 == None, c2 == None, c3 == None),  # noqa
             "x = :x_1 AND x = :x_2 AND x IS NULL"
         )
         self.assert_compile(
-            and_(c1 == True, c2 == True, c3 == True),
+            and_(c1 == True, c2 == True, c3 == True),  # noqa
             "x = :x_1 AND x = true AND x = :x_2",
             dialect=default.DefaultDialect(supports_native_boolean=True)
         )
             dialect=default.DefaultDialect(supports_native_boolean=True)
         )
 
-
     def test_typedec_righthand_coercion(self):
         class MyTypeDec(types.TypeDecorator):
             impl = String
         from sqlalchemy.sql import column
         import operator
 
-        for op in (
-            operator.add,
-            operator.mul,
-            operator.truediv,
-            operator.sub
-        ):
+        for op in (operator.add, operator.mul, operator.truediv, operator.sub):
             for other in (Numeric(10, 2), Integer):
                 expr = op(
-                        column('bar', types.Numeric(10, 2)),
-                        column('foo', other)
-                       )
+                    column('bar', types.Numeric(10, 2)),
+                    column('foo', other)
+                )
                 assert isinstance(expr.type, types.Numeric)
                 expr = op(
-                        column('foo', other),
-                        column('bar', types.Numeric(10, 2))
-                       )
+                    column('foo', other),
+                    column('bar', types.Numeric(10, 2))
+                )
                 assert isinstance(expr.type, types.Numeric)
 
     def test_null_comparison(self):
         assert distinct(test_table.c.data).type == test_table.c.data.type
         assert test_table.c.data.distinct().type == test_table.c.data.type
 
+
 class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
     __dialect__ = 'default'
 
         self.assert_compile(String(50), "VARCHAR(50)")
 
     def test_string_collation(self):
-        self.assert_compile(String(50, collation="FOO"),
-                'VARCHAR(50) COLLATE "FOO"')
+        self.assert_compile(
+            String(50, collation="FOO"), 'VARCHAR(50) COLLATE "FOO"')
 
     def test_char_plain(self):
         self.assert_compile(CHAR(), "CHAR")
         self.assert_compile(CHAR(50), "CHAR(50)")
 
     def test_char_collation(self):
-        self.assert_compile(CHAR(50, collation="FOO"),
-                'CHAR(50) COLLATE "FOO"')
+        self.assert_compile(
+            CHAR(50, collation="FOO"), 'CHAR(50) COLLATE "FOO"')
 
     def test_text_plain(self):
         self.assert_compile(Text(), "TEXT")
         self.assert_compile(Text(50), "TEXT(50)")
 
     def test_text_collation(self):
-        self.assert_compile(Text(collation="FOO"),
-                'TEXT COLLATE "FOO"')
+        self.assert_compile(
+            Text(collation="FOO"), 'TEXT COLLATE "FOO"')
 
     def test_default_compile_pg_inet(self):
-        self.assert_compile(dialects.postgresql.INET(), "INET",
-                allow_dialect_select=True)
+        self.assert_compile(
+            dialects.postgresql.INET(), "INET", allow_dialect_select=True)
 
     def test_default_compile_pg_float(self):
-        self.assert_compile(dialects.postgresql.FLOAT(), "FLOAT",
-                allow_dialect_select=True)
+        self.assert_compile(
+            dialects.postgresql.FLOAT(), "FLOAT", allow_dialect_select=True)
 
     def test_default_compile_mysql_integer(self):
         self.assert_compile(
-                dialects.mysql.INTEGER(display_width=5), "INTEGER(5)",
-                allow_dialect_select=True)
+            dialects.mysql.INTEGER(display_width=5), "INTEGER(5)",
+            allow_dialect_select=True)
 
     def test_numeric_plain(self):
         self.assert_compile(types.NUMERIC(), 'NUMERIC')
         self.assert_compile(types.DECIMAL(2, 4), 'DECIMAL(2, 4)')
 
 
-
-
 class NumericRawSQLTest(fixtures.TestBase):
     """Test what DBAPIs and dialects return without any typing
     information supplied at the SQLA level.
 
     """
     def _fixture(self, metadata, type, data):
-        t = Table('t', metadata,
-            Column("val", type)
-        )
+        t = Table('t', metadata, Column("val", type))
         metadata.create_all()
         t.insert().execute(val=data)
 
         else:
             eq_(val, 46.583)
 
-
+interval_table = metadata = None
 
 
 class IntervalTest(fixtures.TestBase, AssertsExecutionResults):
     def setup_class(cls):
         global interval_table, metadata
         metadata = MetaData(testing.db)
-        interval_table = Table("intervaltable", metadata,
-            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+        interval_table = Table(
+            "intervaltable", metadata,
+            Column(
+                "id", Integer, primary_key=True,
+                test_needs_autoincrement=True),
             Column("native_interval", Interval()),
-            Column("native_interval_args", Interval(day_precision=3, second_precision=6)),
-            Column("non_native_interval", Interval(native=False)),
-            )
+            Column(
+                "native_interval_args",
+                Interval(day_precision=3, second_precision=6)),
+            Column(
+                "non_native_interval", Interval(native=False)),
+        )
         metadata.create_all()
 
     @engines.close_first
         assert adapted.native is False
         eq_(str(adapted), "DATETIME")
 
-    @testing.fails_on("postgresql+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
-    @testing.fails_on("oracle+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
+    @testing.fails_on(
+        "postgresql+zxjdbc",
+        "Not yet known how to pass values of the INTERVAL type")
+    @testing.fails_on(
+        "oracle+zxjdbc",
+        "Not yet known how to pass values of the INTERVAL type")
     def test_roundtrip(self):
         small_delta = datetime.timedelta(days=15, seconds=5874)
         delta = datetime.timedelta(414)
         interval_table.insert().execute(
-                                native_interval=small_delta,
-                                native_interval_args=delta,
-                                non_native_interval=delta
-                                )
+            native_interval=small_delta, native_interval_args=delta,
+            non_native_interval=delta)
         row = interval_table.select().execute().first()
         eq_(row['native_interval'], small_delta)
         eq_(row['native_interval_args'], delta)
         eq_(row['non_native_interval'], delta)
 
-    @testing.fails_on("oracle+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
+    @testing.fails_on(
+        "oracle+zxjdbc",
+        "Not yet known how to pass values of the INTERVAL type")
     def test_null(self):
-        interval_table.insert().execute(id=1, native_inverval=None, non_native_interval=None)
+        interval_table.insert().execute(
+            id=1, native_inverval=None, non_native_interval=None)
         row = interval_table.select().execute().first()
         eq_(row['native_interval'], None)
         eq_(row['native_interval_args'], None)
         eq_(row['non_native_interval'], None)
 
 
-class BooleanTest(fixtures.TablesTest, AssertsExecutionResults, AssertsCompiledSQL):
+class BooleanTest(
+        fixtures.TablesTest, AssertsExecutionResults, AssertsCompiledSQL):
     """test edge cases for booleans.  Note that the main boolean test suite
     is now in testing/suite/test_types.py
 
     """
     @classmethod
     def define_tables(cls, metadata):
-        Table('boolean_table', metadata,
+        Table(
+            'boolean_table', metadata,
             Column('id', Integer, primary_key=True, autoincrement=False),
             Column('value', Boolean),
             Column('unconstrained_value', Boolean(create_constraint=False)),
-            )
+        )
 
-    @testing.fails_on('mysql',
-            "The CHECK clause is parsed but ignored by all storage engines.")
-    @testing.fails_on('mssql',
-            "FIXME: MS-SQL 2005 doesn't honor CHECK ?!?")
+    @testing.fails_on(
+        'mysql',
+        "The CHECK clause is parsed but ignored by all storage engines.")
+    @testing.fails_on(
+        'mssql', "FIXME: MS-SQL 2005 doesn't honor CHECK ?!?")
     @testing.skip_if(lambda: testing.db.dialect.supports_native_boolean)
     def test_constraint(self):
-        assert_raises((exc.IntegrityError, exc.ProgrammingError),
-                        testing.db.execute,
-                        "insert into boolean_table (id, value) values(1, 5)")
+        assert_raises(
+            (exc.IntegrityError, exc.ProgrammingError),
+            testing.db.execute,
+            "insert into boolean_table (id, value) values(1, 5)")
 
     @testing.skip_if(lambda: testing.db.dialect.supports_native_boolean)
     def test_unconstrained(self):
         testing.db.execute(
-            "insert into boolean_table (id, unconstrained_value) values (1, 5)")
+            "insert into boolean_table (id, unconstrained_value)"
+            "values (1, 5)")
 
     def test_non_native_constraint_custom_type(self):
         class Foob(object):
 
         m = MetaData()
         t1 = Table('t', m, Column('x', MyBool()))
-        const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
+        const = [
+            c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
 
         self.assert_compile(
             AddConstraint(const),
         ):
             assert p1.compare_values(p1.copy_value(obj), obj)
 
-        assert_raises(NotImplementedError,
-                        p1.compare_values,
-                        pickleable.BrokenComparable('foo'),
-                        pickleable.BrokenComparable('foo'))
+        assert_raises(
+            NotImplementedError, p1.compare_values,
+            pickleable.BrokenComparable('foo'),
+            pickleable.BrokenComparable('foo'))
 
     def test_nonmutable_comparison(self):
         p1 = PickleType()
         ):
             assert p1.compare_values(p1.copy_value(obj), obj)
 
+meta = None
+
+
 class CallableTest(fixtures.TestBase):
     @classmethod
     def setup_class(cls):
     def test_callable_as_arg(self):
         ucode = util.partial(Unicode)
 
-        thing_table = Table('thing', meta,
-            Column('name', ucode(20))
+        thing_table = Table(
+            'thing', meta, Column('name', ucode(20))
         )
         assert isinstance(thing_table.c.name.type, Unicode)
         thing_table.create()
     def test_callable_as_kwarg(self):
         ucode = util.partial(Unicode)
 
-        thang_table = Table('thang', meta,
-            Column('name', type_=ucode(20), primary_key=True)
+        thang_table = Table(
+            'thang', meta, Column('name', type_=ucode(20), primary_key=True)
         )
         assert isinstance(thang_table.c.name.type, Unicode)
         thang_table.create()
-