Source

sqlalchemy / test / sql / test_update.py

Full commit
from test.lib.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
import datetime
from sqlalchemy import *
from sqlalchemy import exc, sql, util
from sqlalchemy.engine import default, base
from test.lib import *
from test.lib.schema import Table, Column
from sqlalchemy.dialects import mysql

class _UpdateFromTestBase(object):
    @classmethod
    def define_tables(cls, metadata):
        Table('users', metadata,
              Column('id', Integer, primary_key=True,
                            test_needs_autoincrement=True),
              Column('name', String(30), nullable=False),
        )

        Table('addresses', metadata,
              Column('id', Integer, primary_key=True,
                            test_needs_autoincrement=True),
              Column('user_id', None, ForeignKey('users.id')),
              Column('name', String(30), nullable=False),
              Column('email_address', String(50), nullable=False),
        )

        Table("dingalings", metadata,
              Column('id', Integer, primary_key=True,
                            test_needs_autoincrement=True),
              Column('address_id', None, ForeignKey('addresses.id')),
              Column('data', String(30)),
        )

    @classmethod
    def fixtures(cls):
        return dict(
            users = (
                ('id', 'name'),
                (7, 'jack'),
                (8, 'ed'),
                (9, 'fred'),
                (10, 'chuck')
            ),

            addresses = (
                ('id', 'user_id', 'name', 'email_address'),
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed@wood.com"),
                (3, 8, 'x', "ed@bettyboop.com"),
                (4, 8, 'x', "ed@lala.com"),
                (5, 9, 'x', "fred@fred.com")
            ),
            dingalings = (
                ('id', 'address_id', 'data'),
                (1, 2, 'ding 1/2'),
                (2, 5, 'ding 2/5')
            ),
        )


class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
    __dialect__ = 'default'

    run_create_tables = run_inserts = run_deletes = None

    def test_render_table(self):
        users, addresses = self.tables.users, self.tables.addresses
        self.assert_compile(
            users.update().\
                values(name='newname').\
                where(users.c.id==addresses.c.user_id).\
                where(addresses.c.email_address=='e1'),
            "UPDATE users SET name=:name FROM addresses "
            "WHERE users.id = addresses.user_id AND "
            "addresses.email_address = :email_address_1",
            checkparams={u'email_address_1': 'e1', 'name': 'newname'}
        )

    def test_render_multi_table(self):
        users, addresses, dingalings = \
                self.tables.users, \
                self.tables.addresses, \
                self.tables.dingalings
        self.assert_compile(
            users.update().\
                values(name='newname').\
                where(users.c.id==addresses.c.user_id).\
                where(addresses.c.email_address=='e1').\
                where(addresses.c.id==dingalings.c.address_id).\
                where(dingalings.c.id==2),
            "UPDATE users SET name=:name FROM addresses, "
            "dingalings WHERE users.id = addresses.user_id "
            "AND addresses.email_address = :email_address_1 "
            "AND addresses.id = dingalings.address_id AND "
            "dingalings.id = :id_1",
            checkparams={u'email_address_1': 'e1', u'id_1': 2,
                                'name': 'newname'}
        )

    def test_render_table_mysql(self):
        users, addresses = self.tables.users, self.tables.addresses
        self.assert_compile(
            users.update().\
                values(name='newname').\
                where(users.c.id==addresses.c.user_id).\
                where(addresses.c.email_address=='e1'),
            "UPDATE users, addresses SET users.name=%s "
            "WHERE users.id = addresses.user_id AND "
            "addresses.email_address = %s",
            checkparams={u'email_address_1': 'e1', 'name': 'newname'},
            dialect=mysql.dialect()
        )

    def test_render_subquery(self):
        users, addresses = self.tables.users, self.tables.addresses
        subq = select([addresses.c.id,
                        addresses.c.user_id,
                        addresses.c.email_address]).\
                            where(addresses.c.id==7).alias()
        self.assert_compile(
            users.update().\
                values(name='newname').\
                where(users.c.id==subq.c.user_id).\
                where(subq.c.email_address=='e1'),
            "UPDATE users SET name=:name FROM "
            "(SELECT addresses.id AS id, addresses.user_id "
            "AS user_id, addresses.email_address AS "
            "email_address FROM addresses WHERE addresses.id = "
            ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
            "AND anon_1.email_address = :email_address_1",
            checkparams={u'email_address_1': 'e1',
                            u'id_1': 7, 'name': 'newname'}
        )

class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):

    @testing.requires.update_from
    def test_exec_two_table(self):
        users, addresses = self.tables.users, self.tables.addresses
        testing.db.execute(
            addresses.update().\
                values(email_address=users.c.name).\
                where(users.c.id==addresses.c.user_id).\
                where(users.c.name=='ed')
        )
        eq_(
            testing.db.execute(
                addresses.select().\
                    order_by(addresses.c.id)).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.requires.update_from
    def test_exec_two_table_plus_alias(self):
        users, addresses = self.tables.users, self.tables.addresses
        a1 = addresses.alias()

        testing.db.execute(
            addresses.update().\
                values(email_address=users.c.name).\
                where(users.c.id==a1.c.user_id).\
                where(users.c.name=='ed').\
                where(a1.c.id==addresses.c.id)
        )
        eq_(
            testing.db.execute(
                addresses.select().\
                    order_by(addresses.c.id)).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.requires.update_from
    def test_exec_three_table(self):
        users, addresses, dingalings = \
                self.tables.users, \
                self.tables.addresses, \
                self.tables.dingalings
        testing.db.execute(
            addresses.update().\
                values(email_address=users.c.name).\
                where(users.c.id==addresses.c.user_id).\
                where(users.c.name=='ed').
                where(addresses.c.id==dingalings.c.address_id).\
                where(dingalings.c.id==1),
        )
        eq_(
            testing.db.execute(
                    addresses.select().order_by(addresses.c.id)
                ).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed@bettyboop.com"),
                (4, 8, 'x', "ed@lala.com"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_exec_multitable(self):
        users, addresses = self.tables.users, self.tables.addresses
        testing.db.execute(
            addresses.update().\
                values({
                        addresses.c.email_address:users.c.name,
                        users.c.name:'ed2'
                }).\
                where(users.c.id==addresses.c.user_id).\
                where(users.c.name=='ed')
        )
        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)).fetchall(),
            [
                (1, 7, 'x', "jack@bean.com"),
                (2, 8, 'x', "ed"),
                (3, 8, 'x', "ed"),
                (4, 8, 'x', "ed"),
                (5, 9, 'x', "fred@fred.com")
            ]
        )
        eq_(
            testing.db.execute(
                users.select().order_by(users.c.id)).fetchall(),
            [
                (7, 'jack'),
                (8, 'ed2'),
                (9, 'fred'),
                (10, 'chuck')
            ]
        )

class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('users', metadata,
              Column('id', Integer, primary_key=True,
                            test_needs_autoincrement=True),
              Column('name', String(30), nullable=False),
              Column('some_update', String(30), onupdate="im the update")
        )

        Table('addresses', metadata,
              Column('id', Integer, primary_key=True,
                            test_needs_autoincrement=True),
              Column('user_id', None, ForeignKey('users.id')),
              Column('email_address', String(50), nullable=False),
        )

    @classmethod
    def fixtures(cls):
        return dict(
            users = (
                ('id', 'name', 'some_update'),
                (8, 'ed', 'value'),
                (9, 'fred', 'value'),
            ),

            addresses = (
                ('id', 'user_id', 'email_address'),
                (2, 8, "ed@wood.com"),
                (3, 8, "ed@bettyboop.com"),
                (4, 9, "fred@fred.com")
            ),
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_defaults_second_table(self):
        users, addresses = self.tables.users, self.tables.addresses
        ret = testing.db.execute(
            addresses.update().\
                values({
                        addresses.c.email_address:users.c.name,
                        users.c.name:'ed2'
                }).\
                where(users.c.id==addresses.c.user_id).\
                where(users.c.name=='ed')
        )
        eq_(
            set(ret.prefetch_cols()),
            set([users.c.some_update])
        )
        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)).fetchall(),
            [
                (2, 8, "ed"),
                (3, 8, "ed"),
                (4, 9, "fred@fred.com")
            ]
        )
        eq_(
            testing.db.execute(
                users.select().order_by(users.c.id)).fetchall(),
            [
                (8, 'ed2', 'im the update'),
                (9, 'fred', 'value'),
            ]
        )

    @testing.only_on('mysql', 'Multi table update')
    def test_no_defaults_second_table(self):
        users, addresses = self.tables.users, self.tables.addresses
        ret = testing.db.execute(
            addresses.update().\
                values({
                        'email_address':users.c.name,
                }).\
                where(users.c.id==addresses.c.user_id).\
                where(users.c.name=='ed')
        )
        eq_(
            ret.prefetch_cols(),[]
        )
        eq_(
            testing.db.execute(
                addresses.select().order_by(addresses.c.id)).fetchall(),
            [
                (2, 8, "ed"),
                (3, 8, "ed"),
                (4, 9, "fred@fred.com")
            ]
        )
        # users table not actually updated,
        # so no onupdate
        eq_(
            testing.db.execute(
                users.select().order_by(users.c.id)).fetchall(),
            [
                (8, 'ed', 'value'),
                (9, 'fred', 'value'),
            ]
        )