Commits

Armin Rigo committed f93cc8e

Give up trying to reuse one of CPython's buffer or memoryview classes.
Write our own simple class.

Comments (0)

Files changed (3)

c/_cffi_backend.c

 # define restore_errno_only   restore_errno
 #endif
 
+#include "minibuffer.h"
+
 #ifdef HAVE_WCHAR_H
 # include "wchar_helper.h"
 #endif
         return NULL;
     }
     /*WRITE(cd->c_data, size)*/
-#if PY_MAJOR_VERSION < 3 && !defined(PyMemoryView_Check)   /* Python 2.6 */
-    return PyBuffer_FromReadWriteMemory(cd->c_data, size);
-#else
-    {
-      Py_buffer view;
-      if (PyBuffer_FillInfo(&view, NULL, cd->c_data, size,
-                            /*readonly=*/0, PyBUF_CONTIG | PyBUF_FORMAT) < 0)
-        return NULL;
-      return PyMemoryView_FromBuffer(&view);
-    }
-#endif
+    return minibuffer_new(cd->c_data, size);
 }
 
 static PyObject *b_get_errno(PyObject *self, PyObject *noarg)
         INITERROR;
     if (PyType_Ready(&CDataIter_Type) < 0)
         INITERROR;
+    if (PyType_Ready(&MiniBuffer_Type) < 0)
+        INITERROR;
 
     v = PyCapsule_New((void *)cffi_exports, "cffi", NULL);
     if (v == NULL || PyModule_AddObject(m, "_C_API", v) < 0)
+
+/* Implementation of a C object with the 'buffer' or 'memoryview'
+ * interface at C-level (as approriate for the version of Python we're
+ * compiling for), but only a minimal but *consistent* part of the
+ * 'buffer' interface at application level.
+ */
+
+typedef struct {
+    PyObject_HEAD
+    char      *mb_data;
+    Py_ssize_t mb_size;
+} MiniBufferObj;
+
+static Py_ssize_t mb_length(MiniBufferObj *self)
+{
+    return self->mb_size;
+}
+
+static PyObject *mb_item(MiniBufferObj *self, Py_ssize_t idx)
+{
+    if (idx < 0 || idx >= self->mb_size ) {
+        PyErr_SetString(PyExc_IndexError, "buffer index out of range");
+        return NULL;
+    }
+    return PyBytes_FromStringAndSize(self->mb_data + idx, 1);
+}
+
+static PyObject *mb_slice(MiniBufferObj *self,
+                          Py_ssize_t left, Py_ssize_t right)
+{
+    Py_ssize_t size = self->mb_size;
+    if (left < 0)     left = 0;
+    if (right > size) right = size;
+    if (left > right) left = right;
+    return PyString_FromStringAndSize(self->mb_data + left, right - left);
+}
+
+static int mb_ass_item(MiniBufferObj *self, Py_ssize_t idx, PyObject *other)
+{
+    if (idx < 0 || idx >= self->mb_size) {
+        PyErr_SetString(PyExc_IndexError,
+                        "buffer assignment index out of range");
+        return -1;
+    }
+    if (PyBytes_Check(other) && PyBytes_GET_SIZE(other) == 1) {
+        self->mb_data[idx] = PyBytes_AS_STRING(other)[0];
+        return 0;
+    }
+    else {
+        PyErr_Format(PyExc_TypeError,
+                     "must assign a "STR_OR_BYTES
+                     " of length 1, not %.200s", Py_TYPE(other)->tp_name);
+        return -1;
+    }
+}
+
+static int mb_ass_slice(MiniBufferObj *self,
+                        Py_ssize_t left, Py_ssize_t right, PyObject *other)
+{
+    const void *buffer;
+    Py_ssize_t buffer_len, count;
+    Py_ssize_t size = self->mb_size;
+
+    if (PyObject_AsReadBuffer(other, &buffer, &buffer_len) < 0)
+        return -1;
+
+    if (left < 0)     left = 0;
+    if (right > size) right = size;
+    if (left > right) left = right;
+
+    count = right - left;
+    if (count != buffer_len) {
+        PyErr_SetString(PyExc_TypeError,
+                        "right operand length must match slice length");
+        return -1;
+    }
+    memcpy(self->mb_data + left, buffer, count);
+    return 0;
+}
+
+static Py_ssize_t mb_getdata(MiniBufferObj *self, Py_ssize_t idx, void **pp)
+{
+    *pp = self->mb_data;
+    return self->mb_size;
+}
+
+static Py_ssize_t mb_getsegcount(MiniBufferObj *self, Py_ssize_t *lenp)
+{
+    if (lenp)
+        *lenp = self->mb_size;
+    return 1;
+}
+
+static int mb_getbuf(MiniBufferObj *self, Py_buffer *view, int flags)
+{
+    return PyBuffer_FillInfo(view, NULL, self->mb_data, self->mb_size,
+                             /*readonly=*/0, PyBUF_CONTIG | PyBUF_FORMAT);
+}
+
+static PySequenceMethods mb_as_sequence = {
+    (lenfunc)mb_length, /*sq_length*/
+    (binaryfunc)0, /*sq_concat*/
+    (ssizeargfunc)0, /*sq_repeat*/
+    (ssizeargfunc)mb_item, /*sq_item*/
+    (ssizessizeargfunc)mb_slice, /*sq_slice*/
+    (ssizeobjargproc)mb_ass_item, /*sq_ass_item*/
+    (ssizessizeobjargproc)mb_ass_slice, /*sq_ass_slice*/
+};
+
+static PyBufferProcs mb_as_buffer = {
+    (readbufferproc)mb_getdata,
+    (writebufferproc)mb_getdata,
+    (segcountproc)mb_getsegcount,
+    (charbufferproc)mb_getdata,
+    (getbufferproc)mb_getbuf,
+    (releasebufferproc)0,
+};
+
+static PyTypeObject MiniBuffer_Type = {
+    PyVarObject_HEAD_INIT(NULL, 0)
+    "_cffi_backend.buffer",
+    sizeof(MiniBufferObj),
+    0,
+    (destructor)PyObject_Del,                   /* tp_dealloc */
+    0,                                          /* tp_print */
+    0,                                          /* tp_getattr */
+    0,                                          /* tp_setattr */
+    0,                                          /* tp_compare */
+    0,                                          /* tp_repr */
+    0,                                          /* tp_as_number */
+    &mb_as_sequence,                            /* tp_as_sequence */
+    0,                                          /* tp_as_mapping */
+    0,                                          /* tp_hash */
+    0,                                          /* tp_call */
+    0,                                          /* tp_str */
+    PyObject_GenericGetAttr,                    /* tp_getattro */
+    0,                                          /* tp_setattro */
+    &mb_as_buffer,                              /* tp_as_buffer */
+    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GETCHARBUFFER |
+        Py_TPFLAGS_HAVE_NEWBUFFER,              /* tp_flags */
+};
+
+static PyObject *minibuffer_new(char *data, Py_ssize_t size)
+{
+    MiniBufferObj *ob = PyObject_New(MiniBufferObj, &MiniBuffer_Type);
+    if (ob != NULL) {
+        ob->mb_data = data;
+        ob->mb_size = size;
+    }
+    return (PyObject *)ob;
+}
     type_or_class = "type"
     mandatory_b_prefix = ''
     mandatory_u_prefix = 'u'
-    readbuf = lambda buf: buf[:]
-    bufchar = lambda x: x
     bytechr = chr
     class U(object):
         def __add__(self, other):
     unichr = chr
     mandatory_b_prefix = 'b'
     mandatory_u_prefix = ''
-    readbuf = lambda buf: buf.tobytes()
-    if sys.version_info < (3, 3):
-        bufchar = lambda x: bytes([ord(x)])
-    else:
-        bufchar = ord
     bytechr = lambda n: bytes([n])
     u = ""
 
     assert (p < s) ^ (p > s)
 
 def test_buffer():
+    import __builtin__
     BShort = new_primitive_type("short")
     s = newp(new_pointer_type(BShort), 100)
     assert sizeof(s) == size_of_ptr()
     assert sizeof(BShort) == 2
-    assert len(readbuf(buffer(s))) == 2
+    assert len(buffer(s)) == 2
     #
     BChar = new_primitive_type("char")
     BCharArray = new_array_type(new_pointer_type(BChar), None)
     c = newp(BCharArray, b"hi there")
+    #
     buf = buffer(c)
-    assert readbuf(buf) == b"hi there\x00"
+    assert str(buf).startswith('<_cffi_backend.buffer object at 0x')
+    # --mb_length--
     assert len(buf) == len(b"hi there\x00")
-    assert buf[0] == bufchar('h')
-    assert buf[2] == bufchar(' ')
-    assert list(buf) == list(map(bufchar, "hi there\x00"))
-    buf[2] = bufchar('-')
-    assert c[2] == b'-'
-    assert readbuf(buf) == b"hi-there\x00"
-    c[2] = b'!'
-    assert buf[2] == bufchar('!')
-    assert readbuf(buf) == b"hi!there\x00"
-    c[2] = b'-'
-    buf[:2] = b'HI'
-    assert string(c) == b'HI-there'
-    if sys.version_info < (2, 7) or sys.version_info >= (3, 3):
-        assert buf[:4:2] == b'H-'
-        if '__pypy__' not in sys.builtin_module_names:
-            # XXX pypy doesn't support the following assignment so far
-            buf[:4:2] = b'XY'
-            assert string(c) == b'XIYthere'
+    # --mb_item--
+    for i in range(-12, 12):
+        try:
+            expected = b"hi there\x00"[i]
+        except IndexError:
+            py.test.raises(IndexError, "buf[i]")
+        else:
+            assert buf[i] == expected
+    # --mb_slice--
+    assert buf[:] == b"hi there\x00"
+    for i in range(-12, 12):
+        assert buf[i:] == b"hi there\x00"[i:]
+        assert buf[:i] == b"hi there\x00"[:i]
+        for j in range(-12, 12):
+            assert buf[i:j] == b"hi there\x00"[i:j]
+    # --misc--
+    assert list(buf) == list(b"hi there\x00")
+    # --mb_as_buffer--
+    py.test.raises(TypeError, __builtin__.buffer, c)
+    bf1 = __builtin__.buffer(buf)
+    assert len(bf1) == len(buf) and bf1[3] == "t"
+    if hasattr(__builtin__, 'memoryview'):      # Python >= 2.7
+        py.test.raises(TypeError, memoryview, c)
+        mv1 = memoryview(buf)
+        assert len(mv1) == len(buf) and mv1[3] == "t"
+    # --mb_ass_item--
+    expected = list(b"hi there\x00")
+    for i in range(-12, 12):
+        try:
+            expected[i] = chr(i & 0xff)
+        except IndexError:
+            py.test.raises(IndexError, "buf[i] = chr(i & 0xff)")
+        else:
+            buf[i] = chr(i & 0xff)
+        assert list(buf) == expected
+    # --mb_ass_slice--
+    buf[:] = b"hi there\x00"
+    assert list(buf) == list(c) == list(b"hi there\x00")
+    py.test.raises(TypeError, 'buf[:] = b"shorter"')
+    py.test.raises(TypeError, 'buf[:] = b"this is much too long!"')
+    buf[4:2] = b""   # no effect, but should work
+    assert buf[:] == b"hi there\x00"
+    expected = list(b"hi there\x00")
+    x = 0
+    for i in range(-12, 12):
+        for j in range(-12, 12):
+            start = i if i >= 0 else i + len(buf)
+            stop  = j if j >= 0 else j + len(buf)
+            start = max(0, min(len(buf), start))
+            stop  = max(0, min(len(buf), stop))
+            sample = chr(x & 0xff) * (stop - start)
+            x += 1
+            buf[i:j] = sample
+            expected[i:j] = sample
+            assert list(buf) == expected
 
 def test_getcname():
     BUChar = new_primitive_type("unsigned char")