add these merge tests

Issue #1247 resolved
Mike Bayer repo owner created an issue

I suspect that the fixes in #1202 are what allow these tests to pass in rc3 but not in rc2. Also, Ants' change in r5378 is related, since I think the whole issue has to do with a flush being triggered within a merge... but we should verify.

from datetime import datetime, timedelta
from sqlalchemy import create_engine, MetaData, Table, Column, ForeignKey
from sqlalchemy.types import Integer, DateTime, Text
from sqlalchemy.orm import mapper, relation
from sqlalchemy.orm import scoped_session, sessionmaker
import unittest

def assert_identical(obj1, obj2):
    """Assert that ``obj1`` and ``obj2`` are the very same object.

    On failure the AssertionError message shows the repr of both
    arguments, joined by "is not".
    """
    same_object = obj1 is obj2
    assert same_object, "%r is not %r" % (obj1, obj2)

def assert_equal(x, y):
    """Assert that ``x == y``.

    On failure the AssertionError message shows the repr of both values,
    mirroring the diagnostic style of ``assert_identical`` so a failing
    comparison can be understood without re-running under a debugger.
    """
    assert x == y, "%r != %r" % (x, y)

# Schema container for the two tables below. The TestMerge fixture binds it
# to an engine and creates/drops the tables around every test.
metadata = MetaData()

# Parent table: a single integer primary key.
bars = Table("bars", metadata,
    Column("id", Integer, primary_key=True))

# Child table: integer primary key plus a mandatory (nullable=False)
# foreign key to bars.id.
# NOTE(review): the bar_id column type is passed as None -- presumably so the
# type is taken from the referenced bars.id column; confirm against the
# SQLAlchemy version in use.
foos = Table("foos", metadata,
    Column("id", Integer, primary_key=True),
    Column("bar_id", None, ForeignKey("bars.id"), nullable=False))

class Bar(object):
    """Simple object holding only an ``id``, supplied by the caller."""

    def __init__(self, id_):
        # Trailing underscore avoids shadowing the ``id`` builtin, which
        # the tests in this file also call.
        self.id = id_

class Foo(object):
    """Object that refers to a ``bar`` and starts out with no ``id``."""

    def __init__(self, bar):
        # ``id`` is assigned first and ``bar`` second (left to right).
        self.id, self.bar = None, bar

# Map Bar onto the "bars" table, exposing the id column as Bar.id.
mapper(Bar, bars, properties={
    'id':bars.c.id})

# Map Foo onto the "foos" table. Foo.bar is a relation to Bar, and the
# backref adds the reverse attribute Bar.foos.
mapper(Foo, foos, properties={
    'id':foos.c.id,
    'bar':relation(Bar, backref="foos")})

# Session factory. No bind is passed here; the tests attach an engine through
# metadata.bind in setUp and call Session() to obtain a session.
Session = sessionmaker()

class TestMerge(unittest.TestCase):
    """Identity checks for ``Session.merge()`` around flushes and relations.

    Every test merges one or more transient ``Bar(1)`` objects into a
    session and verifies that the session keeps handing back (and keeps
    related objects pointing at) one and the same ``Bar`` instance.
    """

    def setUp(self):
        """Bind the metadata to a new in-memory SQLite engine and create the tables."""
        metadata.bind = create_engine("sqlite:///:memory:", echo=True)
        metadata.create_all()

    def tearDown(self):
        """Drop the tables and unbind the metadata so the next test starts clean."""
        metadata.drop_all()
        metadata.bind = None

    def test_normal_merge(self):
        """Two merges of ``Bar(1)`` into one session return the same object."""
        s = Session()
        assert_identical(s.merge(Bar(1)), s.merge(Bar(1)))

    def test_merge_flush(self):
        """A flush after a merge completes and leaves the merged object in place."""
        s = Session()
        bar = s.merge(Bar(1))
        before_id = id(bar)
        s.flush()
        # ``bar`` is never rebound, so this comparison mainly documents
        # intent; the real check is that flush() runs without error.
        after_id = id(bar)
        assert_equal(before_id, after_id)

    def test_merge_related_flush(self):
        """A Foo built on a merged Bar still refers to that Bar after a flush."""
        s = Session()
        bar = s.merge(Bar(1))
        before_id = id(bar)
        # NOTE(review): ``foo`` is never added to the session explicitly --
        # presumably it reaches the session through the Bar.foos backref
        # cascade; confirm for the SQLAlchemy version in use.
        foo = Foo(bar)
        s.flush()
        after_id = id(bar)
        related_id = id(foo.bar)
        assert_equal(before_id, after_id)
        assert_equal(before_id, related_id)

    def test_double_merge_flush(self):
        """Two merges followed by a flush still yield one shared instance."""
        s = Session()
        bar1 = s.merge(Bar(1))
        bar2 = s.merge(Bar(1))
        s.flush()
        assert_identical(bar1, bar2)

    def test_foo(self):
        """``foo.bar`` survives a second merge, a flush and ``expire_all()``."""
        s = Session()
        bar1m1 = s.merge(Bar(1))
        foo1 = Foo(bar=bar1m1)
        # The second merge is performed only for its effect on the session;
        # its return value is not needed by the assertions below.
        s.merge(Bar(1))
        s.flush()
        s.expire_all()
        assert foo1.bar is bar1m1
        assert foo1.bar is not None

    def test_merge_bug(self):
        """A second merge must not replace the Bar referenced by an earlier Foo.

        No flush is issued explicitly in this test; only the two merges run.
        """
        s = Session()
        foo1 = Foo(s.merge(Bar(1)))
        before_id = id(foo1.bar)
        # Runs straight through with no interactive breakpoint, so the
        # test can be executed unattended.
        foo2 = Foo(s.merge(Bar(1)))
        after_id = id(foo1.bar)
        other_id = id(foo2.bar)
        assert_equal(before_id, other_id)
        assert_equal(after_id, other_id)
        assert_equal(before_id, after_id)
        assert_identical(foo1.bar, foo2.bar)

# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()

Comments (2)

  1. Log in to comment