Commits

Mike Bayer committed 178efa8

- starting to groom the branch for its inclusion
- one-to-many relationships now maintain a list of positive
parent-child associations within the flush, preventing
previous parents marked as deleted from cascading a
delete or NULL foreign key set on those child objects,
despite the end-user not removing the child from the old
association. [ticket:1764] (see the sketch below)
- re-established Preprocess objects as unique on their arguments,
as they were definitely duplicated in inheritance scenarios
- added a "memo" feature to UOWTransaction which encapsulates the usual
check-and-populate pattern of using the .attributes collection
- added the test case from [ticket:1081] into perf/
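
The [ticket:1764] change is easiest to see as a session scenario.  Below is
a minimal, self-contained sketch condensed from the O2MConflictTest added in
this commit; the Parent/Child mapping comes from that test, the rest of the
setup is illustrative only:

    from sqlalchemy import create_engine, MetaData, Table, Column, \
        Integer, ForeignKey
    from sqlalchemy.orm import mapper, relationship, create_session

    metadata = MetaData()
    parent = Table('parent', metadata,
        Column('id', Integer, primary_key=True))
    child = Table('child', metadata,
        Column('id', Integer, primary_key=True),
        Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False))

    class Parent(object): pass
    class Child(object): pass

    mapper(Parent, parent, properties={
        'child':relationship(Child, uselist=False)
    })
    mapper(Child, child)

    engine = create_engine('sqlite://')
    metadata.create_all(engine)
    sess = create_session(bind=engine)

    p1, p2, c1 = Parent(), Parent(), Child()
    p1.child = c1
    sess.add_all([p1, c1])
    sess.flush()

    sess.delete(p1)   # the previous parent is marked as deleted...
    p2.child = c1     # ...but c1 gains a positive association with p2,
    sess.add(p2)      # even though it was never removed from p1.child

    # formerly, the delete of p1 would cascade a delete or a NULL
    # parent_id onto c1; the flush now leaves c1 intact, pointing at p2
    sess.flush()
    assert sess.query(Child).filter(child.c.parent_id == p2.id).all() == [c1]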

Files changed (8)

 CHANGES
 =======
 
+0.6uow_refactor
+===============
+- orm
+  - Unit of work internals have been rewritten.  Units of work
+    with large numbers of interdependent objects can now be 
+    flushed without recursion overflows, as there is no longer 
+    any reliance upon recursive calls [ticket:1081].  The number 
+    of internal structures now stays 
+    constant for a particular session state, regardless of 
+    how many relationships are present on mappings.  The flow 
+    of events now corresponds to a linear list of steps, 
+    generated by the mappers and relationships based on actual
+    work to be done, filtered through a single topological sort 
+    for correct ordering.  Flush actions are assembled using 
+    far fewer steps and less memory. [ticket:1742]
+  
+  - one-to-many relationships now maintain a list of positive
+    parent-child associations within the flush, preventing
+    previous parents marked as deleted from cascading a 
+    delete or NULL foreign key set on those child objects,
+    despite the end-user not removing the child from the old
+    association. [ticket:1764]
+  
 0.6.0
 =====
 
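
A rough illustration of the "single topological sort" described above: the
flush becomes a flat collection of actions plus (before, after) dependency
pairs, ordered by one iterative sort.  The sketch below is hypothetical and
simplified, not the actual sqlalchemy.topological API:

    def toposort(dependencies, allitems):
        # Kahn's algorithm: purely iterative, so the depth of work
        # never grows with the number of interdependent objects
        edges = {}
        indegree = dict.fromkeys(allitems, 0)
        for before, after in dependencies:
            edges.setdefault(before, []).append(after)
            indegree[after] += 1
        ready = [item for item in allitems if not indegree[item]]
        output = []
        while ready:
            item = ready.pop()
            output.append(item)
            for after in edges.get(item, ()):
                indegree[after] -= 1
                if not indegree[after]:
                    ready.append(after)
        assert len(output) == len(indegree), "cycle detected"
        return output

    actions = ['SaveUpdateAll(User)', 'ProcessAll(User.addresses)',
               'SaveUpdateAll(Address)']
    dependencies = [
        ('SaveUpdateAll(User)', 'ProcessAll(User.addresses)'),
        ('ProcessAll(User.addresses)', 'SaveUpdateAll(Address)'),
    ]
    print toposort(dependencies, actions)
    # ['SaveUpdateAll(User)', 'ProcessAll(User.addresses)',
    #  'SaveUpdateAll(Address)']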

lib/sqlalchemy/__init__.py

 __all__ = sorted(name for name, obj in locals().items()
                  if not (name.startswith('_') or inspect.ismodule(obj)))
                  
-__version__ = '0.6beta3'
+__version__ = '0.6uow_refactor'
 
 del inspect, sys

lib/sqlalchemy/orm/dependency.py

 
 from sqlalchemy import sql, util
 import sqlalchemy.exceptions as sa_exc
-from sqlalchemy.orm import attributes, exc, sync, unitofwork
+from sqlalchemy.orm import attributes, exc, sync, unitofwork, util as mapperutil
 from sqlalchemy.orm.interfaces import ONETOMANY, MANYTOONE, MANYTOMANY
 
 
                     "child are present" %
                      self.prop)
 
-    def _get_instrumented_attribute(self):
-        """Return the ``InstrumentedAttribute`` handled by this
-        ``DependencyProecssor``.
-        
-        """
-        return self.parent.class_manager.get_impl(self.key)
-
     def hasparent(self, state):
         """return True if the given object instance has a parent,
         according to the ``InstrumentedAttribute`` handled by this 
         ``DependencyProcessor``.
         
         """
-        # TODO: use correct API for this
-        return self._get_instrumented_attribute().hasparent(state)
+        return self.parent.class_manager.get_impl(self.key).hasparent(state)
 
     def per_property_preprocessors(self, uow):
         """establish actions and dependencies related to a flush.
                 (parent_saves, after_save),
                 (after_save, child_saves),
                 (after_save, child_deletes),
-
+    
                 (child_saves, parent_deletes),
                 (child_deletes, parent_deletes),
 
                             uowcommit.register_object(child, isdelete=True)
                         else:
                             uowcommit.register_object(child)
+                
                 if should_null_fks:
                     for child in history.unchanged:
                         if child is not None:
         
             
     def presort_saves(self, uowcommit, states):
+        children_added = uowcommit.memo(('children_added', self), set)
+        
         for state in states:
+            pks_changed = self._pks_changed(uowcommit, state)
+            
             history = uowcommit.get_attribute_history(
                                             state, 
                                             self.key, 
-                                            passive=True)
+                                            passive=not pks_changed 
+                                                    or self.passive_updates)
             if history:
                 for child in history.added:
                     if child is not None:
-                        uowcommit.register_object(child)
+                        uowcommit.register_object(child, cancel_delete=True)
+
+                children_added.update(history.added)
+
                 for child in history.deleted:
                     if not self.cascade.delete_orphan:
                         uowcommit.register_object(child, isdelete=False)
                             uowcommit.register_object(
                                 attributes.instance_state(c),
                                 isdelete=True)
-            if self._pks_changed(uowcommit, state):
-                if not history:
-                    history = uowcommit.get_attribute_history(
-                                        state, self.key, 
-                                        passive=self.passive_updates)
+
+            if pks_changed:
                 if history:
                     for child in history.unchanged:
                         if child is not None:
         # safely for any cascade but is unnecessary if delete cascade
         # is on.
         if self.post_update or not self.passive_deletes == 'all':
+            children_added = uowcommit.memo(('children_added', self), set)
+
             for state in states:
                 history = uowcommit.get_attribute_history(
                                             state, 
                                             uowcommit, 
                                             [state])
                     if self.post_update or not self.cascade.delete:
-                        for child in history.unchanged:
+                        for child in set(history.unchanged).\
+                                            difference(children_added):
                             if child is not None:
                                 self._synchronize(
                                             state, 
                                             child, 
                                             uowcommit, 
                                             [state])
-    
+                        # technically, we could also remove each child from
+                        # the collection here.  but that would be somewhat
+                        # inconsistent behavior, since it wouldn't happen if
+                        # the old parent weren't deleted but the child was
+                        # moved.
+
     def process_saves(self, uowcommit, states):
         for state in states:
             history = uowcommit.get_attribute_history(state, self.key, passive=True)
         self._process_key_switches(states, uowcommit)
     
     def _key_switchers(self, uow, states):
-        if ('pk_switchers', self) in uow.attributes:
-            switched, notswitched = uow.attributes[('pk_switchers', self)]
-        else:
-            uow.attributes[('pk_switchers', self)] = (switched, notswitched) = (set(), set())
+        switched, notswitched = uow.memo(
+                                        ('pk_switchers', self), 
+                                        lambda: (set(), set())
+                                    )
             
         allstates = switched.union(notswitched)
         for s in states:
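
The uowcommit.memo() calls above replace the manual check-and-populate
pattern against the .attributes dictionary, still visible in the removed
_key_switchers lines.  A standalone sketch of the pattern, with
hypothetical names:

    class FauxUOW(object):
        def __init__(self):
            self.attributes = {}

        def memo(self, key, callable_):
            # create the value on first access only; every later
            # call with the same key returns the same object
            if key in self.attributes:
                return self.attributes[key]
            self.attributes[key] = ret = callable_()
            return ret

    uow = FauxUOW()
    children_added = uow.memo(('children_added', 'somedep'), set)
    children_added.add('child1')

    # a second call with the same key hands back the very same set
    assert 'child1' in uow.memo(('children_added', 'somedep'), set)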

lib/sqlalchemy/orm/sync.py

         except exc.UnmappedColumnError:
             _raise_col_to_prop(False, source_mapper, l, None, r)
         history = uowcommit.get_attribute_history(source, prop.key, passive=True)
-        if len(history.deleted):
-            return True
+        return bool(history.deleted)
     else:
         return False
 

lib/sqlalchemy/orm/unitofwork.py

 
 """
 
-from sqlalchemy import util, log, topological
+from sqlalchemy import util, topological
 from sqlalchemy.orm import attributes, interfaces
 from sqlalchemy.orm import util as mapperutil
 from sqlalchemy.orm.util import _state_mapper
 
 # Load lazily
-object_session = None
 _state_session = None
 
 class UOWEventHandler(interfaces.AttributeExtension):
         # as a parent.
         self.mappers = util.defaultdict(set)
         
-        # a set of Preprocess objects, which gather
+        # a dictionary of Preprocess objects, which gather
         # additional states impacted by the flush
         # and determine if a flush action is needed
-        self.presort_actions = set()
+        self.presort_actions = {}
         
         # dictionary of PostSortRec objects, each 
         # one issues work during the flush within
         
         return state in self.states and self.states[state][0]
     
+    def memo(self, key, callable_):
+        """return the value stored in .attributes under the given
+        key, creating it via callable_() on first access.
+        
+        """
+        if key in self.attributes:
+            return self.attributes[key]
+        else:
+            self.attributes[key] = ret = callable_()
+            return ret
+            
     def remove_state_actions(self, state):
         """remove pending actions for a state from the uowtransaction."""
         
             # if the cached lookup was "passive" and now we want non-passive, do a non-passive
             # lookup and re-cache
             if cached_passive and not passive:
-                history = attributes.get_state_history(state, key, passive=False)
+                history = state.get_history(key, passive=False)
                 self.attributes[hashkey] = (history, passive)
         else:
-            history = attributes.get_state_history(state, key, passive=passive)
+            history = state.get_history(key, passive=passive)
             self.attributes[hashkey] = (history, passive)
 
         if not history or not state.get_impl(key).uses_objects:
             return history.as_state()
     
     def register_preprocessor(self, processor, fromparent):
-        self.presort_actions.add(Preprocess(processor, fromparent))
+        key = (processor, fromparent)
+        if key not in self.presort_actions:
+            self.presort_actions[key] = Preprocess(processor, fromparent)
             
-    def register_object(self, state, isdelete=False, listonly=False):
+    def register_object(self, state, isdelete=False, 
+                            listonly=False, cancel_delete=False):
         if not self.session._contains_state(state):
             return
 
             self.mappers[mapper].add(state)
             self.states[state] = (isdelete, listonly)
         else:
-            existing_isdelete, existing_listonly = self.states[state]
-            self.states[state] = (
-                existing_isdelete or isdelete,
-                existing_listonly and listonly
-            )
+            if not listonly and (isdelete or cancel_delete):
+                self.states[state] = (isdelete, False)
     
     def issue_post_update(self, state, post_update_cols):
         mapper = state.manager.mapper.base_mapper
     
     @util.memoized_property
     def _mapper_for_dep(self):
+        """return a dynamic mapping of (Mapper, DependencyProcessor) to 
+        True or False, indicating if the DependencyProcessor operates 
+        on objects of that Mapper.
+        
+        The result is stored in the dictionary persistently once
+        calculated.
+        
+        """
         return util.PopulateDict(
                     lambda tup:tup[0]._props.get(tup[1].key) is tup[1].prop
                 )
     
     def filter_states_for_dep(self, dep, states):
+        """Filter the given list of InstanceStates to those relevant to the 
+        given DependencyProcessor.
+        
+        """
         mapper_for_dep = self._mapper_for_dep
         return [s for s in states if mapper_for_dep[(s.manager.mapper, dep)]]
         
                     yield state
     
     def _generate_actions(self):
+        """Generate the full, unsorted collection of PostSortRecs as
+        well as dependency pairs for this UOWTransaction.
+        
+        """
         # execute presort_actions, until all states
         # have been processed.   a presort_action might
         # add new states to the uow.
         while True:
             ret = False
-            for action in list(self.presort_actions):
+            for action in list(self.presort_actions.values()):
                 if action.execute(self):
                     ret = True
             if not ret:
         self.cycles = cycles = topological.find_cycles(
                                         self.dependencies, 
                                         self.postsort_actions.values())
-
+        
         if cycles:
             # if yes, break the per-mapper actions into
             # per-state actions
         #sort = topological.sort(self.dependencies, postsort_actions)
         #print "--------------"
         #print self.dependencies
-        #print postsort_actions
+        #print list(sort)
         #print "COUNT OF POSTSORT ACTIONS", len(postsort_actions)
         
         # execute
                 #   debug... would like to see how many do this
                 self.session._register_newly_persistent(state)
 
-log.class_logger(UOWTransaction)
-
 class IterateMappersMixin(object):
     def _mappers(self, uow):
         if self.fromparent:
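
The reworked register_object() above resolves conflicting registrations for
the same state: a non-listonly call carrying isdelete=True or
cancel_delete=True overrides whatever was recorded earlier.  A simplified,
standalone sketch of that bookkeeping (not the full method, which also
checks session membership):

    states = {}

    def register(state, isdelete=False, listonly=False,
                                        cancel_delete=False):
        if state not in states:
            states[state] = (isdelete, listonly)
        elif not listonly and (isdelete or cancel_delete):
            states[state] = (isdelete, False)

    register('c1', isdelete=True)          # old parent's delete cascades
    register('c1', cancel_delete=True)     # positive new-parent association
    assert states['c1'] == (False, False)  # the child is saved, not deleted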

test/orm/test_cascade.py

         assert c not in s, "Should expunge customer when both parents are gone"
 
         
-        
 class DoubleParentOrphanTest(_base.MappedTest):
     """test orphan detection for an entity with two parent relationships"""
 
         eq_(sess.query(A).get(a1.id),
             A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]))
 
+class O2MConflictTest(_base.MappedTest):
+    """test that O2M dependency detects a change in parent, does the
+    right thing, and even updates the collection/attribute.
+    
+    """
+    
+    @classmethod
+    def define_tables(cls, metadata):
+        Table("parent", metadata,
+            Column("id", Integer, primary_key=True, test_needs_autoincrement=True)
+        )
+        Table("child", metadata,
+            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+            Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False)
+        )
+    
+    @classmethod
+    def setup_classes(cls):
+        class Parent(_base.ComparableEntity):
+            pass
+        class Child(_base.ComparableEntity):
+            pass
+    
+    @testing.resolve_artifact_names
+    def _do_delete_old_test(self):
+        sess = create_session()
+        
+        p1, p2, c1 = Parent(), Parent(), Child()
+        if Parent.child.property.uselist:
+            p1.child.append(c1)
+        else:
+            p1.child = c1
+        sess.add_all([p1, c1])
+        sess.flush()
+        
+        sess.delete(p1)
+        
+        if Parent.child.property.uselist:
+            p2.child.append(c1)
+        else:
+            p2.child = c1
+        sess.add(p2)
+
+        sess.flush()
+        eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
+
+    @testing.resolve_artifact_names
+    def _do_move_test(self):
+        sess = create_session()
+
+        p1, p2, c1 = Parent(), Parent(), Child()
+        if Parent.child.property.uselist:
+            p1.child.append(c1)
+        else:
+            p1.child = c1
+        sess.add_all([p1, c1])
+        sess.flush()
+
+        if Parent.child.property.uselist:
+            p2.child.append(c1)
+        else:
+            p2.child = c1
+        sess.add(p2)
+
+        sess.flush()
+        eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
+        
+    @testing.resolve_artifact_names
+    def test_o2o_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False)
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2m_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=True)
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2o_backref_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, backref='parent')
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+        
+    @testing.resolve_artifact_names
+    def test_o2o_delcascade_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, cascade="all, delete")
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2o_delorphan_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, cascade="all, delete, delete-orphan")
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2o_delorphan_backref_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, 
+                                        cascade="all, delete, delete-orphan", 
+                                        backref='parent')
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2o_backref_delorphan_delete_old(self):
+        mapper(Parent, parent)
+        mapper(Child, child, properties = {
+            'parent' : relationship(Parent, uselist=False, single_parent=True, 
+                                backref=backref('child', uselist=False), 
+                                cascade="all,delete,delete-orphan")
+        })
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2m_backref_delorphan_delete_old(self):
+        mapper(Parent, parent)
+        mapper(Child, child, properties = {
+            'parent' : relationship(Parent, uselist=False, single_parent=True, 
+                                backref=backref('child', uselist=True), 
+                                cascade="all,delete,delete-orphan")
+        })
+        self._do_delete_old_test()
+        self._do_move_test()
+        
 
 class PartialFlushTest(_base.MappedTest):
     """test cascade behavior as it relates to object lists passed to flush().

test/orm/test_unitofworkv2.py

             composite_pk_table, CompositePk
 
 class AssertsUOW(object):
-    def _assert_uow_size(self,
-        session, 
-        expected
-    ):
+    def _get_test_uow(self, session):
         uow = unitofwork.UOWTransaction(session)
         deleted = set(session._deleted)
         new = set(session._new)
             uow.register_object(s)
         for d in deleted:
             uow.register_object(d, isdelete=True)
+        return uow
+        
+    def _assert_uow_size(self,
+        session, 
+        expected
+    ):
+        uow = self._get_test_uow(session)
         postsort_actions = uow._generate_actions()
         print postsort_actions
         eq_(len(postsort_actions), expected, postsort_actions)
 class UOWTest(_fixtures.FixtureTest, testing.AssertsExecutionResults, AssertsUOW):
     run_inserts = None
 
-    
-
 class RudimentaryFlushTest(UOWTest):
 
     def test_one_to_many_save(self):
                     {'id':u1.id}
                 ),
         )
-
+    
     def test_m2o_flush_size(self):
         mapper(User, users)
         mapper(Address, addresses, properties={

test/perf/large_flush.py

+import sqlalchemy as sa
+from sqlalchemy import create_engine, MetaData, orm
+from sqlalchemy import Column, ForeignKey
+from sqlalchemy import Integer, String
+from sqlalchemy.orm import mapper
+from sqlalchemy.test import profiling
+
+class Object(object):
+    pass
+
+class Q(Object):
+    pass
+
+class A(Object):
+    pass
+
+class C(Object):
+    pass
+
+class WC(C):
+    pass
+
+engine = create_engine('sqlite:///:memory:', echo=True)
+
+sm = orm.sessionmaker(bind=engine)
+
+SA_Session = orm.scoped_session(sm)
+
+SA_Metadata = MetaData()
+
+object_table =  sa.Table('Object',
+                          SA_Metadata,
+                          Column('ObjectID', Integer,primary_key=True),
+                          Column('Type', String(1), nullable=False))
+
+q_table = sa.Table('Q',
+                   SA_Metadata,
+                   Column('QID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
+
+c_table = sa.Table('C',
+                   SA_Metadata,
+                   Column('CID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
+
+wc_table = sa.Table('WC',
+                    SA_Metadata,
+                    Column('WCID', Integer, ForeignKey('C.CID'), primary_key=True))
+
+a_table = sa.Table('A',
+                   SA_Metadata,
+                   Column('AID', Integer, ForeignKey('Object.ObjectID'),primary_key=True),
+                   Column('QID', Integer, ForeignKey('Q.QID')),
+                   Column('CID', Integer, ForeignKey('C.CID')))
+
+mapper(Object, object_table, polymorphic_on=object_table.c.Type, polymorphic_identity='O')
+
+mapper(Q, q_table, inherits=Object, polymorphic_identity='Q')
+mapper(C, c_table, inherits=Object, polymorphic_identity='C')
+mapper(WC, wc_table, inherits=C, polymorphic_identity='W')
+
+mapper(A, a_table, inherits=Object, polymorphic_identity='A',
+       properties = {
+                     'Q' : orm.relation(Q,primaryjoin=a_table.c.QID==q_table.c.QID,
+                                        backref='As'
+                                        ),
+                     'C' : orm.relation(C,primaryjoin=a_table.c.CID==c_table.c.CID,
+                                        backref='A',
+                                        uselist=False)
+                     }
+       )
+
+SA_Metadata.create_all(engine)
+
+@profiling.profiled('large_flush', always=True, sort=['file'])
+def generate_error():
+    q = Q()
+    for j in range(100): # at 306 the error does not pop out (depending on recursion depth)
+        a = A()
+        a.Q = q
+        a.C = WC()
+
+    SA_Session.add(q)
+    SA_Session.commit() # here the error pops out
+
+generate_error()