Commits

Armin Rigo committed cfbfd12

Write a more reasonable buffer implementation for mmap.

  • Participants
  • Parent commits 34b8e7d

Comments (0)

Files changed (3)

File pypy/module/mmap/interp_mmap.py

 from pypy.interpreter.baseobjspace import W_Root
 from pypy.interpreter.typedef import TypeDef
 from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.interpreter.buffer import RWBuffer
 from rpython.rlib import rmmap, rarithmetic
 from rpython.rlib.rmmap import RValueError, RTypeError
 
         return self.space.wrap(self.mmap.size)
 
     def check_valid(self):
+        # XXX the check_xxx() are inconsistent in this file.  Please review!
+        # For example readline() might raise an interp-level RValueError.
         try:
             self.mmap.check_valid()
         except RValueError, v:
         start, stop, step = space.decode_index(w_index, self.mmap.size)
         if step == 0:  # index only
             return space.wrap(self.mmap.getitem(start))
+        elif step == 1:
+            return space.wrap(self.mmap.getslice(start, stop - start))
         else:
             res = "".join([self.mmap.getitem(i)
                            for i in range(start, stop, step)])
             if len(value) != length:
                 raise OperationError(space.w_ValueError,
                           space.wrap("mmap slice assignment is wrong size"))
-            for i in range(length):
-                self.mmap.setitem(start, value[i])
-                start += step
+            if step == 1:
+                self.mmap.setslice(start, value)
+            else:
+                for i in range(length):
+                    self.mmap.setitem(start, value[i])
+                    start += step
 
     def descr_buffer(self):
-        # XXX improve to work directly on the low-level address
-        from pypy.interpreter.buffer import StringLikeBuffer
-        space = self.space
-        return space.wrap(StringLikeBuffer(space, space.wrap(self)))
+        self.check_valid()
+        return self.space.wrap(MMapBuffer(self.space, self.mmap))
 
 if rmmap._POSIX:
 
 def mmap_error(space, e):
     w_error = space.fromcache(Cache).w_error
     return wrap_oserror(space, e, w_exception_class=w_error)
+
+
+class MMapBuffer(RWBuffer):
+    def __init__(self, space, mmap):
+        self.space = space
+        self.mmap = mmap
+
+    def get_raw_address(self):
+        return self.mmap.data
+
+    def getlength(self):
+        return self.mmap.size
+
+    def getitem(self, index):
+        return self.mmap.data[index]
+
+    def getslice(self, start, stop, step, size):
+        if step == 1:
+            return self.mmap._getslice(start, size)
+        else:
+            return RWBuffer.getslice(self, start, stop, step, size)
+
+    def check_writeable(self):
+        try:
+            self.mmap.check_valid()
+            self.mmap.check_writeable()
+        except RValueError, v:
+            raise OperationError(self.space.w_ValueError,
+                                 self.space.wrap(v.message))
+        except RTypeError, v:
+            raise OperationError(self.space.w_TypeError,
+                                 self.space.wrap(v.message))
+
+    def setitem(self, index, char):
+        self.mmap.check_writeable()
+        self.mmap.data[index] = char
+
+    def setslice(self, start, string):
+        self.mmap.setslice(start, string)
+
+    def get_raw_address(self):
+        return self.mmap.data

File pypy/module/mmap/test/test_mmap.py

         m.close()
         f.close()
 
+    def test_buffer_write(self):
+        from mmap import mmap
+        f = open(self.tmpname + "y", "w+")
+        f.write("foobar")
+        f.flush()
+        m = mmap(f.fileno(), 6)
+        m[5] = '?'
+        b = buffer(m)
+        try:
+            b[:3] = "FOO"
+        except TypeError:     # on CPython: "buffer is read-only" :-/
+            skip("on CPython: buffer is read-only")
+        m.close()
+        f.seek(0)
+        got = f.read()
+        assert got == "FOOba?"
+        f.close()
+
     def test_offset(self):
         from mmap import mmap, ALLOCATIONGRANULARITY
         f = open(self.tmpname + "y", "w+")

File rpython/rlib/rmmap.py

 from rpython.rlib import rposix
 from rpython.translator.tool.cbuild import ExternalCompilationInfo
 from rpython.rlib.nonconst import NonConstant
+from rpython.rlib.rarithmetic import intmask
 
 import sys
 import os
         else: # no '\n' found
             eol = self.size
 
-        res = "".join([data[i] for i in range(self.pos, eol)])
+        res = self._getslice(self.pos, eol - self.pos)
         self.pos += len(res)
         return res
 
     def read(self, num=-1):
         self.check_valid()
-
         if num < 0:
             # read all
             eol = self.size
             if eol > self.size:
                 eol = self.size
 
-        res = [self.data[i] for i in range(self.pos, eol)]
-        res = "".join(res)
+        res = self._getslice(self.pos, eol - self.pos)
         self.pos += len(res)
         return res
 
         if not (0 <= where <= self.size):
             raise RValueError("seek out of range")
 
-        self.pos = where
+        self.pos = intmask(where)
 
     def tell(self):
         self.check_valid()
     def write(self, data):
         self.check_valid()
         self.check_writeable()
-
         data_len = len(data)
-        if self.pos + data_len > self.size:
+        start = self.pos
+        if start + data_len > self.size:
             raise RValueError("data out of range")
 
-        internaldata = self.data
-        start = self.pos
-        for i in range(data_len):
-            internaldata[start+i] = data[i]
+        self._setslice(start, data)
         self.pos = start + data_len
 
     def write_byte(self, byte):
     def getptr(self, offset):
         return rffi.ptradd(self.data, offset)
 
+    def getslice(self, start, length):
+        self.check_valid()
+        return self._getslice(start, length)
+
+    def setslice(self, start, data):
+        self.check_valid()
+        self.check_writeable()
+        self._setslice(start, data)
+
+    def _getslice(self, start, length):
+        return rffi.charpsize2str(self.getptr(start), length)
+
+    def _setslice(self, start, newdata):
+        internaldata = self.data
+        for i in range(len(newdata)):
+            internaldata[start+i] = newdata[i]
+
     def flush(self, offset=0, size=0):
         self.check_valid()