Commits

Mike Bayer committed d928096

- Added a mutex to the identity map which mutexes
remove operations against iteration methods,
which now pre-buffer before returning an
iterable. This because asyncrhonous gc
can remove items via the gc thread at any time.
[ticket:1891]

Comments (0)

Files changed (3)

     ConcurrentModificationError in an "except:"
     clause.
 
+  - Added a mutex to the identity map which mutexes
+    remove operations against iteration methods,
+    which now pre-buffer before returning an 
+    iterable.   This because asyncrhonous gc 
+    can remove items via the gc thread at any time.
+    [ticket:1891]
+    
   - The Session class is now present in sqlalchemy.orm.*.
     We're moving away from the usage of create_session(),
     which has non-standard defaults, for those situations

lib/sqlalchemy/orm/identity.py

         self._mutable_attrs = set()
         self._modified = set()
         self._wr = weakref.ref(self)
-
+        
     def replace(self, state):
         raise NotImplementedError()
         
             
     def has_key(self, key):
         return key in self
-        
+    
     def popitem(self):
         raise NotImplementedError("IdentityMap uses remove() to remove data")
 
         raise NotImplementedError("IdentityMap uses remove() to remove data")
         
 class WeakInstanceDict(IdentityMap):
+    def __init__(self):
+        IdentityMap.__init__(self)
+        self._remove_mutex = base_util.threading.Lock()
 
     def __getitem__(self, key):
         state = dict.__getitem__(self, key)
         self.remove(state)
         
     def remove(self, state):
-        if dict.pop(self, state.key) is not state:
-            raise AssertionError("State %s is not present in this identity map" % state)
+        self._remove_mutex.acquire()
+        try:
+            if dict.pop(self, state.key) is not state:
+                raise AssertionError("State %s is not present in this identity map" % state)
+        finally:
+            self._remove_mutex.release()
+            
         self._manage_removed_state(state)
     
     def discard(self, state):
         if o is None:
             return default
         return o
+
+
+    def items(self):
+    # Py2K
+        return list(self.iteritems())
     
-    # Py2K        
-    def items(self):
-        return list(self.iteritems())
+    def iteritems(self):
+    # end Py2K
+        self._remove_mutex.acquire()
+        try:
+            result = []
+            for state in dict.values(self):
+                value = state.obj()
+                if value is not None:
+                    result.append((state.key, value))
 
-    def iteritems(self):
-        for state in dict.itervalues(self):
-    # end Py2K
-    # Py3K
-    #def items(self):
-    #    for state in dict.values(self):
-            value = state.obj()
-            if value is not None:
-                yield state.key, value
-
+            return iter(result)
+        finally:
+            self._remove_mutex.release()
+        
+    def values(self):
     # Py2K
-    def values(self):
         return list(self.itervalues())
 
     def itervalues(self):
-        for state in dict.itervalues(self):
     # end Py2K
-    # Py3K
-    #def values(self):
-    #    for state in dict.values(self):
-            instance = state.obj()
-            if instance is not None:
-                yield instance
+        self._remove_mutex.acquire()
+        try:
+            result = []
+            for state in dict.values(self):
+                value = state.obj()
+                if value is not None:
+                    result.append(value)
 
+            return iter(result)
+        finally:
+            self._remove_mutex.release()
+    
     def all_states(self):
-        # Py3K
-        # return list(dict.values(self))
+        self._remove_mutex.acquire()
+        try:
+            # Py3K
+            # return list(dict.values(self))
         
-        # Py2K
-        return dict.values(self)
-        # end Py2K
-    
+            # Py2K
+            return dict.values(self)
+            # end Py2K
+        finally:
+            self._remove_mutex.release()
+            
     def prune(self):
         return 0
         

test/orm/test_session.py

 import inspect
 import pickle
 from sqlalchemy.orm import create_session, sessionmaker, attributes, \
-    make_transient
+    make_transient, Session
 from sqlalchemy.orm.attributes import instance_state
 import sqlalchemy as sa
 from sqlalchemy.test import engines, testing, config
         assert b in sess
         assert len(list(sess)) == 1
 
+    @testing.resolve_artifact_names
+    def test_identity_map_mutate(self):
+        mapper(User, users)
+
+        sess = Session()
+        
+        sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
+        sess.commit()
+        
+        u1, u2, u3 = sess.query(User).all()
+        for i, (key, value) in enumerate(sess.identity_map.iteritems()):
+            if i == 2:
+                del u3
+                gc_collect()
+        
+        
 class DisposedStates(_base.MappedTest):
     run_setup_mappers = 'once'
     run_inserts = 'once'