Commits

jason kirtland committed 4242b93

- Experimentally moved a few test packages to fully explicit imports (no *), pyflaked

Comments (0)

Files changed (13)

test/base/dependency.py

 import testenv; testenv.configure_for_tests()
 import sqlalchemy.topological as topological
 from sqlalchemy import util
-from testlib import *
+from testlib import TestBase
 
 
 class DependencySortTest(TestBase):

test/base/except.py

 """Tests exceptions and DB-API exception wrapping."""
 import testenv; testenv.configure_for_tests()
-import sys, unittest
+import unittest
 import exceptions as stdlib_exceptions
 from sqlalchemy import exc as sa_exceptions
-from testlib import *
 
 
 class Error(stdlib_exceptions.StandardError):

test/base/utils.py

 import testenv; testenv.configure_for_tests()
-import threading, time, unittest
+import threading, unittest
 from sqlalchemy import util, sql, exc
-from testlib import *
+from testlib import TestBase, set
 from testlib import sorted
 
 class OrderedDictTest(TestBase):

test/engine/bind.py

 including the deprecated versions of these arguments"""
 
 import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
 from sqlalchemy import engine, exc
-from testlib import *
+from sqlalchemy import MetaData, ThreadLocalMetaData
+from testlib.sa import Table, Column, Integer, String, func, Sequence, text
+from testlib import TestBase, testing
 
 
 class BindTest(TestBase):
 
     @testing.future
     def test_create_drop_err2(self):
+        metadata = MetaData()
+        table = Table('test_table', metadata,
+            Column('foo', Integer))
+
         for meth in [
             table.exists,
             table.create,

test/engine/ddlevents.py

 import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import exc
 from sqlalchemy.schema import DDL
-import sqlalchemy
-from testlib import *
+from sqlalchemy import create_engine
+from testlib.sa import MetaData, Table, Column, Integer, String
+import testlib.sa as tsa
+from testlib import TestBase, testing
 
 
 class DDLEventTest(TestBase):
             try:
                 r = eval(py)
                 assert False
-            except exc.UnboundExecutionError:
+            except tsa.exc.UnboundExecutionError:
                 pass
 
         for bind in engine, cx:
         engine = create_engine(testing.db.name + '://',
                                strategy='mock', executor=executor)
         engine.dialect.identifier_preparer = \
-           sqlalchemy.sql.compiler.IdentifierPreparer(engine.dialect)
+           tsa.sql.compiler.IdentifierPreparer(engine.dialect)
         return engine
 
     def test_tokens(self):

test/engine/execute.py

 import testenv; testenv.configure_for_tests()
 import re
-from sqlalchemy import *
-from sqlalchemy import exc
-from testlib import *
 from sqlalchemy.interfaces import ConnectionProxy
-from testlib import engines
+from testlib.sa import MetaData, Table, Column, Integer, String, INT, \
+     VARCHAR, func
+import testlib.sa as tsa
+from testlib import TestBase, testing, engines
 
+
+users, metadata = None, None
 class ExecuteTest(TestBase):
     def setUpAll(self):
         global users, metadata
             try:
                 conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef")
                 assert False
-            except exc.DBAPIError:
+            except tsa.exc.DBAPIError:
                 assert True
 
 class ProxyConnectionTest(TestBase):

test/engine/metadata.py

 import testenv; testenv.configure_for_tests()
 import pickle
-from sqlalchemy import *
-from sqlalchemy import exc
-from testlib import *
+from sqlalchemy import MetaData
+from testlib.sa import Table, Column, Integer, String, UniqueConstraint, \
+     CheckConstraint, ForeignKey
+import testlib.sa as tsa
+from testlib import TestBase, ComparesTables, testing
 
 
 class MetaDataTest(TestBase, ComparesTables):
                 t2 = Table('table1', metadata, Column('col1', Integer, primary_key=True),
                     Column('col2', String(20)))
                 assert False
-            except exc.InvalidRequestError, e:
+            except tsa.exc.InvalidRequestError, e:
                 assert str(e) == "Table 'table1' is already defined for this MetaData instance.  Specify 'useexisting=True' to redefine options and columns on an existing Table object."
         finally:
             metadata.drop_all()
             meta.drop_all(testing.db)
 
     def test_nonexistent(self):
-        self.assertRaises(exc.NoSuchTableError, Table,
+        self.assertRaises(tsa.exc.NoSuchTableError, Table,
                           'fake_table',
                           MetaData(testing.db), autoload=True)
 

test/engine/parseconnect.py

 import testenv; testenv.configure_for_tests()
 import ConfigParser, StringIO
-from sqlalchemy import *
-from sqlalchemy import exc, pool, engine
 import sqlalchemy.engine.url as url
-from testlib import *
+from sqlalchemy import create_engine, engine_from_config
+import testlib.sa as tsa
+from testlib import TestBase
 
 
 class ParseConnectTest(TestBase):
             }
 
         prefixed = dict(ini.items('prefixed'))
-        self.assert_(engine._coerce_config(prefixed, 'sqlalchemy.') == expected)
+        self.assert_(tsa.engine._coerce_config(prefixed, 'sqlalchemy.') == expected)
 
         plain = dict(ini.items('plain'))
-        self.assert_(engine._coerce_config(plain, '') == expected)
+        self.assert_(tsa.engine._coerce_config(plain, '') == expected)
 
     def test_engine_from_config(self):
         dbapi = MockDBAPI()
         try:
             c = e.connect()
             assert False
-        except exc.DBAPIError:
+        except tsa.exc.DBAPIError:
             assert True
 
     def test_urlattr(self):
         assert e.pool._recycle == 50
 
         # these args work for QueuePool
-        e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=pool.QueuePool, module=MockDBAPI())
+        e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=tsa.pool.QueuePool, module=MockDBAPI())
 
         try:
             # but not SingletonThreadPool
-            e = create_engine('sqlite://', max_overflow=8, pool_timeout=60, poolclass=pool.SingletonThreadPool)
+            e = create_engine('sqlite://', max_overflow=8, pool_timeout=60, poolclass=tsa.pool.SingletonThreadPool)
             assert False
         except TypeError:
             assert True

test/engine/pool.py

 import testenv; testenv.configure_for_tests()
-import threading, thread, time, gc
-from sqlalchemy import exc, interfaces, pool
-from testlib import *
+import threading, time, gc
+from sqlalchemy import pool
+import testlib.sa as tsa
+from testlib import TestBase
 
 
 mcid = 1
         try:
             c4 = p.connect()
             assert False
-        except exc.TimeoutError, e:
+        except tsa.exc.TimeoutError, e:
             assert int(time.time() - now) == 2
 
     def test_timeout_race(self):
                 now = time.time()
                 try:
                     c1 = p.connect()
-                except exc.TimeoutError, e:
+                except tsa.exc.TimeoutError, e:
                     timeouts.append(int(time.time()) - now)
                     continue
                 time.sleep(4)
                     peaks.append(p.overflow())
                     con.close()
                     del con
-                except exc.TimeoutError:
+                except tsa.exc.TimeoutError:
                     pass
         threads = []
         for i in xrange(thread_count):
                 # con can be None if invalidated
                 assert record is not None
                 self.checked_in.append(con)
-        class ListenAll(interfaces.PoolListener, InstrumentingListener):
+        class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
             pass
         class ListenConnect(InstrumentingListener):
             def connect(self, con, record):

test/engine/reconnect.py

 import testenv; testenv.configure_for_tests()
-import sys, weakref
-from sqlalchemy import create_engine, exc, select, MetaData, Table, Column, Integer, String
-from testlib import *
+import weakref
+from testlib.sa import select, MetaData, Table, Column, Integer, String
+import testlib.sa as tsa
+from testlib import TestBase, testing, engines
 
 
 class MockDisconnect(Exception):
     def close(self):
         pass
 
+db, dbapi = None, None
 class MockReconnectTest(TestBase):
     def setUp(self):
         global db, dbapi
         dbapi = MockDBAPI()
 
         # create engine using our current dburi
-        db = create_engine('postgres://foo:bar@localhost/test', module=dbapi)
+        db = tsa.create_engine('postgres://foo:bar@localhost/test', module=dbapi)
 
         # monkeypatch disconnect checker
         db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect)
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError:
+        except tsa.exc.DBAPIError:
             pass
 
         # assert was invalidated
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError:
+        except tsa.exc.DBAPIError:
             pass
 
         # assert was invalidated
         try:
             conn.execute(select([1]))
             assert False
-        except exc.InvalidRequestError, e:
+        except tsa.exc.InvalidRequestError, e:
             assert str(e) == "Can't reconnect until invalid transaction is rolled back"
 
         assert trans.is_active
         try:
             trans.commit()
             assert False
-        except exc.InvalidRequestError, e:
+        except tsa.exc.InvalidRequestError, e:
             assert str(e) == "Can't reconnect until invalid transaction is rolled back"
 
         assert trans.is_active
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError:
+        except tsa.exc.DBAPIError:
             pass
 
         assert not conn.closed
         assert not conn.invalidated
         assert len(dbapi.connections) == 1
 
-
+engine = None
 class RealReconnectTest(TestBase):
     def setUp(self):
         global engine
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError, e:
+        except tsa.exc.DBAPIError, e:
             if not e.connection_invalidated:
                 raise
 
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError, e:
+        except tsa.exc.DBAPIError, e:
             if not e.connection_invalidated:
                 raise
         assert conn.invalidated
         assert not conn.invalidated
 
         conn.close()
-    
+
     def test_close(self):
         conn = engine.connect()
         self.assertEquals(conn.execute(select([1])).scalar(), 1)
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError, e:
+        except tsa.exc.DBAPIError, e:
             if not e.connection_invalidated:
                 raise
 
         try:
             conn.execute(select([1]))
             assert False
-        except exc.DBAPIError, e:
+        except tsa.exc.DBAPIError, e:
             if not e.connection_invalidated:
                 raise
 
         try:
             conn.execute(select([1]))
             assert False
-        except exc.InvalidRequestError, e:
+        except tsa.exc.InvalidRequestError, e:
             assert str(e) == "Can't reconnect until invalid transaction is rolled back"
 
         assert trans.is_active
         try:
             trans.commit()
             assert False
-        except exc.InvalidRequestError, e:
+        except tsa.exc.InvalidRequestError, e:
             assert str(e) == "Can't reconnect until invalid transaction is rolled back"
 
         assert trans.is_active
         self.assertEquals(conn.execute(select([1])).scalar(), 1)
         assert not conn.invalidated
 
+meta, table, engine = None, None, None
 class InvalidateDuringResultTest(TestBase):
     def setUp(self):
         global meta, table, engine
         table.insert().execute(
             [{'id':i, 'name':'row %d' % i} for i in range(1, 100)]
         )
-        
+
     def tearDown(self):
         meta.drop_all()
         engine.dispose()
-    
-    @testing.fails_on('mysql')    
+
+    @testing.fails_on('mysql')
     def test_invalidate_on_results(self):
         conn = engine.connect()
-        
+
         result = conn.execute("select * from sometable")
         for x in xrange(20):
             result.fetchone()
-        
+
         engine.test_shutdown()
         try:
             result.fetchone()
             assert False
-        except exc.DBAPIError, e:
+        except tsa.exc.DBAPIError, e:
             if not e.connection_invalidated:
                 raise
 
         assert conn.invalidated
-        
+
 if __name__ == '__main__':
     testenv.main()

test/engine/reflection.py

 import testenv; testenv.configure_for_tests()
 import StringIO, unicodedata
-from sqlalchemy import *
-from sqlalchemy import exc
-from sqlalchemy import types as sqltypes
-from testlib import *
-from testlib import engines
+import sqlalchemy as sa
+from testlib.sa import MetaData, Table, Column
+from testlib import TestBase, ComparesTables, testing, engines, sa as tsa
+from testlib.compat import set
 
 
+metadata, users = None, None
+
 class ReflectionTest(TestBase, ComparesTables):
 
     @testing.exclude('mysql', '<', (4, 1, 1))
         meta = MetaData(testing.db)
 
         users = Table('engine_users', meta,
-            Column('user_id', INT, primary_key=True),
-            Column('user_name', VARCHAR(20), nullable=False),
-            Column('test1', CHAR(5), nullable=False),
-            Column('test2', Float(5), nullable=False),
-            Column('test3', Text),
-            Column('test4', Numeric, nullable = False),
-            Column('test5', DateTime),
-            Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')),
-            Column('test6', DateTime, nullable=False),
-            Column('test7', Text),
-            Column('test8', Binary),
-            Column('test_passivedefault2', Integer, PassiveDefault("5")),
-            Column('test9', Binary(100)),
-            Column('test_numeric', Numeric()),
+            Column('user_id', sa.INT, primary_key=True),
+            Column('user_name', sa.VARCHAR(20), nullable=False),
+            Column('test1', sa.CHAR(5), nullable=False),
+            Column('test2', sa.Float(5), nullable=False),
+            Column('test3', sa.Text),
+            Column('test4', sa.Numeric, nullable = False),
+            Column('test5', sa.DateTime),
+            Column('parent_user_id', sa.Integer,
+                   sa.ForeignKey('engine_users.user_id')),
+            Column('test6', sa.DateTime, nullable=False),
+            Column('test7', sa.Text),
+            Column('test8', sa.Binary),
+            Column('test_passivedefault2', sa.Integer, sa.PassiveDefault("5")),
+            Column('test9', sa.Binary(100)),
+            Column('test_numeric', sa.Numeric()),
             test_needs_fk=True,
         )
 
         addresses = Table('engine_email_addresses', meta,
-            Column('address_id', Integer, primary_key = True),
-            Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
-            Column('email_address', String(20)),
+            Column('address_id', sa.Integer, primary_key = True),
+            Column('remote_user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+            Column('email_address', sa.String(20)),
             test_needs_fk=True,
         )
         meta.create_all()
 
         try:
             meta2 = MetaData()
-            reflected_users = Table('engine_users', meta2, autoload=True, autoload_with=testing.db)
-            reflected_addresses = Table('engine_email_addresses', meta2, autoload=True, autoload_with=testing.db)
+            reflected_users = Table('engine_users', meta2, autoload=True,
+                                    autoload_with=testing.db)
+            reflected_addresses = Table('engine_email_addresses', meta2,
+                                        autoload=True, autoload_with=testing.db)
             self.assert_tables_equal(users, reflected_users)
             self.assert_tables_equal(addresses, reflected_addresses)
         finally:
 
     def test_include_columns(self):
         meta = MetaData(testing.db)
-        foo = Table('foo', meta, *[Column(n, String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']])
+        foo = Table('foo', meta, *[Column(n, sa.String(30))
+                                   for n in ['a', 'b', 'c', 'd', 'e', 'f']])
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
-            foo = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e'])
+            foo = Table('foo', meta2, autoload=True,
+                        include_columns=['b', 'f', 'e'])
             # test that cols come back in original order
             self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f'])
             for c in ('b', 'f', 'e'):
                 assert c in foo.c
             for c in ('a', 'c', 'd'):
                 assert c not in foo.c
-                
+
             # test against a table which is already reflected
             meta3 = MetaData(testing.db)
             foo = Table('foo', meta3, autoload=True)
-            foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], useexisting=True)
+            foo = Table('foo', meta3, include_columns=['b', 'f', 'e'],
+                        useexisting=True)
             self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f'])
             for c in ('b', 'f', 'e'):
                 assert c in foo.c
     def test_unknown_types(self):
         meta = MetaData(testing.db)
         t = Table("test", meta,
-            Column('foo', DateTime))
+            Column('foo', sa.DateTime))
 
         import sys
         dialect_module = sys.modules[testing.db.dialect.__module__]
                 m2 = MetaData(testing.db)
                 t2 = Table("test", m2, autoload=True)
                 assert False
-            except exc.SAWarning:
+            except tsa.exc.SAWarning:
                 assert True
 
             @testing.emits_warning('Did not recognize type')
             def warns():
                 m3 = MetaData(testing.db)
                 t3 = Table("test", m3, autoload=True)
-                assert t3.c.foo.type.__class__ == sqltypes.NullType
+                assert t3.c.foo.type.__class__ == sa.types.NullType
 
         finally:
             dialect_module.ischema_names = ischema_names
         meta = MetaData(testing.db)
         table = Table(
             'override_test', meta,
-            Column('col1', Integer, primary_key=True),
-            Column('col2', String(20)),
-            Column('col3', Numeric)
+            Column('col1', sa.Integer, primary_key=True),
+            Column('col2', sa.String(20)),
+            Column('col3', sa.Numeric)
         )
         table.create()
 
         try:
             table = Table(
                 'override_test', meta2,
-                Column('col2', Unicode()),
-                Column('col4', String(30)), autoload=True)
+                Column('col2', sa.Unicode()),
+                Column('col4', sa.String(30)), autoload=True)
 
-            self.assert_(isinstance(table.c.col1.type, Integer))
-            self.assert_(isinstance(table.c.col2.type, Unicode))
-            self.assert_(isinstance(table.c.col4.type, String))
+            self.assert_(isinstance(table.c.col1.type, sa.Integer))
+            self.assert_(isinstance(table.c.col2.type, sa.Unicode))
+            self.assert_(isinstance(table.c.col4.type, sa.String))
         finally:
             table.drop()
 
 
         meta = MetaData(testing.db)
         users = Table('users', meta,
-            Column('id', Integer, primary_key=True),
-            Column('name', String(30)))
+            Column('id', sa.Integer, primary_key=True),
+            Column('name', sa.String(30)))
         addresses = Table('addresses', meta,
-            Column('id', Integer, primary_key=True),
-            Column('street', String(30)))
+            Column('id', sa.Integer, primary_key=True),
+            Column('street', sa.String(30)))
 
 
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
             a2 = Table('addresses', meta2,
-                Column('id', Integer, ForeignKey('users.id'), primary_key=True),
+                Column('id', sa.Integer,
+                       sa.ForeignKey('users.id'), primary_key=True),
                 autoload=True)
             u2 = Table('users', meta2, autoload=True)
 
             meta3 = MetaData(testing.db)
             u3 = Table('users', meta3, autoload=True)
             a3 = Table('addresses', meta3,
-                Column('id', Integer, ForeignKey('users.id'), primary_key=True),
+                Column('id', sa.Integer, sa.ForeignKey('users.id'),
+                       primary_key=True),
                 autoload=True)
 
             assert list(a3.primary_key) == [a3.c.id]
 
         meta = MetaData(testing.db)
         users = Table('users', meta,
-            Column('id', Integer, primary_key=True),
-            Column('name', String(30)))
+            Column('id', sa.Integer, primary_key=True),
+            Column('name', sa.String(30)))
         addresses = Table('addresses', meta,
-            Column('id', Integer, primary_key=True),
-            Column('street', String(30)),
-            Column('user_id', Integer))
+            Column('id', sa.Integer, primary_key=True),
+            Column('street', sa.String(30)),
+            Column('user_id', sa.Integer))
 
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
             a2 = Table('addresses', meta2,
-                Column('user_id', Integer, ForeignKey('users.id')),
+                Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
                 autoload=True)
             u2 = Table('users', meta2, autoload=True)
 
             meta3 = MetaData(testing.db)
             u3 = Table('users', meta3, autoload=True)
             a3 = Table('addresses', meta3,
-                Column('user_id', Integer, ForeignKey('users.id')),
+                Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
                 autoload=True)
 
             assert u3.join(a3).onclause == u3.c.id==a3.c.user_id
 
             meta4 = MetaData(testing.db)
             u4 = Table('users', meta4,
-                       Column('id', Integer, key='u_id', primary_key=True),
+                       Column('id', sa.Integer, key='u_id', primary_key=True),
                        autoload=True)
             a4 = Table('addresses', meta4,
-                       Column('id', Integer, key='street', primary_key=True),
-                       Column('street', String(30), key='user_id'),
-                       Column('user_id', Integer, ForeignKey('users.u_id'),
+                       Column('id', sa.Integer, key='street', primary_key=True),
+                       Column('street', sa.String(30), key='user_id'),
+                       Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'),
                               key='id'),
                        autoload=True)
 
 
         meta = MetaData(testing.db)
         users = Table('users', meta,
-            Column('id', Integer, primary_key=True),
-            Column('name', String(30)),
+            Column('id', sa.Integer, primary_key=True),
+            Column('name', sa.String(30)),
             test_needs_fk=True)
         addresses = Table('addresses', meta,
-            Column('id', Integer,primary_key=True),
-            Column('user_id', Integer, ForeignKey('users.id')),
+            Column('id', sa.Integer, primary_key=True),
+            Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
             test_needs_fk=True)
 
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
             a2 = Table('addresses', meta2,
-                Column('user_id',Integer, ForeignKey('users.id')),
+                Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
                 autoload=True)
             u2 = Table('users', meta2, autoload=True)
 
 
             meta2 = MetaData(testing.db)
             u2 = Table('users', meta2, 
-                Column('id', Integer, primary_key=True),
+                Column('id', sa.Integer, primary_key=True),
                 autoload=True)
             a2 = Table('addresses', meta2,
-                Column('id', Integer, primary_key=True),
-                Column('user_id',Integer, ForeignKey('users.id')),
+                Column('id', sa.Integer, primary_key=True),
+                Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
                 autoload=True)
 
             assert len(a2.foreign_keys) == 1
             assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
         finally:
             meta.drop_all()
-    
+
     def test_use_existing(self):
         meta = MetaData(testing.db)
         users = Table('users', meta,
-            Column('id', Integer, primary_key=True),
-            Column('name', String(30)),
+            Column('id', sa.Integer, primary_key=True),
+            Column('name', sa.String(30)),
             test_needs_fk=True)
         addresses = Table('addresses', meta,
-            Column('id', Integer,primary_key=True),
-            Column('user_id', Integer, ForeignKey('users.id')),
-            Column('data', String(100)),
+            Column('id', sa.Integer,primary_key=True),
+            Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
+            Column('data', sa.String(100)),
             test_needs_fk=True)
 
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
-            addresses = Table('addresses', meta2, Column('data', Unicode), autoload=True)
+            addresses = Table('addresses', meta2, Column('data', sa.Unicode), autoload=True)
             try:
-                users = Table('users', meta2, Column('name', Unicode), autoload=True)
+                users = Table('users', meta2, Column('name', sa.Unicode), autoload=True)
                 assert False
-            except exc.InvalidRequestError, err:
+            except tsa.exc.InvalidRequestError, err:
                 assert str(err) == "Table 'users' is already defined for this MetaData instance.  Specify 'useexisting=True' to redefine options and columns on an existing Table object."
             
-            users = Table('users', meta2, Column('name', Unicode), autoload=True, useexisting=True)
-            assert isinstance(users.c.name.type, Unicode)
+            users = Table('users', meta2, Column('name', sa.Unicode), autoload=True, useexisting=True)
+            assert isinstance(users.c.name.type, sa.Unicode)
 
             assert not users.quote
             
     def test_fk_error(self):
         metadata = MetaData(testing.db)
         slots_table = Table('slots', metadata,
-            Column('slot_id', Integer, primary_key=True),
-            Column('pkg_id', Integer, ForeignKey('pkgs.pkg_id')),
-            Column('slot', String(128)),
+            Column('slot_id', sa.Integer, primary_key=True),
+            Column('pkg_id', sa.Integer, sa.ForeignKey('pkgs.pkg_id')),
+            Column('slot', sa.String(128)),
             )
         try:
             metadata.create_all()
             assert False
-        except exc.InvalidRequestError, err:
+        except tsa.exc.InvalidRequestError, err:
             assert str(err) == "Could not find table 'pkgs' with which to generate a foreign key"
 
     def test_composite_pks(self):
         meta = MetaData(testing.db)
         multi = Table(
             'multi', meta,
-            Column('multi_id', Integer, primary_key=True),
-            Column('multi_rev', Integer, primary_key=True),
-            Column('multi_hoho', Integer, primary_key=True),
-            Column('name', String(50), nullable=False),
-            Column('val', String(100)),
+            Column('multi_id', sa.Integer, primary_key=True),
+            Column('multi_rev', sa.Integer, primary_key=True),
+            Column('multi_hoho', sa.Integer, primary_key=True),
+            Column('name', sa.String(50), nullable=False),
+            Column('val', sa.String(100)),
             test_needs_fk=True,
         )
         multi2 = Table('multi2', meta,
-            Column('id', Integer, primary_key=True),
-            Column('foo', Integer),
-            Column('bar', Integer),
-            Column('lala', Integer),
-            Column('data', String(50)),
-            ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
+            Column('id', sa.Integer, primary_key=True),
+            Column('foo', sa.Integer),
+            Column('bar', sa.Integer),
+            Column('lala', sa.Integer),
+            Column('data', sa.String(50)),
+            sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
             test_needs_fk=True,
         )
         meta.create_all()
             table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db)
             self.assert_tables_equal(multi, table)
             self.assert_tables_equal(multi2, table2)
-            j = join(table, table2)
-            self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
+            j = sa.join(table, table2)
+            self.assert_(sa.and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
         finally:
             meta.drop_all()
 
         # check a table that uses an SQL reserved name doesn't cause an error
         meta = MetaData(testing.db)
         table_a = Table('select', meta,
-                       Column('not', Integer, primary_key=True),
-                       Column('from', String(12), nullable=False),
-                       UniqueConstraint('from', name='when'))
-        Index('where', table_a.c['from'])
+                       Column('not', sa.Integer, primary_key=True),
+                       Column('from', sa.String(12), nullable=False),
+                       sa.UniqueConstraint('from', name='when'))
+        sa.Index('where', table_a.c['from'])
 
         # There's currently no way to calculate identifier case normalization
         # in isolation, so...
         quoter = meta.bind.dialect.identifier_preparer.quote_identifier
 
         table_b = Table('false', meta,
-                        Column('create', Integer, primary_key=True),
-                        Column('true', Integer, ForeignKey('select.not')),
-                        CheckConstraint('%s <> 1' % quoter(check_col),
+                        Column('create', sa.Integer, primary_key=True),
+                        Column('true', sa.Integer, sa.ForeignKey('select.not')),
+                        sa.CheckConstraint('%s <> 1' % quoter(check_col),
                                         name='limit'))
 
         table_c = Table('is', meta,
-                        Column('or', Integer, nullable=False, primary_key=True),
-                        Column('join', Integer, nullable=False, primary_key=True),
-                        PrimaryKeyConstraint('or', 'join', name='to'))
+                        Column('or', sa.Integer, nullable=False, primary_key=True),
+                        Column('join', sa.Integer, nullable=False, primary_key=True),
+                        sa.PrimaryKeyConstraint('or', 'join', name='to'))
 
-        index_c = Index('else', table_c.c.join)
+        index_c = sa.Index('else', table_c.c.join)
 
         meta.create_all()
 
 
         baseline = MetaData(testing.db)
         for name in names:
-            Table(name, baseline, Column('id', Integer, primary_key=True))
+            Table(name, baseline, Column('id', sa.Integer, primary_key=True))
         baseline.create_all()
 
         try:
             try:
                 m4.reflect(only=['rt_a', 'rt_f'])
                 self.assert_(False)
-            except exc.InvalidRequestError, e:
+            except tsa.exc.InvalidRequestError, e:
                 self.assert_(e.args[0].endswith('(rt_f)'))
 
             m5 = MetaData(testing.db)
             try:
                 m8 = MetaData(reflect=True)
                 self.assert_(False)
-            except exc.ArgumentError, e:
+            except tsa.exc.ArgumentError, e:
                 self.assert_(
                     e.args[0] ==
                     "A bind must be supplied in conjunction with reflect=True")
         global metadata, users
         metadata = MetaData()
         users = Table('users', metadata,
-                      Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),
-                      Column('user_name', String(40)),
+                      Column('user_id', sa.Integer, sa.Sequence('user_id_seq', optional=True), primary_key=True),
+                      Column('user_name', sa.String(40)),
                       )
 
         addresses = Table('email_addresses', metadata,
-            Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True),
-            Column('user_id', Integer, ForeignKey(users.c.user_id)),
-            Column('email_address', String(40)),
+            Column('address_id', sa.Integer, sa.Sequence('address_id_seq', optional=True), primary_key = True),
+            Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+            Column('email_address', sa.String(40)),
         )
 
         orders = Table('orders', metadata,
-            Column('order_id', Integer, Sequence('order_id_seq', optional=True), primary_key = True),
-            Column('user_id', Integer, ForeignKey(users.c.user_id)),
-            Column('description', String(50)),
-            Column('isopen', Integer),
+            Column('order_id', sa.Integer, sa.Sequence('order_id_seq', optional=True), primary_key = True),
+            Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+            Column('description', sa.String(50)),
+            Column('isopen', sa.Integer),
         )
 
         orderitems = Table('items', metadata,
-            Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True),
-            Column('order_id', INT, ForeignKey("orders")),
-            Column('item_name', VARCHAR(50)),
+            Column('item_id', sa.INT, sa.Sequence('items_id_seq', optional=True), primary_key = True),
+            Column('order_id', sa.INT, sa.ForeignKey("orders")),
+            Column('item_name', sa.VARCHAR(50)),
         )
 
     def test_sorter( self ):
     def test_append_constraint_unique(self):
         meta = MetaData()
         
-        users = Table('users', meta, Column('id', Integer))
-        addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer))
+        users = Table('users', meta, Column('id', sa.Integer))
+        addresses = Table('addresses', meta, Column('id', sa.Integer), Column('user_id', sa.Integer))
         
-        fk = ForeignKeyConstraint(['user_id'],[users.c.id])
+        fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id])
         
         addresses.append_constraint(fk)
         addresses.append_constraint(fk)
                 names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66'])
 
             for name in names:
-                Table(name, metadata, Column('id', Integer, Sequence(name + "_id_seq"), primary_key=True))
+                Table(name, metadata, Column('id', sa.Integer, sa.Sequence(name + "_id_seq"), primary_key=True))
             metadata.create_all()
 
             reflected = set(bind.table_names())
     def test_iteration(self):
         metadata = MetaData()
         table1 = Table('table1', metadata,
-            Column('col1', Integer, primary_key=True),
+            Column('col1', sa.Integer, primary_key=True),
             schema='someschema')
         table2 = Table('table2', metadata,
-            Column('col1', Integer, primary_key=True),
-            Column('col2', Integer, ForeignKey('someschema.table1.col1')),
+            Column('col1', sa.Integer, primary_key=True),
+            Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')),
             schema='someschema')
         # ensure this doesnt crash
         print [t for t in metadata.table_iterator()]
         buf = StringIO.StringIO()
         def foo(s, p=None):
             buf.write(s)
-        gen = create_engine(testing.db.name + "://", strategy="mock", executor=foo)
+        gen = sa.create_engine(testing.db.name + "://", strategy="mock", executor=foo)
         gen = gen.dialect.schemagenerator(gen.dialect, gen)
         gen.traverse(table1)
         gen.traverse(table2)
 
         metadata = MetaData(engine)
         table1 = Table('table1', metadata,
-                       Column('col1', Integer, primary_key=True),
+                       Column('col1', sa.Integer, primary_key=True),
                        schema=schema)
         table2 = Table('table2', metadata,
-                       Column('col1', Integer, primary_key=True),
-                       Column('col2', Integer,
-                              ForeignKey('%s.table1.col1' % schema)),
+                       Column('col1', sa.Integer, primary_key=True),
+                       Column('col2', sa.Integer,
+                              sa.ForeignKey('%s.table1.col1' % schema)),
                        schema=schema)
         try:
             metadata.create_all()
         global metadata, users
         metadata = MetaData()
         users = Table('users', metadata,
-                      Column('user_id', Integer, Sequence('user_id_seq'), primary_key=True),
-                      Column('user_name', String(40)),
+                      Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True),
+                      Column('user_name', sa.String(40)),
                       )
 
     @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase')

test/engine/transaction.py

 import testenv; testenv.configure_for_tests()
 import sys, time, threading
+from testlib.sa import create_engine, MetaData, Table, Column, INT, VARCHAR, \
+     Sequence, select, Integer, String, func, text
+from testlib import TestBase, testing
 
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from testlib import *
 
-
+users, metadata = None, None
 class TransactionTest(TestBase):
     def setUpAll(self):
         global users, metadata
         users.drop(conn2)
         conn2.close()
 
+foo = None
 class ExplicitAutoCommitTest(TestBase):
     """test the 'autocommit' flag on select() and text() objects.  
     
         
         conn1.close()
         conn2.close()
-        
-    
+
+
+tlengine = None
 class TLTransactionTest(TestBase):
     def setUpAll(self):
         global users, metadata, tlengine
         finally:
             external_connection.close()
 
-    @testing.exclude('mysql', '<', (5, 0, 3))
-    def testsessionnesting(self):
-        class User(object):
-            pass
-        try:
-            mapper(User, users)
-
-            sess = create_session(bind=tlengine)
-            tlengine.begin()
-            u = User()
-            sess.save(u)
-            sess.flush()
-            tlengine.commit()
-        finally:
-            clear_mappers()
 
 
     def testconnections(self):
             [(1,),(2,)]
         )
 
+counters = None
 class ForUpdateTest(TestBase):
     def setUpAll(self):
         global counters, metadata

test/orm/session.py

 import testenv; testenv.configure_for_tests()
+import gc
+import pickle
 from sqlalchemy import *
 from sqlalchemy import exc as sa_exc, util
 from sqlalchemy.orm import *
 from testlib import *
 from testlib.tables import *
 from testlib import fixtures, tables
-import pickle
-import gc
 
 
 class SessionTest(TestBase, AssertsExecutionResults):
         assert len(list(sess)) == 1
 
 
+class TLTransactionTest(TestBase):
+    def setUpAll(self):
+        global users, metadata, tlengine
+        tlengine = create_engine(testing.db.url, strategy='threadlocal')
+        metadata = MetaData()
+        users = Table('query_users', metadata,
+            Column('user_id', INT, Sequence('query_users_id_seq', optional=True), primary_key=True),
+            Column('user_name', VARCHAR(20)),
+            test_needs_acid=True,
+        )
+        users.create(tlengine)
+    def tearDown(self):
+        tlengine.execute(users.delete())
+    def tearDownAll(self):
+        users.drop(tlengine)
+        tlengine.dispose()
+
+    @testing.exclude('mysql', '<', (5, 0, 3))
+    def testsessionnesting(self):
+        class User(object):
+            pass
+        try:
+            mapper(User, users)
+
+            sess = create_session(bind=tlengine)
+            tlengine.begin()
+            u = User()
+            sess.save(u)
+            sess.flush()
+            tlengine.commit()
+        finally:
+            clear_mappers()
+
 
 if __name__ == "__main__":
     testenv.main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.