Commits

diana committed 72c2942

just a pep8 pass of lib/sqlalchemy/testing/

  • Participants
  • Parent commits 88d50ae

Comments (0)

Files changed (12)

lib/sqlalchemy/testing/pickleable.py

-"""Classes used in pickling tests, need to be at the module level for unpickling."""
+"""Classes used in pickling tests, need to be at the module level for
+unpickling.
+"""
 
 from . import fixtures
 
+
 class User(fixtures.ComparableEntity):
     pass
 
+
 class Order(fixtures.ComparableEntity):
     pass
 
+
 class Dingaling(fixtures.ComparableEntity):
     pass
 
+
 class EmailUser(User):
     pass
 
+
 class Address(fixtures.ComparableEntity):
     pass
 
+
 # TODO: these are kind of arbitrary....
 class Child1(fixtures.ComparableEntity):
     pass
 
+
 class Child2(fixtures.ComparableEntity):
     pass
 
+
 class Parent(fixtures.ComparableEntity):
     pass
 
+
 class Screen(object):
+
     def __init__(self, obj, parent=None):
         self.obj = obj
         self.parent = parent
 
+
 class Foo(object):
+
     def __init__(self, moredata):
         self.data = 'im data'
         self.stuff = 'im stuff'
         self.moredata = moredata
+
     __hash__ = object.__hash__
+
     def __eq__(self, other):
         return other.data == self.data and \
                 other.stuff == self.stuff and \
 
 
 class Bar(object):
+
     def __init__(self, x, y):
         self.x = x
         self.y = y
+
     __hash__ = object.__hash__
-    def __eq__(self, other):
-        return other.__class__ is self.__class__ and \
-            other.x == self.x and \
-            other.y == self.y
-    def __str__(self):
-        return "Bar(%d, %d)" % (self.x, self.y)
 
-class OldSchool:
-    def __init__(self, x, y):
-        self.x = x
-        self.y = y
     def __eq__(self, other):
         return other.__class__ is self.__class__ and \
             other.x == self.x and \
             other.y == self.y
 
-class OldSchoolWithoutCompare:
+    def __str__(self):
+        return "Bar(%d, %d)" % (self.x, self.y)
+
+
+class OldSchool:
+
     def __init__(self, x, y):
         self.x = x
         self.y = y
 
-class BarWithoutCompare(object):
+    def __eq__(self, other):
+        return other.__class__ is self.__class__ and \
+            other.x == self.x and \
+            other.y == self.y
+
+
+class OldSchoolWithoutCompare:
+
     def __init__(self, x, y):
         self.x = x
         self.y = y
+
+
+class BarWithoutCompare(object):
+
+    def __init__(self, x, y):
+        self.x = x
+        self.y = y
+
     def __str__(self):
         return "Bar(%d, %d)" % (self.x, self.y)
 
 
 class NotComparable(object):
+
     def __init__(self, data):
         self.data = data
 
 
 
 class BrokenComparable(object):
+
     def __init__(self, data):
         self.data = data
 
 
     def __ne__(self, other):
         raise NotImplementedError
-

lib/sqlalchemy/testing/profiling.py

 
 _current_test = None
 
+
 def profiled(target=None, **target_opts):
     """Function profiling.
 
                 else:
                     stats.print_stats()
 
-                print_callers = target_opts.get('print_callers',
-                                                profile_config['print_callers'])
+                print_callers = target_opts.get(
+                    'print_callers', profile_config['print_callers'])
                 if print_callers:
                     stats.print_callers()
 
-                print_callees = target_opts.get('print_callees',
-                                                profile_config['print_callees'])
+                print_callees = target_opts.get(
+                    'print_callees', profile_config['print_callees'])
                 if print_callees:
                     stats.print_callees()
 
 
     """
     def __init__(self, filename):
-        self.write = config.options is not None and config.options.write_profiles
+        self.write = (
+            config.options is not None and
+            config.options.write_profiles
+        )
         self.fname = os.path.abspath(filename)
         self.short_fname = os.path.split(self.fname)[-1]
-        self.data = collections.defaultdict(lambda: collections.defaultdict(dict))
+        self.data = collections.defaultdict(
+            lambda: collections.defaultdict(dict))
         self._read()
         if self.write:
             # rewrite for the case where features changed,
 
     def has_stats(self):
         test_key = _current_test
-        return test_key in self.data and self.platform_key in self.data[test_key]
+        return (
+            test_key in self.data and
+            self.platform_key in self.data[test_key]
+        )
 
     def result(self, callcount):
         test_key = _current_test
         per_platform['current_count'] += 1
         return result
 
-
     def _header(self):
         return \
         "# %s\n"\
         "# assertions are raised if the counts do not match.\n"\
         "# \n"\
         "# To add a new callcount test, apply the function_call_count \n"\
-        "# decorator and re-run the tests using the --write-profiles option - \n"\
-        "# this file will be rewritten including the new count.\n"\
+        "# decorator and re-run the tests using the --write-profiles \n"\
+        "# option - this file will be rewritten including the new count.\n"\
         "# \n"\
         "" % (self.fname)
 
             test_key, platform_key, counts = line.split()
             per_fn = self.data[test_key]
             per_platform = per_fn[platform_key]
-            per_platform['counts'] = [int(count) for count in counts.split(",")]
+            c = [int(count) for count in counts.split(",")]
+            per_platform['counts'] = c
             per_platform['lineno'] = lineno + 1
             per_platform['current_count'] = 0
         profile_f.close()
             profile_f.write("\n# TEST: %s\n\n" % test_key)
             for platform_key in sorted(per_fn):
                 per_platform = per_fn[platform_key]
-                profile_f.write(
-                    "%s %s %s\n" % (
-                        test_key,
-                        platform_key, ",".join(str(count) for count in per_platform['counts'])
-                    )
-                )
+                c = ",".join(str(count) for count in per_platform['counts'])
+                profile_f.write("%s %s %s\n" % (test_key, platform_key, c))
         profile_f.close()
 
 from sqlalchemy.util.compat import update_wrapper
 
+
 def function_call_count(variance=0.05):
     """Assert a target for a test case's function call count.
 
     def decorate(fn):
         def wrap(*args, **kw):
 
-
             if cProfile is None:
                 raise SkipTest("cProfile is not installed")
 
 
             gc_collect()
 
-
             timespent, load_stats, fn_result = _profile(
                 fn, *args, **kw
             )
                 if abs(callcount - expected_count) > deviance:
                     raise AssertionError(
                         "Adjusted function call count %s not within %s%% "
-                        "of expected %s. (Delete line %d of file %s to regenerate "
-                            "this callcount, when tests are run with --write-profiles.)"
+                        "of expected %s. (Delete line %d of file %s to "
+                        "regenerate this callcount, when tests are run "
+                        "with --write-profiles.)"
                         % (
                         callcount, (variance * 100),
                         expected_count, line_no,
     ended = time.time()
 
     return ended - began, load_stats, locals()['result']
-

lib/sqlalchemy/testing/requirements.py

 
 from . import exclusions
 
+
 class Requirements(object):
     def __init__(self, db, config):
         self.db = db
         """
         return exclusions.open()
 
-
     @property
     def datetime(self):
         """target dialect supports representation of Python
 
     @property
     def empty_strings_varchar(self):
-        """target database can persist/return an empty string with a varchar."""
+        """target database can persist/return an empty string with a
+        varchar.
 
+        """
         return exclusions.open()
 
     @property
 
         return exclusions.open()
 
-
     @property
     def update_from(self):
         """Target must support UPDATE..FROM syntax"""

lib/sqlalchemy/testing/runner.py

 
 import nose
 
+
 def main():
     nose.main(addplugins=[NoseSQLAlchemy()])

lib/sqlalchemy/testing/schema.py

 
 table_options = {}
 
+
 def Table(*args, **kw):
     """A schema.Table wrapper/hook for dialect-specific tweaks."""
 
         event.listen(col, 'after_parent_attach', add_seq, propagate=True)
     return col
 
+
 def _truncate_name(dialect, name):
     if len(name) > dialect.max_identifier_length:
         return name[0:max(dialect.max_identifier_length - 6, 0)] + \
                 "_" + hex(hash(name) % 64)[2:]
     else:
         return name
-

lib/sqlalchemy/testing/suite/test_ddl.py

                 (1, 'some data')
             )
 
-
     @requirements.create_table
     @util.provide_metadata
     def test_create_table(self):
         )
         self._simple_roundtrip()
 
-
     @requirements.drop_table
     @util.provide_metadata
     def test_drop_table(self):
         )
 
 
-__all__ = ('TableDDLTest', )
+__all__ = ('TableDDLTest', )

lib/sqlalchemy/testing/suite/test_insert.py

 
 from ..schema import Table, Column
 
+
 class LastrowidTest(fixtures.TablesTest):
     run_deletes = 'each'
 
         else:
             engine = config.db
 
-
         r = engine.execute(
             self.tables.autoinc_pk.insert(),
             data="some data"
         assert r.is_insert
         assert not r.returns_rows
 
+
 class ReturningTest(fixtures.TablesTest):
     run_deletes = 'each'
     __requires__ = 'returning', 'autoincrement_insert'
 
 
 __all__ = ('LastrowidTest', 'InsertBehaviorTest', 'ReturningTest')
-
-

lib/sqlalchemy/testing/suite/test_reflection.py

 
 metadata, users = None, None
 
+
 class HasTableTest(fixtures.TablesTest):
     @classmethod
     def define_tables(cls, metadata):
             assert config.db.dialect.has_table(conn, "test_table")
             assert not config.db.dialect.has_table(conn, "nonexistent_table")
 
+
 class HasSequenceTest(fixtures.TestBase):
     __requires__ = 'sequences',
 
         self._test_get_table_oid('users', schema='test_schema')
 
 
-__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest')
+__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest')

lib/sqlalchemy/testing/suite/test_types.py

 from ..schema import Table, Column
 import datetime
 
+
 class _UnicodeFixture(object):
     __requires__ = 'unicode_data',
 
         for row in rows:
             assert isinstance(row[0], unicode)
 
-
     def _test_empty_strings(self):
         unicode_table = self.tables.unicode_table
 
                 ).first()
         eq_(row, (u'',))
 
+
 class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
     __requires__ = 'unicode_data',
 
     datatype = Unicode(255)
 
-
     @requirements.empty_strings_varchar
     def test_empty_strings_varchar(self):
         self._test_empty_strings()
 
+
 class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
     __requires__ = 'unicode_data', 'text_type'
 
         foo.create(config.db)
         foo.drop(config.db)
 
+
 class _DateFixture(object):
     compare = None
 
     datatype = DateTime
     data = datetime.datetime(2012, 10, 15, 12, 57, 18)
 
+
 class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'datetime_microseconds',
     datatype = DateTime
     data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
 
+
 class TimeTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'time',
     datatype = Time
     data = datetime.time(12, 57, 18)
 
+
 class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'time_microseconds',
     datatype = Time
     data = datetime.time(12, 57, 18, 396)
 
+
 class DateTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'date',
     datatype = Date
     data = datetime.date(2012, 10, 15)
 
+
 class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'date',
     datatype = Date
     data = datetime.datetime(2012, 10, 15, 12, 57, 18)
     compare = datetime.date(2012, 10, 15)
 
+
 class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'datetime_historic',
     datatype = DateTime
     data = datetime.datetime(1850, 11, 10, 11, 52, 35)
 
+
 class DateHistoricTest(_DateFixture, fixtures.TablesTest):
     __requires__ = 'date_historic',
     datatype = Date
             'DateTimeHistoricTest', 'DateTimeCoercedToDateTimeTest',
             'TimeMicrosecondsTest', 'TimeTest', 'DateTimeMicrosecondsTest',
             'DateHistoricTest', 'StringTest')
-
-
-

lib/sqlalchemy/testing/suite/test_update_delete.py

 from .. import fixtures, config
-from ..config import requirements
 from ..assertions import eq_
-from .. import engines
 
-from sqlalchemy import Integer, String, select
+from sqlalchemy import Integer, String
 from ..schema import Table, Column
 
 
             ]
         )
 
-__all__ = ('SimpleUpdateDeleteTest', )
+__all__ = ('SimpleUpdateDeleteTest', )

lib/sqlalchemy/testing/util.py

 else:
     # assume CPython - straight gc.collect, lazy_gc() is a pass
     gc_collect = gc.collect
+
     def lazy_gc():
         pass
 
+
 def picklers():
     picklers = set()
     # Py2K
                     ).to_integral(decimal.ROUND_FLOOR) / \
                         pow(10, prec)
 
+
 class RandomSet(set):
     def __iter__(self):
         l = list(set.__iter__(self))
     def copy(self):
         return RandomSet(self)
 
+
 def conforms_partial_ordering(tuples, sorted_elements):
     """True if the given sorting conforms to the given partial ordering."""
 
     else:
         return True
 
+
 def all_partial_orderings(tuples, elements):
     edges = defaultdict(set)
     for parent, child in tuples:
     return fn
 
 
-
 def run_as_contextmanager(ctx, fn, *arg, **kw):
     """Run the given function under the given contextmanager,
     simulating the behavior of 'with' to support older
         else:
             return raise_
 
+
 def rowset(results):
     """Converts the results of sql execution into a plain set of column tuples.
 
         metadata.drop_all()
         self.metadata = prev_meta
 
+
 class adict(dict):
     """Dict keys available as attributes.  Shadows."""
     def __getattribute__(self, key):
 
     def get_all(self, *keys):
         return tuple([self[key] for key in keys])
-
-

lib/sqlalchemy/testing/warnings.py

 from .. import exc as sa_exc
 from .. import util
 
+
 def testing_warn(msg, stacklevel=3):
     """Replaces sqlalchemy.util.warn during tests."""
 
     else:
         warnings.warn_explicit(msg, filename, lineno)
 
+
 def resetwarnings():
     """Reset warning behavior to testing defaults."""
 
     warnings.filterwarnings('error', category=sa_exc.SADeprecationWarning)
     warnings.filterwarnings('error', category=sa_exc.SAWarning)
 
+
 def assert_warnings(fn, warnings):
     """Assert that each of the given warnings are emitted by fn."""
 
 
     canary = []
     orig_warn = util.warn
+
     def capture_warnings(*args, **kw):
         orig_warn(*args, **kw)
         popwarn = warnings.pop(0)