Hajime Nakagami avatar Hajime Nakagami committed 6fe5b98 Merge

merge from default

Comments (0)

Files changed (7)

doc/build/changelog/changelog_08.rst

     :version: 0.8.0
 
     .. change::
+        :tags: feature, orm
+        :tickets: 2658
+
+      Added new helper function :func:`.was_deleted`, returns True
+      if the given object was the subject of a :meth:`.Session.delete`
+      operation.
+
+    .. change::
+        :tags: bug, orm
+        :tickets: 2658
+
+      An object that's deleted from a session will be de-associated with
+      that session fully after the transaction is committed, that is
+      the :func:`.object_session` function will return None.
+
+    .. change::
         :tags: bug, oracle
 
       The cx_oracle dialect will no longer run the bind parameter names

doc/build/orm/session.rst

 expressed for collections which are already loaded. See the API docs for
 :meth:`~sqlalchemy.orm.query.Query.delete` for more details.
 
+.. _session_flushing:
+
 Flushing
 --------
 
 
 .. autofunction:: object_session
 
+.. autofunction:: was_deleted
+
 Attribute and State Management Utilities
 -----------------------------------------
 

doc/build/orm/tutorial.rst

 deleted. SQLAlchemy doesn't assume that deletes cascade, you have to tell it
 to do so.
 
+.. _tutorial_delete_cascade:
+
 Configuring delete/delete-orphan Cascade
 ----------------------------------------
 

lib/sqlalchemy/orm/__init__.py

      object_mapper,
      outerjoin,
      polymorphic_union,
+     was_deleted,
      with_parent,
      with_polymorphic,
      )
     'undefer',
     'undefer_group',
     'validates',
+    'was_deleted',
     'with_polymorphic'
     )
 

lib/sqlalchemy/orm/session.py

         if not self.nested and self.session.expire_on_commit:
             for s in self.session.identity_map.all_states():
                 s._expire(s.dict, self.session.identity_map._modified)
+            for s in self._deleted:
+                s.session_id = None
+            self._deleted.clear()
+
 
     def _connection_for_bind(self, bind):
         self._assert_is_active()

lib/sqlalchemy/orm/util.py

 
 
 def has_identity(object):
+    """Return True if the given object has a database
+    identity.
+
+    This typically corresponds to the object being
+    in either the persistent or detached state.
+
+    .. seealso::
+
+        :func:`.was_deleted`
+
+    """
     state = attributes.instance_state(object)
     return state.has_identity
 
+def was_deleted(object):
+    """Return True if the given object was deleted
+    within a session flush.
+
+    .. versionadded:: 0.8.0
+
+    """
+
+    state = attributes.instance_state(object)
+    return state.deleted
 
 def instance_str(instance):
     """Return a string describing an instance."""

test/orm/test_session.py

 from sqlalchemy.testing import eq_, assert_raises, \
-    assert_raises_message, assert_warnings
+    assert_raises_message
 from sqlalchemy.testing.util import gc_collect
 from sqlalchemy.testing import pickleable
 from sqlalchemy.util import pickle
 from sqlalchemy import Integer, String, Sequence
 from sqlalchemy.testing.schema import Table, Column
 from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
-    exc as orm_exc, object_session
+    exc as orm_exc, object_session, was_deleted
 from sqlalchemy.util import pypy
 from sqlalchemy.testing import fixtures
 from test.orm import _fixtures
 
         # ensure tables are unbound
         m2 = sa.MetaData()
-        users_unbound =users.tometadata(m2)
+        users_unbound = users.tometadata(m2)
         addresses_unbound = addresses.tometadata(m2)
 
         mapper(Address, addresses_unbound)
         mapper(User, users_unbound, properties={
-            'addresses':relationship(Address,
+            'addresses': relationship(Address,
                                  backref=backref("user", cascade="all"),
                                  cascade="all")})
 
-        Session = sessionmaker(binds={User: self.metadata.bind,
+        sess = Session(binds={User: self.metadata.bind,
                                       Address: self.metadata.bind})
-        sess = Session()
 
         u1 = User(id=1, name='ed')
         sess.add(u1)
-        eq_(sess.query(User).filter(User.id==1).all(),
+        eq_(sess.query(User).filter(User.id == 1).all(),
             [User(id=1, name='ed')])
 
         # test expression binding
 
         # ensure tables are unbound
         m2 = sa.MetaData()
-        users_unbound =users.tometadata(m2)
+        users_unbound = users.tometadata(m2)
         addresses_unbound = addresses.tometadata(m2)
 
         mapper(Address, addresses_unbound)
         mapper(User, users_unbound, properties={
-            'addresses':relationship(Address,
+            'addresses': relationship(Address,
                                  backref=backref("user", cascade="all"),
                                  cascade="all")})
 
 
         u1 = User(id=1, name='ed')
         sess.add(u1)
-        eq_(sess.query(User).filter(User.id==1).all(),
+        eq_(sess.query(User).filter(User.id == 1).all(),
             [User(id=1, name='ed')])
 
         sess.execute(users_unbound.insert(), params=dict(id=2, name='jack'))
 
         # use :bindparam style
         eq_(sess.execute("select * from users where id=:id",
-                         {'id':7}).fetchall(),
+                         {'id': 7}).fetchall(),
             [(7, u'jack')])
 
 
         # use :bindparam style
         eq_(sess.scalar("select id from users where id=:id",
-                         {'id':7}),
+                         {'id': 7}),
             7)
 
     def test_parameter_execute(self):
                                 self.classes.User)
 
         mapper(User, users, properties={
-            'addresses':relationship(Address, backref="user")})
+            'addresses': relationship(Address, backref="user")})
         mapper(Address, addresses)
 
         sess = create_session(autoflush=True, autocommit=False)
         u = User(name='ed', addresses=[Address(email_address='foo')])
         sess.add(u)
-        eq_(sess.query(Address).filter(Address.user==u).one(),
+        eq_(sess.query(Address).filter(Address.user == u).one(),
             Address(email_address='foo'))
 
         # still works after "u" is garbage collected
         sess.commit()
         sess.close()
         u = sess.query(User).get(u.id)
-        q = sess.query(Address).filter(Address.user==u)
+        q = sess.query(Address).filter(Address.user == u)
         del u
         gc_collect()
         eq_(q.one(), Address(email_address='foo'))
 
         mapper(User, users)
         conn1 = testing.db.connect()
-        conn2 = testing.db.connect()
         sess = create_session(bind=conn1, autocommit=False,
                               autoflush=True)
         u = User()
 
         s = create_session()
         mapper(User, users, properties={
-            'addresses':relationship(Address, cascade="all, delete")
+            'addresses': relationship(Address, cascade="all, delete")
         })
         mapper(Address, addresses)
 
 
         mapper(Address, addresses)
         mapper(User, users, properties={
-            'addresses':relationship(Address)})
+            'addresses': relationship(Address)})
 
         sess = create_session(autocommit=False, autoflush=True)
         u = sess.query(User).get(8)
 
         mapper(Address, addresses)
         mapper(User, users, properties={
-            'addresses':relationship(Address,
+            'addresses': relationship(Address,
                                  backref=backref("user", cascade="all"),
                                  cascade="all")})
 
             assert sa.orm.object_session(a) is None
             assert sa.orm.attributes.instance_state(a).session_id is None
 
+    def test_deleted_expunged(self):
+        users, User = self.tables.users, self.classes.User
+
+        mapper(User, users)
+        sess = Session()
+
+        u1 = sess.query(User).first()
+        sess.delete(u1)
+
+        assert not was_deleted(u1)
+        sess.flush()
+
+        assert was_deleted(u1)
+        assert u1 not in sess
+        assert object_session(u1) is sess
+        sess.commit()
+
+        assert object_session(u1) is None
 
 
 class WeakIdentityMapTest(_fixtures.FixtureTest):
 
         s = sessionmaker()()
         mapper(User, users, properties={
-            "addresses":relationship(Address, backref="user")
+            "addresses": relationship(Address, backref="user")
         })
         mapper(Address, addresses)
         s.add(User(name="ed", addresses=[Address(email_address="ed1")]))
         s.commit()
 
         user = s.query(User).options(joinedload(User.addresses)).one()
-        user.addresses[0].user # lazyload
+        user.addresses[0].user  # lazyload
         eq_(user, User(name="ed", addresses=[Address(email_address="ed1")]))
 
         del user
         assert len(s.identity_map) == 0
 
         user = s.query(User).options(joinedload(User.addresses)).one()
-        user.addresses[0].email_address='ed2'
-        user.addresses[0].user # lazyload
+        user.addresses[0].email_address = 'ed2'
+        user.addresses[0].user  # lazyload
         del user
         gc_collect()
         assert len(s.identity_map) == 2
 
         s = sessionmaker()()
         mapper(User, users, properties={
-            "address":relationship(Address, backref="user", uselist=False)
+            "address": relationship(Address, backref="user", uselist=False)
         })
         mapper(Address, addresses)
         s.add(User(name="ed", address=Address(email_address="ed1")))
         assert len(s.identity_map) == 0
 
         user = s.query(User).options(joinedload(User.address)).one()
-        user.address.email_address='ed2'
-        user.address.user # lazyload
+        user.address.email_address = 'ed2'
+        user.address.user  # lazyload
 
         del user
         gc_collect()
         User, Address = self.classes.User, self.classes.Address
         users, addresses = self.tables.users, self.tables.addresses
         mapper(User, users, properties={
-            "addresses":relationship(Address)
+            "addresses": relationship(Address)
         })
         mapper(Address, addresses)
         return User, Address
         # can't predict result here
         # deterministically, depending on if
         # 'name' or 'addresses' is tested first
-        mod  = s.is_modified(u)
+        mod = s.is_modified(u)
         addresses_loaded = 'addresses' in u.__dict__
         assert mod is not addresses_loaded
 
 
         s = sessionmaker()()
 
-        mapper(User, users, properties={'uname':sa.orm.synonym('name')})
+        mapper(User, users, properties={'uname': sa.orm.synonym('name')})
         u = User(uname='fred')
         assert s.is_modified(u)
         s.add(u)
 
     @classmethod
     def define_tables(cls, metadata):
-        global t1
-        t1 = Table('t1', metadata, Column('id', Integer,
+        Table('t1', metadata, Column('id', Integer,
                    primary_key=True, test_needs_autoincrement=True),
                    Column('data', String(50)))
 
     @classmethod
-    def setup_mappers(cls):
-        global T
-        class T(object):
+    def setup_classes(cls):
+        class T(cls.Basic):
             def __init__(self, data):
                 self.data = data
-        mapper(T, t1)
+        mapper(T, cls.tables.t1)
 
     def teardown(self):
         from sqlalchemy.orm.session import _sessions
         """
 
         all_states = sess.identity_map.all_states()
-        sess.identity_map.all_states = lambda : all_states
+        sess.identity_map.all_states = lambda: all_states
         for obj in objs:
             state = attributes.instance_state(obj)
             sess.identity_map.discard(state)
             state._dispose()
 
     def _test_session(self, **kwargs):
-        global sess
+        T = self.classes.T
         sess = create_session(**kwargs)
 
         data = o1, o2, o3, o4, o5 = [T('t1'), T('t2'), T('t3'), T('t4'
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.