Commits

Armin Rigo committed 1101ea5

Moved the "weak list" logic out of _cffi_backend/handle.py.
Use it in module/_io/interp_iobase.py. This should fix the
issue #1683.

Comments (0)

Files changed (6)

pypy/module/_cffi_backend/handle.py

 from pypy.interpreter.error import OperationError, operationerrfmt
 from pypy.interpreter.gateway import unwrap_spec
 from pypy.module._cffi_backend import ctypeobj, ctypeptr, cdataobj
-from pypy.module._weakref.interp__weakref import dead_ref
 from rpython.rtyper.lltypesystem import lltype, rffi
+from rpython.rlib import rweaklist
 
 
-def reduced_value(s):
-    while True:
-        divide = s & 1
-        s >>= 1
-        if not divide:
-            return s
-
-# ____________________________________________________________
-
-
-class CffiHandles:
+class CffiHandles(rweaklist.RWeakListMixin):
     def __init__(self, space):
-        self.handles = []
-        self.look_distance = 0
-
-    def reserve_next_handle_index(self):
-        # The reservation ordering done here is tweaked for pypy's
-        # memory allocator.  We look from index 'look_distance'.
-        # Look_distance increases from 0.  But we also look at
-        # "look_distance/2" or "/4" or "/8", etc.  If we find that one
-        # of these secondary locations is free, we assume it's because
-        # there was recently a minor collection; so we reset
-        # look_distance to 0 and start again from the lowest locations.
-        length = len(self.handles)
-        for d in range(self.look_distance, length):
-            if self.handles[d]() is None:
-                self.look_distance = d + 1
-                return d
-            s = reduced_value(d)
-            if self.handles[s]() is None:
-                break
-        # restart from the beginning
-        for d in range(0, length):
-            if self.handles[d]() is None:
-                self.look_distance = d + 1
-                return d
-        # full! extend, but don't use '+=' here
-        self.handles = self.handles + [dead_ref] * (length // 3 + 5)
-        self.look_distance = length + 1
-        return length
-
-    def store_handle(self, index, content):
-        self.handles[index] = weakref.ref(content)
-
-    def fetch_handle(self, index):
-        if 0 <= index < len(self.handles):
-            return self.handles[index]()
-        return None
+        self.initialize()
 
 def get(space):
     return space.fromcache(CffiHandles)

pypy/module/_io/interp_iobase.py

 from pypy.interpreter.gateway import interp2app
 from pypy.interpreter.error import OperationError, operationerrfmt
 from rpython.rlib.rstring import StringBuilder
-from rpython.rlib import rweakref
+from rpython.rlib import rweakref, rweaklist
 
 
 DEFAULT_BUFFER_SIZE = 8192
         self.space = space
         self.w_dict = space.newdict()
         self.__IOBase_closed = False
-        self.streamholder = None # needed by AutoFlusher
         get_autoflusher(space).add(self)
 
     def getdict(self, space):
             space.call_method(self, "flush")
         finally:
             self.__IOBase_closed = True
-            get_autoflusher(space).remove(self)
 
     def flush_w(self, space):
         if self._CLOSED():
 # functions to make sure that all streams are flushed on exit
 # ------------------------------------------------------------
 
-class StreamHolder(object):
-    def __init__(self, w_iobase):
-        self.w_iobase_ref = rweakref.ref(w_iobase)
 
-    def autoflush(self, space):
-        w_iobase = self.w_iobase_ref()
-        if w_iobase is not None:
-            try:
-                space.call_method(w_iobase, 'flush')
-            except OperationError:
-                # Silencing all errors is bad, but getting randomly
-                # interrupted here is equally as bad, and potentially
-                # more frequent (because of shutdown issues).
-                pass 
-
-
-class AutoFlusher(object):
+class AutoFlusher(rweaklist.RWeakListMixin):
     def __init__(self, space):
-        self.streams = {}
+        self.initialize()
 
     def add(self, w_iobase):
-        assert w_iobase.streamholder is None
         if rweakref.has_weakref_support():
-            holder = StreamHolder(w_iobase)
-            w_iobase.streamholder = holder
-            self.streams[holder] = None
+            self.add_handle(w_iobase)
         #else:
         #   no support for weakrefs, so ignore and we
         #   will not get autoflushing
 
-    def remove(self, w_iobase):
-        holder = w_iobase.streamholder
-        if holder is not None:
-            try:
-                del self.streams[holder]
-            except KeyError:
-                # this can happen in daemon threads
-                pass
-
     def flush_all(self, space):
-        while self.streams:
-            for streamholder in self.streams.keys():
+        while True:
+            handles = self.get_all_handles()
+            if len(handles) == 0:
+                break
+            self.initialize()  # reset the state here
+            for wr in handles:
+                w_iobase = wr()
+                if w_iobase is None:
+                    continue
                 try:
-                    del self.streams[streamholder]
-                except KeyError:
-                    pass    # key was removed in the meantime
-                else:
-                    streamholder.autoflush(space)
+                    space.call_method(w_iobase, 'flush')
+                except OperationError:
+                    # Silencing all errors is bad, but getting randomly
+                    # interrupted here is equally as bad, and potentially
+                    # more frequent (because of shutdown issues).
+                    pass 
 
 def get_autoflusher(space):
     return space.fromcache(AutoFlusher)

pypy/module/_weakref/interp__weakref.py

 from rpython.rlib import jit
 from rpython.rlib.rshrinklist import AbstractShrinkList
 from rpython.rlib.objectmodel import specialize
+from rpython.rlib.rweakref import dead_ref
 import weakref
 
 
 
 # ____________________________________________________________
 
-class Dummy:
-    pass
-dead_ref = weakref.ref(Dummy())
-for i in range(5):
-    if dead_ref() is not None:
-        import gc; gc.collect()
-assert dead_ref() is None
-
 
 class W_WeakrefBase(W_Root):
     def __init__(w_self, space, w_obj, w_callable):

rpython/rlib/rweaklist.py

+import weakref
+from rpython.rlib.rweakref import dead_ref
+
+
+def _reduced_value(s):
+    while True:
+        divide = s & 1
+        s >>= 1
+        if not divide:
+            return s
+
+
+class RWeakListMixin(object):
+    _mixin_ = True
+
+    def initialize(self):
+        self.handles = []
+        self.look_distance = 0
+
+    def get_all_handles(self):
+        return self.handles
+
+    def reserve_next_handle_index(self):
+        # The reservation ordering done here is tweaked for pypy's
+        # memory allocator.  We look from index 'look_distance'.
+        # Look_distance increases from 0.  But we also look at
+        # "look_distance/2" or "/4" or "/8", etc.  If we find that one
+        # of these secondary locations is free, we assume it's because
+        # there was recently a minor collection; so we reset
+        # look_distance to 0 and start again from the lowest locations.
+        length = len(self.handles)
+        for d in range(self.look_distance, length):
+            if self.handles[d]() is None:
+                self.look_distance = d + 1
+                return d
+            s = _reduced_value(d)
+            if self.handles[s]() is None:
+                break
+        # restart from the beginning
+        for d in range(0, length):
+            if self.handles[d]() is None:
+                self.look_distance = d + 1
+                return d
+        # full! extend, but don't use '+=' here
+        self.handles = self.handles + [dead_ref] * (length // 3 + 5)
+        self.look_distance = length + 1
+        return length
+
+    def add_handle(self, content):
+        index = self.reserve_next_handle_index()
+        self.store_handle(index, content)
+        return index
+
+    def store_handle(self, index, content):
+        self.handles[index] = weakref.ref(content)
+
+    def fetch_handle(self, index):
+        if 0 <= index < len(self.handles):
+            return self.handles[index]()
+        return None

rpython/rlib/rweakref.py

 def has_weakref_support():
     return True      # returns False if --no-translation-rweakref
 
+class Dummy:
+    pass
+dead_ref = weakref.ref(Dummy())
+for i in range(5):
+    if dead_ref() is not None:
+        import gc; gc.collect()
+assert dead_ref() is None      # a known-to-be-dead weakref object
+
 
 class RWeakValueDictionary(object):
     """A dictionary containing weak values."""

rpython/rlib/test/test_rweaklist.py

+import gc
+from rpython.rlib.rweaklist import RWeakListMixin
+
+
+class A(object):
+    pass
+
+
+def test_simple():
+    a1 = A(); a2 = A()
+    wlist = RWeakListMixin(); wlist.initialize()
+    i = wlist.add_handle(a1)
+    assert i == 0
+    i = wlist.reserve_next_handle_index()
+    assert i == 1
+    wlist.store_handle(i, a2)
+    assert wlist.fetch_handle(0) is a1
+    assert wlist.fetch_handle(1) is a2
+    #
+    del a2
+    for i in range(5):
+        gc.collect()
+        if wlist.fetch_handle(1) is None:
+            break
+    else:
+        raise AssertionError("handle(1) did not disappear")
+    assert wlist.fetch_handle(0) is a1
+
+def test_reuse():
+    alist = [A() for i in range(200)]
+    wlist = RWeakListMixin(); wlist.initialize()
+    for i in range(200):
+        j = wlist.reserve_next_handle_index()
+        assert j == i
+        wlist.store_handle(i, alist[i])
+    #
+    del alist[1::2]
+    del alist[1::2]
+    del alist[1::2]
+    del alist[1::2]
+    del alist[1::2]
+    for i in range(5):
+        gc.collect()
+    #
+    for i in range(200):
+        a = wlist.fetch_handle(i)
+        if i % 32 == 0:
+            assert a is alist[i // 32]
+        else:
+            assert a is None
+    #
+    maximum = -1
+    for i in range(200):
+        j = wlist.reserve_next_handle_index()
+        maximum = max(maximum, j)
+        wlist.store_handle(j, A())
+    assert maximum <= 240