Commits

Mike Bayer  committed 417f2f9

check_reverse was failing a not well covered m2m case.

  • Participants
  • Parent commits d95ac76

Comments (0)

Files changed (4)

File lib/sqlalchemy/orm/dependency.py

         
         """
         for p in self.prop._reverse_property:
-            if not p.viewonly and p._dependency_processor:
-                return p.key < self.key
+            if not p.viewonly and p._dependency_processor and \
+                uow.has_dep(p._dependency_processor):
+                return True
         else:
             return False
 

File lib/sqlalchemy/orm/unitofwork.py

         else:
             return history.as_state()
     
+    def has_dep(self, processor):
+        return (processor, True) in self.presort_actions
+        
     def register_preprocessor(self, processor, fromparent):
         key = (processor, fromparent)
         if key not in self.presort_actions:

File test/aaa_profiling/test_memusage.py

             # dont need to clear_mappers()
             del B
             del A
-
+            
         metadata.create_all()
         try:
             go()

File test/orm/test_manytomany.py

-from sqlalchemy.test.testing import assert_raises, assert_raises_message
+from sqlalchemy.test.testing import assert_raises, assert_raises_message, eq_
 import sqlalchemy as sa
 from sqlalchemy.test import testing
 from sqlalchemy import Integer, String, ForeignKey
         # TODO: seems like just a test for an ancient exception throw.
         # how about some data/inserts/queries/assertions for this one
 
+class M2MTest4(_base.MappedTest):
+    @classmethod
+    def define_tables(cls, metadata):
+        table1 = Table("table1", metadata,
+            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+            Column('col2', String(30))
+            )
 
+        table2 = Table("table2", metadata,
+            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+            Column('col2', String(30)),
+            )
+
+        table3 = Table('table3', metadata,
+            Column('t1', Integer, ForeignKey('table1.col1')),
+            Column('t2', Integer, ForeignKey('table2.col1')),
+            )
+    
+    @testing.resolve_artifact_names
+    def test_delete_parent(self):
+        class A(_base.ComparableEntity):
+            pass
+        class B(_base.ComparableEntity):
+            pass
+
+        mapper(A, table1, properties={
+            'bs':relationship(B, secondary=table3, backref='as', order_by=table3.c.t1)
+        })
+        mapper(B, table2)
+
+        sess = create_session()
+        a1 = A(col2='a1')
+        a2 = A(col2='a2')
+        b1 = B(col2='b1')
+        b2 = B(col2='b2')
+        a1.bs.append(b1)
+        a2.bs.append(b2)
+        for x in [a1,a2]:
+            sess.add(x)
+        sess.flush()
+        sess.expunge_all()
+
+        alist = sess.query(A).order_by(A.col1).all()
+        eq_(
+            [
+                A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')])
+            ],
+            alist)
+
+        for a in alist:
+            sess.delete(a)
+        sess.flush()
+        eq_(sess.query(table3).count(), 0)
+
+