Armin Rigo avatar Armin Rigo committed c924be8

Change check_valid() to make its usage explicit at the level of RPython:
users of rmmap.py have to call the check_*() methods explicitly if they
want that behavior. The result is a bit more consistent.

Comments (0)

Files changed (3)

pypy/module/mmap/interp_mmap.py

 from pypy.interpreter.gateway import interp2app, unwrap_spec
 from pypy.interpreter.buffer import RWBuffer
 from rpython.rlib import rmmap, rarithmetic
-from rpython.rlib.rmmap import RValueError, RTypeError
+from rpython.rlib.rmmap import RValueError, RTypeError, RMMapError
 
 if rmmap.HAVE_LARGEFILE_SUPPORT:
     OFF_T = rarithmetic.r_longlong
         self.mmap.close()
 
     def read_byte(self):
+        self.check_valid()
         try:
             return self.space.wrap(self.mmap.read_byte())
         except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
+            raise mmap_error(self.space, v)
 
     def readline(self):
+        self.check_valid()
         return self.space.wrap(self.mmap.readline())
 
     @unwrap_spec(num=int)
 
     @unwrap_spec(tofind='bufferstr')
     def find(self, tofind, w_start=None, w_end=None):
+        self.check_valid()
         space = self.space
         if w_start is None:
             start = self.mmap.pos
 
     @unwrap_spec(tofind='bufferstr')
     def rfind(self, tofind, w_start=None, w_end=None):
+        self.check_valid()
         space = self.space
         if w_start is None:
             start = self.mmap.pos
 
     @unwrap_spec(pos=OFF_T, whence=int)
     def seek(self, pos, whence=0):
+        self.check_valid()
         try:
             self.mmap.seek(pos, whence)
         except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
+            raise mmap_error(self.space, v)
 
     def tell(self):
+        self.check_valid()
         return self.space.wrap(self.mmap.tell())
 
     def descr_size(self):
+        self.check_valid()
         try:
             return self.space.wrap(self.mmap.file_size())
         except OSError, e:
 
     @unwrap_spec(data='bufferstr')
     def write(self, data):
+        self.check_valid()
         self.check_writeable()
         try:
             self.mmap.write(data)
         except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
+            raise mmap_error(self.space, v)
 
     @unwrap_spec(byte=str)
     def write_byte(self, byte):
+        self.check_valid()
+        self.check_writeable()
         try:
             self.mmap.write_byte(byte)
-        except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
-        except RTypeError, v:
-            raise OperationError(self.space.w_TypeError,
-                                 self.space.wrap(v.message))
+        except RMMapError, v:
+            raise mmap_error(self.space, v)
 
     @unwrap_spec(offset=int, size=int)
     def flush(self, offset=0, size=0):
+        self.check_valid()
         try:
             return self.space.wrap(self.mmap.flush(offset, size))
         except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
+            raise mmap_error(self.space, v)
         except OSError, e:
             raise mmap_error(self.space, e)
 
     @unwrap_spec(dest=int, src=int, count=int)
     def move(self, dest, src, count):
+        self.check_valid()
+        self.check_writeable()
         try:
             self.mmap.move(dest, src, count)
         except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
+            raise mmap_error(self.space, v)
 
     @unwrap_spec(newsize=int)
     def resize(self, newsize):
         return self.space.wrap(self.mmap.size)
 
     def check_valid(self):
-        # XXX the check_xxx() are inconsistent in this file.  Please review!
-        # For example readline() might raise an interp-level RValueError.
         try:
             self.mmap.check_valid()
         except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
+            raise mmap_error(self.space, v)
 
     def check_writeable(self):
         try:
             self.mmap.check_writeable()
-        except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
-        except RTypeError, v:
-            raise OperationError(self.space.w_TypeError,
-                                 self.space.wrap(v.message))
+        except RMMapError, v:
+            raise mmap_error(self.space, v)
 
     def check_resizeable(self):
         try:
             self.mmap.check_resizeable()
-        except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
-        except RTypeError, v:
-            raise OperationError(self.space.w_TypeError,
-                                 self.space.wrap(v.message))
+        except RMMapError, v:
+            raise mmap_error(self.space, v)
 
     def descr_getitem(self, w_index):
         self.check_valid()
                                        offset))
         except OSError, e:
             raise mmap_error(space, e)
-        except RValueError, e:
-            raise OperationError(space.w_ValueError, space.wrap(e.message))
-        except RTypeError, e:
-            raise OperationError(space.w_TypeError, space.wrap(e.message))
+        except RMMapError, e:
+            raise mmap_error(space, e)
         return space.wrap(self)
 
 elif rmmap._MS_WINDOWS:
                                        offset))
         except OSError, e:
             raise mmap_error(space, e)
-        except RValueError, e:
-            raise OperationError(space.w_ValueError, space.wrap(e.message))
-        except RTypeError, e:
-            raise OperationError(space.w_TypeError, space.wrap(e.message))
+        except RMMapError, e:
+            raise mmap_error(space, e)
         return space.wrap(self)
 
 W_MMap.typedef = TypeDef("mmap",
                                                  space.w_EnvironmentError)
 
 def mmap_error(space, e):
-    w_error = space.fromcache(Cache).w_error
-    return wrap_oserror(space, e, w_exception_class=w_error)
+    if isinstance(e, RValueError):
+        return OperationError(space.w_ValueError,
+                              space.wrap(e.message))
+    elif isinstance(e, RTypeError):
+        return OperationError(space.w_TypeError,
+                              space.wrap(e.message))
+    elif isinstance(e, OSError):
+        w_error = space.fromcache(Cache).w_error
+        return wrap_oserror(space, e, w_exception_class=w_error)
+    else:
+        # bogus 'e'?
+        return OperationError(space.w_SystemError, space.wrap(repr(e)))
+mmap_error._dont_inline_ = True
 
 
 class MMapBuffer(RWBuffer):
         return self.mmap.size
 
     def getitem(self, index):
+        self.check_valid()
         return self.mmap.data[index]
 
     def getslice(self, start, stop, step, size):
+        self.check_valid()
         if step == 1:
-            return self.mmap._getslice(start, size)
+            return self.mmap.getslice(start, size)
         else:
             return RWBuffer.getslice(self, start, stop, step, size)
 
-    def check_writeable(self):
+    def setitem(self, index, char):
+        self.check_valid_writeable()
+        self.mmap.data[index] = char
+
+    def setslice(self, start, string):
+        self.check_valid_writeable()
+        self.mmap.setslice(start, string)
+
+    def get_raw_address(self):
+        self.check_valid()
+        return self.mmap.data
+
+    def check_valid(self):
+        try:
+            self.mmap.check_valid()
+        except RValueError, v:
+            raise mmap_error(self.space, v)
+
+    def check_valid_writeable(self):
         try:
             self.mmap.check_valid()
             self.mmap.check_writeable()
-        except RValueError, v:
-            raise OperationError(self.space.w_ValueError,
-                                 self.space.wrap(v.message))
-        except RTypeError, v:
-            raise OperationError(self.space.w_TypeError,
-                                 self.space.wrap(v.message))
-
-    def setitem(self, index, char):
-        self.mmap.check_writeable()
-        self.mmap.data[index] = char
-
-    def setslice(self, start, string):
-        self.mmap.setslice(start, string)
-
-    def get_raw_address(self):
-        return self.mmap.data
+        except RMMapError, v:
+            raise mmap_error(self.space, v)

rpython/rlib/rmmap.py

+"""Interp-level mmap-like object.
+
+Note that all the methods assume that the mmap is valid (or writable, for
+writing methods).  You have to call check_valid() from the higher-level API,
+as well as maybe check_writeable().  In the case of PyPy, this is done from
+pypy/module/mmap/.
+"""
+
 from rpython.rtyper.tool import rffi_platform
 from rpython.rtyper.lltypesystem import rffi, lltype
 from rpython.rlib import rposix
 _64BIT = "64bit" in platform.architecture()[0]
 _CYGWIN = "cygwin" == sys.platform
 
-class RValueError(Exception):
+class RMMapError(Exception):
     def __init__(self, message):
         self.message = message
 
-class RTypeError(Exception):
-    def __init__(self, message):
-        self.message = message
+class RValueError(RMMapError):
+    pass
+
+class RTypeError(RMMapError):
+    pass
 
 includes = ["sys/types.h"]
 if _POSIX:
         self.close()
 
     def read_byte(self):
-        self.check_valid()
-
         if self.pos < self.size:
             value = self.data[self.pos]
             self.pos += 1
             raise RValueError("read byte out of range")
 
     def readline(self):
-        self.check_valid()
-
         data = self.data
         for pos in xrange(self.pos, self.size):
             if data[pos] == '\n':
         else: # no '\n' found
             eol = self.size
 
-        res = self._getslice(self.pos, eol - self.pos)
+        res = self.getslice(self.pos, eol - self.pos)
         self.pos += len(res)
         return res
 
     def read(self, num=-1):
-        self.check_valid()
         if num < 0:
             # read all
             eol = self.size
             if eol > self.size:
                 eol = self.size
 
-        res = self._getslice(self.pos, eol - self.pos)
+        res = self.getslice(self.pos, eol - self.pos)
         self.pos += len(res)
         return res
 
     def find(self, tofind, start, end, reverse=False):
-        self.check_valid()
-
         # XXX naive! how can we reuse the rstr algorithm?
         if start < 0:
             start += self.size
             p += step
 
     def seek(self, pos, whence=0):
-        self.check_valid()
-
         dist = pos
         how = whence
 
         self.pos = intmask(where)
 
     def tell(self):
-        self.check_valid()
         return self.pos
 
     def file_size(self):
-        self.check_valid()
-
         size = self.size
         if _MS_WINDOWS:
             if self.file_handle != INVALID_HANDLE:
         return size
 
     def write(self, data):
-        self.check_valid()
-        self.check_writeable()
         data_len = len(data)
         start = self.pos
         if start + data_len > self.size:
             raise RValueError("data out of range")
 
-        self._setslice(start, data)
+        self.setslice(start, data)
         self.pos = start + data_len
 
     def write_byte(self, byte):
-        self.check_valid()
-
         if len(byte) != 1:
             raise RTypeError("write_byte() argument must be char")
 
-        self.check_writeable()
         if self.pos >= self.size:
             raise RValueError("write byte out of range")
 
         return rffi.ptradd(self.data, offset)
 
     def getslice(self, start, length):
-        self.check_valid()
-        return self._getslice(start, length)
-
-    def setslice(self, start, data):
-        self.check_valid()
-        self.check_writeable()
-        self._setslice(start, data)
-
-    def _getslice(self, start, length):
         return rffi.charpsize2str(self.getptr(start), length)
 
-    def _setslice(self, start, newdata):
+    def setslice(self, start, newdata):
         internaldata = self.data
         for i in range(len(newdata)):
             internaldata[start+i] = newdata[i]
 
     def flush(self, offset=0, size=0):
-        self.check_valid()
-
         if size == 0:
             size = self.size
         if offset < 0 or size < 0 or offset + size > self.size:
         return 0
 
     def move(self, dest, src, count):
-        self.check_valid()
-
-        self.check_writeable()
-
         # check boundings
         if (src < 0 or dest < 0 or count < 0 or
                 src + count > self.size or dest + count > self.size):
         c_memmove(datadest, datasrc, count)
 
     def resize(self, newsize):
-        self.check_valid()
-
-        self.check_resizeable()
-
         if _POSIX:
             if not has_mremap:
                 raise RValueError("mmap: resizing not available--no mremap()")
             raise winerror
 
     def len(self):
-        self.check_valid()
-
         return self.size
 
     def getitem(self, index):
-        self.check_valid()
         # simplified version, for rpython
         if index < 0:
             index += self.size
         return self.data[index]
 
     def setitem(self, index, value):
-        self.check_valid()
-        self.check_writeable()
-
         if len(value) != 1:
             raise RValueError("mmap assignment must be "
                               "single-character string")

rpython/rlib/test/test_rmmap.py

             m = mmap.mmap(no, 1)
             m.close()
             try:
-                m.read(1)
+                m.check_valid()
             except RValueError:
                 pass
             else:
         def func(no):
             m = mmap.mmap(no, 6, access=mmap.ACCESS_READ)
             try:
-                m.write('x')
+                m.check_writeable()
             except RTypeError:
                 pass
             else:
                 assert False
             try:
-                m.resize(7)
+                m.check_resizeable()
             except RTypeError:
                 pass
             else:
         f.write("foobar")
         f.flush()
         m = mmap.mmap(f.fileno(), 6, prot=mmap.PROT_READ)
-        py.test.raises(RTypeError, m.write, "foo")
+        py.test.raises(RTypeError, m.check_writeable)
         m.close()
         f.close()
 
         f.write("foobar")
         f.flush()
         m = mmap.mmap(f.fileno(), 6, prot=~mmap.PROT_WRITE)
-        py.test.raises(RTypeError, m.write_byte, 'a')
-        py.test.raises(RTypeError, m.write, "foo")
+        py.test.raises(RTypeError, m.check_writeable)
+        py.test.raises(RTypeError, m.check_writeable)
         m.close()
         f.close()
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.