Commits

Mike Bayer committed 958307d

- [feature] The Session will produce warnings
when unsupported methods are used inside the
"execute" portion of the flush. These are
the familiar methods add(), delete(), etc.
as well as collection and related-object
manipulations, as called within mapper-level
flush events
like after_insert(), after_update(), etc.
It's been prominently documented for a long
time that SQLAlchemy cannot guarantee
results when the Session is manipulated within
the execution of the flush plan,
however users are still doing it, so now
there's a warning. Maybe someday the Session
will be enhanced to support these operations
inside of the flush, but for now, results
can't be guaranteed.

Comments (0)

Files changed (4)

     need autoflush w pre-attached object.
     [ticket:2464]
 
+  - [feature] The Session will produce warnings
+    when unsupported methods are used inside the
+    "execute" portion of the flush.   These are
+    the familiar methods add(), delete(), etc.
+    as well as collection and related-object
+    manipulations, as called within mapper-level
+    flush events
+    like after_insert(), after_update(), etc.
+    It's been prominently documented for a long
+    time that  SQLAlchemy cannot guarantee
+    results when the Session is manipulated within
+    the execution of the flush plan,
+    however users are still doing it, so now
+    there's a warning.   Maybe someday the Session
+    will be enhanced to support these operations
+    inside of the flush, but for now, results
+    can't be guaranteed.
+
   - [feature] ORM entities can be passed
     to select() as well as the select_from(),
     correlate(), and correlate_except()

lib/sqlalchemy/orm/session.py

         self.bind = bind
         self.__binds = {}
         self._flushing = False
+        self._warn_on_events = False
         self.transaction = None
         self.hash_key = _new_sessionid()
         self.autoflush = autoflush
             self._deleted.pop(state, None)
             state.deleted = True
 
-    def add(self, instance):
+    def add(self, instance, _warn=True):
         """Place an object in the ``Session``.
 
         Its state will be persisted to the database on the next flush
         is ``expunge()``.
 
         """
+        if _warn and self._warn_on_events:
+            self._flush_warning("Session.add()")
+
         try:
             state = attributes.instance_state(instance)
         except exc.NO_STATE:
     def add_all(self, instances):
         """Add the given collection of instances to this ``Session``."""
 
+        if self._warn_on_events:
+            self._flush_warning("Session.add_all()")
+
         for instance in instances:
-            self.add(instance)
+            self.add(instance, _warn=False)
 
     def _save_or_update_state(self, state):
         self._save_or_update_impl(state)
         The database delete operation occurs upon ``flush()``.
 
         """
+        if self._warn_on_events:
+            self._flush_warning("Session.delete()")
+
         try:
             state = attributes.instance_state(instance)
         except exc.NO_STATE:
 
         """
 
+        if self._warn_on_events:
+            self._flush_warning("Session.merge()")
+
         _recursive = {}
 
         if load:
         finally:
             self._flushing = False
 
+    def _flush_warning(self, method):
+        util.warn("Usage of the '%s' operation is not currently supported "
+                    "within the execution stage of the flush process. "
+                    "Results may not be consistent.  Consider using alternative "
+                    "event listeners or connection-level operations instead."
+                    % method)
+
     def _is_clean(self):
         return not self.identity_map.check_modified() and \
                 not self._deleted and \
         flush_context.transaction = transaction = self.begin(
             subtransactions=True)
         try:
-            flush_context.execute()
+            self._warn_on_events = True
+            try:
+                flush_context.execute()
+            finally:
+                self._warn_on_events = False
 
             self.dispatch.after_flush(self, flush_context)
 

lib/sqlalchemy/orm/unitofwork.py

 
         sess = sessionlib._state_session(state)
         if sess:
+            if sess._warn_on_events:
+                sess._flush_warning("collection append")
+
             prop = state.manager.mapper._props[key]
             item_state = attributes.instance_state(item)
             if prop.cascade.save_update and \
 
         sess = sessionlib._state_session(state)
         if sess:
+
             prop = state.manager.mapper._props[key]
+
+            if sess._warn_on_events:
+                sess._flush_warning(
+                        "collection remove"
+                        if prop.uselist
+                        else "related attribute delete")
+
             # expunge pending orphans
             item_state = attributes.instance_state(item)
             if prop.cascade.delete_orphan and \
 
         sess = sessionlib._state_session(state)
         if sess:
+
+            if sess._warn_on_events:
+                sess._flush_warning("related attribute set")
+
             prop = state.manager.mapper._props[key]
             if newvalue is not None:
                 newvalue_state = attributes.instance_state(newvalue)

test/orm/test_session.py

 from sqlalchemy.util import pypy
 from sqlalchemy.testing import fixtures
 from test.orm import _fixtures
+from sqlalchemy import event, ForeignKey
+
 
 class SessionTest(_fixtures.FixtureTest):
     run_inserts = None
         sess.flush()
         self.bind.commit()
 
+
+
+class FlushWarningsTest(fixtures.MappedTest):
+    run_setup_mappers = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('user', metadata,
+                Column('id', Integer, primary_key=True,
+                            test_needs_autoincrement=True),
+                Column('name', String(20))
+            )
+
+        Table('address', metadata,
+                Column('id', Integer, primary_key=True,
+                            test_needs_autoincrement=True),
+                Column('user_id', Integer, ForeignKey('user.id')),
+                Column('email', String(20))
+            )
+
+    @classmethod
+    def setup_classes(cls):
+        class User(cls.Basic):
+            pass
+        class Address(cls.Basic):
+            pass
+
+    @classmethod
+    def setup_mappers(cls):
+        user, User = cls.tables.user, cls.classes.User
+        address, Address = cls.tables.address, cls.classes.Address
+        mapper(User, user, properties={
+                'addresses': relationship(Address, backref="user")
+            })
+        mapper(Address, address)
+
+    def test_o2m_cascade_add(self):
+        Address = self.classes.Address
+        def evt(mapper, conn, instance):
+            instance.addresses.append(Address(email='x1'))
+        self._test(evt, "collection append")
+
+    def test_o2m_cascade_remove(self):
+        def evt(mapper, conn, instance):
+            del instance.addresses[0]
+        self._test(evt, "collection remove")
+
+    def test_m2o_cascade_add(self):
+        User = self.classes.User
+        def evt(mapper, conn, instance):
+            instance.addresses[0].user = User(name='u2')
+        self._test(evt, "related attribute set")
+
+    def test_m2o_cascade_remove(self):
+        def evt(mapper, conn, instance):
+            a1 = instance.addresses[0]
+            del a1.user
+        self._test(evt, "related attribute delete")
+
+    def test_plain_add(self):
+        Address = self.classes.Address
+        def evt(mapper, conn, instance):
+            object_session(instance).add(Address(email='x1'))
+        self._test(evt, "Session.add\(\)")
+
+    def test_plain_merge(self):
+        Address = self.classes.Address
+        def evt(mapper, conn, instance):
+            object_session(instance).merge(Address(email='x1'))
+        self._test(evt, "Session.merge\(\)")
+
+    def test_plain_delete(self):
+        Address = self.classes.Address
+        def evt(mapper, conn, instance):
+            object_session(instance).delete(Address(email='x1'))
+        self._test(evt, "Session.delete\(\)")
+
+    def _test(self, fn, method):
+        User = self.classes.User
+        Address = self.classes.Address
+
+        s = Session()
+        event.listen(User, "after_insert", fn)
+
+        u1 = User(name='u1', addresses=[Address(name='a1')])
+        s.add(u1)
+        assert_raises_message(
+            sa.exc.SAWarning,
+            "Usage of the '%s'" % method,
+            s.commit
+        )