Commits

diana clarke committed 192bddc

starting on the update tests next, pep8 pass first (see #2630)

Comments (0)

Files changed (2)

test/sql/test_labels.py

-
-from sqlalchemy import exc as exceptions
-from sqlalchemy import testing
-from sqlalchemy.testing import engines
-from sqlalchemy import select, MetaData, Integer, or_
+from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_
 from sqlalchemy.engine import default
 from sqlalchemy.sql import table, column
-from sqlalchemy.testing import assert_raises, eq_
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
-from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.testing import AssertsCompiledSQL, assert_raises, engines,\
+    fixtures
 from sqlalchemy.testing.schema import Table, Column
 
 IDENT_LENGTH = 29
 
 
 class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = 'DefaultDialect'
 
     table1 = table('some_large_named_table',
             column('this_is_the_primarykey_column'),
             column('this_is_the_data_column')
         )
 
-    __dialect__ = 'DefaultDialect'
-
-
     def _length_fixture(self, length=IDENT_LENGTH, positional=False):
         dialect = default.DefaultDialect()
         dialect.max_identifier_length = length
         ta = table2.alias()
         on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column
         self.assert_compile(
-            select([table1, ta]).select_from(table1.join(ta, on)).\
+            select([table1, ta]).select_from(table1.join(ta, on)).
                 where(ta.c.this_is_the_data_column == 'data3'),
             'SELECT '
                 'some_large_named_table.this_is_the_primarykey_column, '
         t = Table('this_name_is_too_long_for_what_were_doing_in_this_test',
                         m, Column('foo', Integer))
         eng = self._engine_fixture()
-        for meth in (
-                    t.create,
-                    t.drop,
-                    m.create_all,
-                    m.drop_all
-                ):
-            assert_raises(
-                    exceptions.IdentifierError,
-                    meth, eng
-                )
+        methods = (t.create, t.drop, m.create_all, m.drop_all)
+        for meth in methods:
+            assert_raises(exceptions.IdentifierError, meth, eng)
 
     def _assert_labeled_table1_select(self, s):
         table1 = self.table1
             dialect=self._length_fixture(positional=True)
         )
 
+
 class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = 'DefaultDialect'
 
     table1 = table('some_large_named_table',
             column('this_is_the_primarykey_column'),
             column('this_is_the_data_column')
         )
 
-    __dialect__ = 'DefaultDialect'
-
     def test_adjustable_1(self):
         table1 = self.table1
         q = table1.select(
             'AS _1',
             dialect=compile_dialect)
 
-
     def test_adjustable_result_schema_column_1(self):
         table1 = self.table1
+
         q = table1.select(
             table1.c.this_is_the_primarykey_column == 4).apply_labels().\
                 alias('foo')
-        dialect = default.DefaultDialect(label_length=10)
 
+        dialect = default.DefaultDialect(label_length=10)
         compiled = q.compile(dialect=dialect)
+
         assert set(compiled.result_map['some_2'][1]).issuperset([
-                    table1.c.this_is_the_data_column,
-                    'some_large_named_table_this_is_the_data_column',
-                    'some_2'
+            table1.c.this_is_the_data_column,
+            'some_large_named_table_this_is_the_data_column',
+            'some_2'
+        ])
 
-                ])
         assert set(compiled.result_map['some_1'][1]).issuperset([
-                    table1.c.this_is_the_primarykey_column,
-                    'some_large_named_table_this_is_the_primarykey_column',
-                    'some_1'
-
-                ])
+            table1.c.this_is_the_primarykey_column,
+            'some_large_named_table_this_is_the_primarykey_column',
+            'some_1'
+        ])
 
     def test_adjustable_result_schema_column_2(self):
         table1 = self.table1
         x = select([q])
 
         dialect = default.DefaultDialect(label_length=10)
-
         compiled = x.compile(dialect=dialect)
+
         assert set(compiled.result_map['this_2'][1]).issuperset([
-                    q.corresponding_column(table1.c.this_is_the_data_column),
-                    'this_is_the_data_column',
-                    'this_2'
+            q.corresponding_column(table1.c.this_is_the_data_column),
+            'this_is_the_data_column',
+            'this_2'])
 
-                ])
         assert set(compiled.result_map['this_1'][1]).issuperset([
-                    q.corresponding_column(table1.c.this_is_the_primarykey_column),
-                    'this_is_the_primarykey_column',
-                    'this_1'
-
-                ])
+            q.corresponding_column(table1.c.this_is_the_primarykey_column),
+            'this_is_the_primarykey_column',
+            'this_1'])
 
     def test_table_plus_column_exceeds_length(self):
         """test that the truncation only occurs when tablename + colname are
                 'other_thirty_characters_table_.thirty_characters_table_id',
             dialect=compile_dialect)
 
-
     def test_colnames_longer_than_labels_lowercase(self):
         t1 = table('a', column('abcde'))
         self._test_colnames_longer_than_labels(t1)
         # 'abcde' is longer than 4, but rendered as itself
         # needs to have all characters
         s = select([a1])
-        self.assert_compile(
-            select([a1]),
-            "SELECT asdf.abcde FROM a AS asdf",
-            dialect=dialect
-        )
+        self.assert_compile(select([a1]),
+            'SELECT asdf.abcde FROM a AS asdf',
+            dialect=dialect)
         compiled = s.compile(dialect=dialect)
         assert set(compiled.result_map['abcde'][1]).issuperset([
-                    'abcde',
-                    a1.c.abcde,
-                    'abcde'
-                ])
+            'abcde', a1.c.abcde, 'abcde'])
 
         # column still there, but short label
         s = select([a1]).apply_labels()
-        self.assert_compile(
-            s,
-            "SELECT asdf.abcde AS _1 FROM a AS asdf",
-            dialect=dialect
-        )
+        self.assert_compile(s,
+            'SELECT asdf.abcde AS _1 FROM a AS asdf',
+            dialect=dialect)
         compiled = s.compile(dialect=dialect)
         assert set(compiled.result_map['_1'][1]).issuperset([
-                    'asdf_abcde',
-                    a1.c.abcde,
-                    '_1'
-                ])
-
-
+            'asdf_abcde', a1.c.abcde, '_1'])

test/sql/test_update.py

-from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
-import datetime
 from sqlalchemy import *
-from sqlalchemy import exc, sql, util
-from sqlalchemy.engine import default, base
 from sqlalchemy import testing
-from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.schema import Table, Column
 from sqlalchemy.dialects import mysql
+from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures
+from sqlalchemy.testing.schema import Table, Column
+
 
 class _UpdateFromTestBase(object):
     @classmethod
     def define_tables(cls, metadata):
         Table('users', metadata,
               Column('id', Integer, primary_key=True,
-                            test_needs_autoincrement=True),
-              Column('name', String(30), nullable=False),
-        )
+                     test_needs_autoincrement=True),
+              Column('name', String(30), nullable=False))
 
         Table('addresses', metadata,
               Column('id', Integer, primary_key=True,
-                            test_needs_autoincrement=True),
+                     test_needs_autoincrement=True),
               Column('user_id', None, ForeignKey('users.id')),
               Column('name', String(30), nullable=False),
-              Column('email_address', String(50), nullable=False),
-        )
+              Column('email_address', String(50), nullable=False))
 
-        Table("dingalings", metadata,
+        Table('dingalings', metadata,
               Column('id', Integer, primary_key=True,
-                            test_needs_autoincrement=True),
+                     test_needs_autoincrement=True),
               Column('address_id', None, ForeignKey('addresses.id')),
-              Column('data', String(30)),
-        )
+              Column('data', String(30)))
 
     @classmethod
     def fixtures(cls):
         return dict(
-            users = (
+            users=(
                 ('id', 'name'),
                 (7, 'jack'),
                 (8, 'ed'),
                 (9, 'fred'),
                 (10, 'chuck')
             ),
-
             addresses = (
                 ('id', 'user_id', 'name', 'email_address'),
-                (1, 7, 'x', "jack@bean.com"),
-                (2, 8, 'x', "ed@wood.com"),
-                (3, 8, 'x', "ed@bettyboop.com"),
-                (4, 8, 'x', "ed@lala.com"),
-                (5, 9, 'x', "fred@fred.com")
+                (1, 7, 'x', 'jack@bean.com'),
+                (2, 8, 'x', 'ed@wood.com'),
+                (3, 8, 'x', 'ed@bettyboop.com'),
+                (4, 8, 'x', 'ed@lala.com'),
+                (5, 9, 'x', 'fred@fred.com')
             ),
             dingalings = (
                 ('id', 'address_id', 'data'),
         )
 
 
-class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
+class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
+                            AssertsCompiledSQL):
     __dialect__ = 'default'
 
     run_create_tables = run_inserts = run_deletes = None
 
     def test_render_table(self):
         users, addresses = self.tables.users, self.tables.addresses
+
         self.assert_compile(
-            users.update().\
-                values(name='newname').\
-                where(users.c.id==addresses.c.user_id).\
-                where(addresses.c.email_address=='e1'),
-            "UPDATE users SET name=:name FROM addresses "
-            "WHERE users.id = addresses.user_id AND "
-            "addresses.email_address = :email_address_1",
-            checkparams={u'email_address_1': 'e1', 'name': 'newname'}
-        )
+            users.update().
+                values(name='newname').
+                where(users.c.id == addresses.c.user_id).
+                where(addresses.c.email_address == 'e1'),
+            'UPDATE users '
+            'SET name=:name FROM addresses '
+            'WHERE '
+                'users.id = addresses.user_id AND '
+                'addresses.email_address = :email_address_1',
+            checkparams={u'email_address_1': 'e1', 'name': 'newname'})
 
     def test_render_multi_table(self):
-        users, addresses, dingalings = \
-                self.tables.users, \
-                self.tables.addresses, \
-                self.tables.dingalings
+        users = self.tables.users
+        addresses = self.tables.addresses
+        dingalings = self.tables.dingalings
+
+        checkparams = {
+            u'email_address_1': 'e1',
+            u'id_1': 2,
+            'name': 'newname'
+        }
+
         self.assert_compile(
-            users.update().\
-                values(name='newname').\
-                where(users.c.id==addresses.c.user_id).\
-                where(addresses.c.email_address=='e1').\
-                where(addresses.c.id==dingalings.c.address_id).\
-                where(dingalings.c.id==2),
-            "UPDATE users SET name=:name FROM addresses, "
-            "dingalings WHERE users.id = addresses.user_id "
-            "AND addresses.email_address = :email_address_1 "
-            "AND addresses.id = dingalings.address_id AND "
-            "dingalings.id = :id_1",
-            checkparams={u'email_address_1': 'e1', u'id_1': 2,
-                                'name': 'newname'}
-        )
+            users.update().
+                values(name='newname').
+                where(users.c.id == addresses.c.user_id).
+                where(addresses.c.email_address == 'e1').
+                where(addresses.c.id == dingalings.c.address_id).
+                where(dingalings.c.id == 2),
+            'UPDATE users '
+            'SET name=:name '
+            'FROM addresses, dingalings '
+            'WHERE '
+                'users.id = addresses.user_id AND '
+                'addresses.email_address = :email_address_1 AND '
+                'addresses.id = dingalings.address_id AND '
+                'dingalings.id = :id_1',
+            checkparams=checkparams)
 
     def test_render_table_mysql(self):
         users, addresses = self.tables.users, self.tables.addresses
+
         self.assert_compile(
-            users.update().\
-                values(name='newname').\
-                where(users.c.id==addresses.c.user_id).\
-                where(addresses.c.email_address=='e1'),
-            "UPDATE users, addresses SET users.name=%s "
-            "WHERE users.id = addresses.user_id AND "
-            "addresses.email_address = %s",
+            users.update().
+                values(name='newname').
+                where(users.c.id == addresses.c.user_id).
+                where(addresses.c.email_address == 'e1'),
+            'UPDATE users, addresses '
+            'SET users.name=%s '
+            'WHERE '
+                'users.id = addresses.user_id AND '
+                'addresses.email_address = %s',
             checkparams={u'email_address_1': 'e1', 'name': 'newname'},
-            dialect=mysql.dialect()
-        )
+            dialect=mysql.dialect())
 
     def test_render_subquery(self):
         users, addresses = self.tables.users, self.tables.addresses
-        subq = select([addresses.c.id,
-                        addresses.c.user_id,
-                        addresses.c.email_address]).\
-                            where(addresses.c.id==7).alias()
+
+        checkparams = {
+            u'email_address_1': 'e1',
+            u'id_1': 7,
+            'name': 'newname'
+        }
+
+        cols = [
+            addresses.c.id,
+            addresses.c.user_id,
+            addresses.c.email_address
+        ]
+
+        subq = select(cols).where(addresses.c.id == 7).alias()
         self.assert_compile(
-            users.update().\
-                values(name='newname').\
-                where(users.c.id==subq.c.user_id).\
-                where(subq.c.email_address=='e1'),
-            "UPDATE users SET name=:name FROM "
-            "(SELECT addresses.id AS id, addresses.user_id "
-            "AS user_id, addresses.email_address AS "
-            "email_address FROM addresses WHERE addresses.id = "
-            ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
-            "AND anon_1.email_address = :email_address_1",
-            checkparams={u'email_address_1': 'e1',
-                            u'id_1': 7, 'name': 'newname'}
-        )
+            users.update().
+                values(name='newname').
+                where(users.c.id == subq.c.user_id).
+                where(subq.c.email_address == 'e1'),
+            'UPDATE users '
+            'SET name=:name FROM ('
+                'SELECT '
+                    'addresses.id AS id, '
+                    'addresses.user_id AS user_id, '
+                    'addresses.email_address AS email_address '
+                'FROM addresses '
+                'WHERE addresses.id = :id_1'
+            ') AS anon_1 '
+            'WHERE users.id = anon_1.user_id '
+            'AND anon_1.email_address = :email_address_1',
+            checkparams=checkparams)
+
 
 class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
 
     @testing.requires.update_from
     def test_exec_two_table(self):
         users, addresses = self.tables.users, self.tables.addresses
+
         testing.db.execute(
-            addresses.update().\
-                values(email_address=users.c.name).\
-                where(users.c.id==addresses.c.user_id).\
-                where(users.c.name=='ed')
-        )
-        eq_(
-            testing.db.execute(
-                addresses.select().\
-                    order_by(addresses.c.id)).fetchall(),
-            [
-                (1, 7, 'x', "jack@bean.com"),
-                (2, 8, 'x', "ed"),
-                (3, 8, 'x', "ed"),
-                (4, 8, 'x', "ed"),
-                (5, 9, 'x', "fred@fred.com")
-            ]
-        )
+            addresses.update().
+                values(email_address=users.c.name).
+                where(users.c.id == addresses.c.user_id).
+                where(users.c.name == 'ed'))
+
+        expected = [
+            (1, 7, 'x', 'jack@bean.com'),
+            (2, 8, 'x', 'ed'),
+            (3, 8, 'x', 'ed'),
+            (4, 8, 'x', 'ed'),
+            (5, 9, 'x', 'fred@fred.com')]
+        self._assert_addresses(addresses, expected)
 
     @testing.requires.update_from
     def test_exec_two_table_plus_alias(self):
         users, addresses = self.tables.users, self.tables.addresses
-        a1 = addresses.alias()
 
+        a1 = addresses.alias()
         testing.db.execute(
-            addresses.update().\
-                values(email_address=users.c.name).\
-                where(users.c.id==a1.c.user_id).\
-                where(users.c.name=='ed').\
-                where(a1.c.id==addresses.c.id)
-        )
-        eq_(
-            testing.db.execute(
-                addresses.select().\
-                    order_by(addresses.c.id)).fetchall(),
-            [
-                (1, 7, 'x', "jack@bean.com"),
-                (2, 8, 'x', "ed"),
-                (3, 8, 'x', "ed"),
-                (4, 8, 'x', "ed"),
-                (5, 9, 'x', "fred@fred.com")
-            ]
+            addresses.update().
+                values(email_address=users.c.name).
+                where(users.c.id == a1.c.user_id).
+                where(users.c.name == 'ed').
+                where(a1.c.id == addresses.c.id)
         )
 
+        expected = [
+            (1, 7, 'x', 'jack@bean.com'),
+            (2, 8, 'x', 'ed'),
+            (3, 8, 'x', 'ed'),
+            (4, 8, 'x', 'ed'),
+            (5, 9, 'x', 'fred@fred.com')]
+        self._assert_addresses(addresses, expected)
+
     @testing.requires.update_from
     def test_exec_three_table(self):
-        users, addresses, dingalings = \
-                self.tables.users, \
-                self.tables.addresses, \
-                self.tables.dingalings
+        users = self.tables.users
+        addresses = self.tables.addresses
+        dingalings = self.tables.dingalings
+
         testing.db.execute(
-            addresses.update().\
-                values(email_address=users.c.name).\
-                where(users.c.id==addresses.c.user_id).\
-                where(users.c.name=='ed').
-                where(addresses.c.id==dingalings.c.address_id).\
-                where(dingalings.c.id==1),
-        )
-        eq_(
-            testing.db.execute(
-                    addresses.select().order_by(addresses.c.id)
-                ).fetchall(),
-            [
-                (1, 7, 'x', "jack@bean.com"),
-                (2, 8, 'x', "ed"),
-                (3, 8, 'x', "ed@bettyboop.com"),
-                (4, 8, 'x', "ed@lala.com"),
-                (5, 9, 'x', "fred@fred.com")
-            ]
-        )
+            addresses.update().
+                values(email_address=users.c.name).
+                where(users.c.id == addresses.c.user_id).
+                where(users.c.name == 'ed').
+                where(addresses.c.id == dingalings.c.address_id).
+                where(dingalings.c.id == 1))
+
+        expected = [
+            (1, 7, 'x', 'jack@bean.com'),
+            (2, 8, 'x', 'ed'),
+            (3, 8, 'x', 'ed@bettyboop.com'),
+            (4, 8, 'x', 'ed@lala.com'),
+            (5, 9, 'x', 'fred@fred.com')]
+        self._assert_addresses(addresses, expected)
 
     @testing.only_on('mysql', 'Multi table update')
     def test_exec_multitable(self):
         users, addresses = self.tables.users, self.tables.addresses
+
+        values = {
+            addresses.c.email_address: users.c.name,
+            users.c.name: 'ed2'
+        }
+
         testing.db.execute(
-            addresses.update().\
-                values({
-                        addresses.c.email_address:users.c.name,
-                        users.c.name:'ed2'
-                }).\
-                where(users.c.id==addresses.c.user_id).\
-                where(users.c.name=='ed')
-        )
-        eq_(
-            testing.db.execute(
-                addresses.select().order_by(addresses.c.id)).fetchall(),
-            [
-                (1, 7, 'x', "jack@bean.com"),
-                (2, 8, 'x', "ed"),
-                (3, 8, 'x', "ed"),
-                (4, 8, 'x', "ed"),
-                (5, 9, 'x', "fred@fred.com")
-            ]
-        )
-        eq_(
-            testing.db.execute(
-                users.select().order_by(users.c.id)).fetchall(),
-            [
-                (7, 'jack'),
-                (8, 'ed2'),
-                (9, 'fred'),
-                (10, 'chuck')
-            ]
-        )
+            addresses.update().
+                values(values).
+                where(users.c.id == addresses.c.user_id).
+                where(users.c.name == 'ed'))
+
+        expected = [
+            (1, 7, 'x', 'jack@bean.com'),
+            (2, 8, 'x', 'ed'),
+            (3, 8, 'x', 'ed'),
+            (4, 8, 'x', 'ed'),
+            (5, 9, 'x', 'fred@fred.com')]
+        self._assert_addresses(addresses, expected)
+
+        expected = [
+            (7, 'jack'),
+            (8, 'ed2'),
+            (9, 'fred'),
+            (10, 'chuck')]
+        self._assert_users(users, expected)
+
+    def _assert_addresses(self, addresses, expected):
+        stmt = addresses.select().order_by(addresses.c.id)
+        eq_(testing.db.execute(stmt).fetchall(), expected)
+
+    def _assert_users(self, users, expected):
+        stmt = users.select().order_by(users.c.id)
+        eq_(testing.db.execute(stmt).fetchall(), expected)
+
 
-class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
+class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
+                                             fixtures.TablesTest):
     @classmethod
     def define_tables(cls, metadata):
         Table('users', metadata,
               Column('id', Integer, primary_key=True,
-                            test_needs_autoincrement=True),
+                     test_needs_autoincrement=True),
               Column('name', String(30), nullable=False),
-              Column('some_update', String(30), onupdate="im the update")
-        )
+              Column('some_update', String(30), onupdate='im the update'))
 
         Table('addresses', metadata,
               Column('id', Integer, primary_key=True,
-                            test_needs_autoincrement=True),
+                     test_needs_autoincrement=True),
               Column('user_id', None, ForeignKey('users.id')),
-              Column('email_address', String(50), nullable=False),
-        )
+              Column('email_address', String(50), nullable=False))
 
     @classmethod
     def fixtures(cls):
         return dict(
-            users = (
+            users=(
                 ('id', 'name', 'some_update'),
                 (8, 'ed', 'value'),
                 (9, 'fred', 'value'),
             ),
-
-            addresses = (
+            addresses=(
                 ('id', 'user_id', 'email_address'),
-                (2, 8, "ed@wood.com"),
-                (3, 8, "ed@bettyboop.com"),
-                (4, 9, "fred@fred.com")
+                (2, 8, 'ed@wood.com'),
+                (3, 8, 'ed@bettyboop.com'),
+                (4, 9, 'fred@fred.com')
             ),
         )
 
     @testing.only_on('mysql', 'Multi table update')
     def test_defaults_second_table(self):
         users, addresses = self.tables.users, self.tables.addresses
+
+        values = {
+            addresses.c.email_address: users.c.name,
+            users.c.name: 'ed2'
+        }
+
         ret = testing.db.execute(
-            addresses.update().\
-                values({
-                        addresses.c.email_address:users.c.name,
-                        users.c.name:'ed2'
-                }).\
-                where(users.c.id==addresses.c.user_id).\
-                where(users.c.name=='ed')
-        )
-        eq_(
-            set(ret.prefetch_cols()),
-            set([users.c.some_update])
-        )
-        eq_(
-            testing.db.execute(
-                addresses.select().order_by(addresses.c.id)).fetchall(),
-            [
-                (2, 8, "ed"),
-                (3, 8, "ed"),
-                (4, 9, "fred@fred.com")
-            ]
-        )
-        eq_(
-            testing.db.execute(
-                users.select().order_by(users.c.id)).fetchall(),
-            [
-                (8, 'ed2', 'im the update'),
-                (9, 'fred', 'value'),
-            ]
-        )
+            addresses.update().
+                values(values).
+                where(users.c.id == addresses.c.user_id).
+                where(users.c.name == 'ed'))
+
+        eq_(set(ret.prefetch_cols()), set([users.c.some_update]))
+
+        expected = [
+            (2, 8, 'ed'),
+            (3, 8, 'ed'),
+            (4, 9, 'fred@fred.com')]
+        self._assert_addresses(addresses, expected)
+
+        expected = [
+            (8, 'ed2', 'im the update'),
+            (9, 'fred', 'value')]
+        self._assert_users(users, expected)
 
     @testing.only_on('mysql', 'Multi table update')
     def test_no_defaults_second_table(self):
         users, addresses = self.tables.users, self.tables.addresses
+
         ret = testing.db.execute(
-            addresses.update().\
-                values({
-                        'email_address':users.c.name,
-                }).\
-                where(users.c.id==addresses.c.user_id).\
-                where(users.c.name=='ed')
-        )
-        eq_(
-            ret.prefetch_cols(),[]
-        )
-        eq_(
-            testing.db.execute(
-                addresses.select().order_by(addresses.c.id)).fetchall(),
-            [
-                (2, 8, "ed"),
-                (3, 8, "ed"),
-                (4, 9, "fred@fred.com")
-            ]
-        )
-        # users table not actually updated,
-        # so no onupdate
-        eq_(
-            testing.db.execute(
-                users.select().order_by(users.c.id)).fetchall(),
-            [
-                (8, 'ed', 'value'),
-                (9, 'fred', 'value'),
-            ]
-        )
+            addresses.update().
+                values({'email_address': users.c.name}).
+                where(users.c.id == addresses.c.user_id).
+                where(users.c.name == 'ed'))
+
+        eq_(ret.prefetch_cols(), [])
+
+        expected = [
+            (2, 8, 'ed'),
+            (3, 8, 'ed'),
+            (4, 9, 'fred@fred.com')]
+        self._assert_addresses(addresses, expected)
+
+        # users table not actually updated, so no onupdate
+        expected = [
+            (8, 'ed', 'value'),
+            (9, 'fred', 'value')]
+        self._assert_users(users, expected)
+
+    def _assert_addresses(self, addresses, expected):
+        stmt = addresses.select().order_by(addresses.c.id)
+        eq_(testing.db.execute(stmt).fetchall(), expected)
+
+    def _assert_users(self, users, expected):
+        stmt = users.select().order_by(users.c.id)
+        eq_(testing.db.execute(stmt).fetchall(), expected)