Commits

Armin Rigo committed 47c0f94

Fix keepalive issues in buffer(array(..)).

  • Participants
  • Parent commits 849a04a

Comments (0)

Files changed (2)

File pypy/module/array/interp_array.py

 from pypy.objspace.std.register_all import register_all
 from pypy.rlib.rarithmetic import ovfcheck
 from pypy.rlib.unroll import unrolling_iterable
-from pypy.rlib.objectmodel import specialize
+from pypy.rlib.objectmodel import specialize, keepalive_until_here
 from pypy.rpython.lltypesystem import lltype, rffi
 
 
 unroll_typecodes = unrolling_iterable(types.keys())
 
 class ArrayBuffer(RWBuffer):
-    def __init__(self, data, bytes):
-        self.data = data
-        self.len = bytes
+    def __init__(self, array):
+        self.array = array
 
     def getlength(self):
-        return self.len
+        return self.array.len * self.array.itemsize
 
     def getitem(self, index):
-        return self.data[index]
+        array = self.array
+        data = array._charbuf_start()
+        char = data[index]
+        array._charbuf_stop()
+        return char
 
     def setitem(self, index, char):
-        self.data[index] = char
+        array = self.array
+        data = array._charbuf_start()
+        data[index] = char
+        array._charbuf_stop()
 
 
 def make_array(mytype):
             oldlen = self.len
             new = len(s) / mytype.bytes
             self.setlen(oldlen + new)
-            cbuf = self.charbuf()
+            cbuf = self._charbuf_start()
             for i in range(len(s)):
                 cbuf[oldlen * mytype.bytes + i] = s[i]
+            self._charbuf_stop()
 
         def fromlist(self, w_lst):
             s = self.len
             else:
                 self.fromsequence(w_iterable)
 
-        def charbuf(self):
-            return  rffi.cast(rffi.CCHARP, self.buffer)
+        def _charbuf_start(self):
+            return rffi.cast(rffi.CCHARP, self.buffer)
+
+        def _charbuf_stop(self):
+            keepalive_until_here(self)
 
         def w_getitem(self, space, idx):
             item = self.buffer[idx]
         self.fromstring(space.str_w(w_s))
 
     def array_tostring__Array(space, self):
-        cbuf = self.charbuf()
-        return self.space.wrap(rffi.charpsize2str(cbuf, self.len * mytype.bytes))
+        cbuf = self._charbuf_start()
+        s = rffi.charpsize2str(cbuf, self.len * mytype.bytes)
+        self._charbuf_stop()
+        return self.space.wrap(s)
 
     def array_fromfile__Array_ANY_ANY(space, self, w_f, w_n):
         if not isinstance(w_f, W_File):
     # Misc methods
 
     def buffer__Array(space, self):
-        b = ArrayBuffer(self.charbuf(), self.len * mytype.bytes)
-        return space.wrap(b)
+        return space.wrap(ArrayBuffer(self))
 
     def array_buffer_info__Array(space, self):
         w_ptr = space.wrap(rffi.cast(lltype.Unsigned, self.buffer))
             raise OperationError(space.w_RuntimeError, space.wrap(msg))
         if self.len == 0:
             return
-        bytes = self.charbuf()
+        bytes = self._charbuf_start()
         tmp = [bytes[0]] * mytype.bytes
         for start in range(0, self.len * mytype.bytes, mytype.bytes):
             stop = start + mytype.bytes - 1
                 tmp[i] = bytes[start + i]
             for i in range(mytype.bytes):
                 bytes[stop - i] = tmp[i]
+        self._charbuf_stop()
 
     def repr__Array(space, self):
         if self.len == 0:

File pypy/module/array/test/test_array.py

         a = self.array('h', 'Hi')
         buf = buffer(a)
         assert buf[1] == 'i'
-        #raises(TypeError, buf.__setitem__, 1, 'o')
+
+    def test_buffer_write(self):
+        a = self.array('c', 'hello')
+        buf = buffer(a)
+        print repr(buf)
+        try:
+            buf[3] = 'L'
+        except TypeError:
+            skip("buffer(array) returns a read-only buffer on CPython")
+        assert a.tostring() == 'helLo'
+
+    def test_buffer_keepalive(self):
+        buf = buffer(self.array('c', 'text'))
+        assert buf[2] == 'x'
+        #
+        a = self.array('c', 'foobarbaz')
+        buf = buffer(a)
+        a.fromstring('some extra text')
+        assert buf[:] == 'foobarbazsome extra text'
 
     def test_list_methods(self):
         assert repr(self.array('i')) == "array('i')"