Commits

Lenard Lindstrom committed 8c3c3d9

add get_buffer and release_buffer newbuffer.Py_buffer methods

Comments (0)

Files changed (2)

 #include <Python.h>
 #include "pgcompat.h"
 
-static PyObject *BufferSubtype_New(PyTypeObject *, Py_buffer *);
-static PyObject *Py_buffer_New(Py_buffer *);
+/* Buffer_TYPE states:
+ *
+ * Buffer_Type properties are read-only when the BUFOBJ_FILLED flag is set.
+ *
+ * The BUFOBJ_MEMFREE flag is set when the BufferObject object allocates
+ * the memory for the Py_buffer. It is now responsible for freeing the
+ * memory.
+ *
+ * The BUFOBJ_MUTABLE flag can only be changed by a tp_init slot function
+ * call.
+ *
+ * A Buffer_Type object will only release a Py_buffer if both the
+ * BUFOBJ_MUTABLE and BUFOBJ_FILLED flags are set. The PyBuffer_Release
+ * function is only called by a call of the Buffer_Type release_buffer
+ * method, a call to tp_init, or during garbage collection.
+ *
+ * If only the BUFOBJ_FILLED flag is set, then field values cannot be changed.
+ *
+ * If the view_p BufferObject field is NULL, then the BUFOBJ_MUTABLE flag
+ * must be set. Also, the BUFOBJ_MEMFREE must not be set.
+ *
+ * The view_p->obj field should always be valid. It either points to
+ * an object or is NULL. 
+ */
+#define BUFOBJ_FILLED 0x0001
+#define BUFOBJ_MEMFREE 0x0002
+#define BUFOBJ_MUTABLE 0x0004
+
+static PyObject *BufferSubtype_New(PyTypeObject *, Py_buffer *, int, int);
+static PyObject *Buffer_New(Py_buffer *, int, int);
 
 typedef struct buffer_object_t {
     PyObject_HEAD
     Py_buffer *view_p;
+    int flags;
 } BufferObject;
 
 static int
 }
 
 static int
-check_view(BufferObject *op, const char *name)
+check_view_get(BufferObject *op, const char *name)
 {
     if (!op->view_p) {
         PyErr_Format(PyExc_AttributeError,
-                     "property %400s is undefined for a NULL view reference",
+                     "property %400s is undefined for an unallocated view",
+                     name);
+        return -1;
+    }
+    return 0;
+}
+
+static int
+check_view_set(BufferObject *op, const char *name)
+{
+    if (op->view_p) {
+        if (op->flags & BUFOBJ_FILLED) {
+            PyErr_Format(PyExc_AttributeError,
+                         "property %400s is read-only", name);
+            return -1;
+        }
+    }
+    else {
+        PyErr_Format(PyExc_AttributeError,
+                     "property %400s is undefined for an unallocated view",
                      name);
         return -1;
     }
     return 0;
 }
 
+/* Restore a BufferObject instance to the equivalent of initializing
+ * with a null Py_buffer address.
+ */
+static void
+Buffer_Reset(BufferObject *bp) {
+    Py_buffer *view_p;
+    int flags;
+
+    if (!bp) {
+        return;
+    }
+    view_p = bp->view_p;
+    flags = bp->flags;
+    bp->view_p = 0;
+    bp->flags = BUFOBJ_MUTABLE;
+    if (flags & BUFOBJ_MUTABLE) {
+        if (flags & BUFOBJ_FILLED) {
+            PyBuffer_Release(view_p);
+        }
+        else if (view_p && view_p->obj) /* Conditional && */ {
+            Py_DECREF(view_p->obj);
+        }
+        if (flags & BUFOBJ_MEMFREE) {
+            PyMem_Free(view_p);
+        }
+    }
+}
+
 static PyObject *
 buffer_new(PyTypeObject *subtype, PyObject *args, PyObject *kwds)
 {
-    PyObject *arg = 0;
+    BufferObject *bp = (BufferObject *)subtype->tp_alloc(subtype, 0);
+
+    if (bp) {
+        bp->view_p = 0;
+        bp->flags = BUFOBJ_MUTABLE;
+    }
+    return (PyObject *)bp;
+}
+
+static int
+buffer_init(BufferObject *self, PyObject *args, PyObject *kwds)
+{
+    PyObject *py_address = 0;
+    int filled = 0;
+    int preserve = 0;
     Py_buffer *view_p = 0;
-    char *keywords[] = {"buffer_address", 0};
+    char *keywords[] = {"buffer_address", "filled", "preserve", 0};
 
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O:Py_buffer", keywords, &arg)) {
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|Oii:Py_buffer", keywords,
+                                     &py_address, &filled, &preserve)) {
+        return -1;
+    }
+    if (py_address == Py_None) {
+        py_address = 0;
+    }
+    if (py_address) {
+        if (INT_CHECK(py_address)) {
+            view_p = (Py_buffer *)PyLong_AsVoidPtr(py_address);
+            if (PyErr_Occurred()) {
+                return -1;
+            }
+        }
+        else {
+            PyErr_Format(PyExc_TypeError,
+                         "argument %400s must be an integer, not '%400s'",
+                         keywords[0], Py_TYPE(py_address)->tp_name);
+            return -1;
+        }
+    }
+    if (!view_p) {
+        if (filled) {
+            PyErr_Format(PyExc_ValueError,
+                         "argument %400s cannot be True for a NULL %400s",
+                         keywords[1], keywords[0]);
+            return -1;
+        }
+        else if (preserve) {
+            PyErr_Format(PyExc_ValueError,
+                         "argument %400s cannot be True for a NULL %400s",
+                         keywords[2], keywords[0]);
+            return -1;
+        }
+    }
+    Buffer_Reset(self);
+    self->view_p = view_p;
+    if (preserve) {
+        /* remove mutable flag */
+        self->flags &= ~BUFOBJ_MUTABLE;
+    }
+    if (filled) {
+        /* add filled flag */
+        self->flags |= BUFOBJ_FILLED;
+    }
+    else if (view_p) {
+        view_p->obj = 0;
+        view_p->buf = 0;
+        view_p->len = 0;
+        view_p->itemsize = 0;
+        view_p->readonly = 1;
+        view_p->format = 0;
+        view_p->ndim = 0;
+        view_p->shape = 0;
+        view_p->strides = 0;
+        view_p->suboffsets = 0;
+        view_p->internal = 0;
+    }
+    return 0;
+}
+
+static void
+buffer_dealloc(BufferObject *self)
+{
+    PyObject_GC_UnTrack(self);
+    Buffer_Reset(self);
+    Py_TYPE(self)->tp_free(self);
+}
+
+static int
+buffer_traverse(BufferObject *self, visitproc visit, void *arg)
+{
+    if (self->view_p && self->view_p->obj) /* Conditional && */ {
+        Py_VISIT(self->view_p->obj);
+    }
+    return 0;
+}
+
+static PyObject *
+buffer_get_buffer(BufferObject *self, PyObject *args, PyObject *kwds)
+{
+    PyObject *obj;
+    int flags = PyBUF_SIMPLE;
+    int bufobj_flags = self->flags;
+    char *keywords[] = {"obj", "flags", 0};
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O|i", keywords, &obj, &flags)) {
         return 0;
     }
-    if (!arg) {
-        arg = Py_None;
+    if (bufobj_flags & BUFOBJ_FILLED) {
+        PyErr_SetString(PyExc_ValueError,
+                        "The Py_buffer struct is already filled in");
+        return 0;
     }
-    if (INT_CHECK(arg)) {
-        view_p = (Py_buffer *)PyLong_AsVoidPtr(arg);
-        if (PyErr_Occurred()) {
-            return 0;
+    self->flags = BUFOBJ_MUTABLE & bufobj_flags;
+    if (!self->view_p) {
+        self->view_p = PyMem_New(Py_buffer, 1);
+        if (!self->view_p) {
+            return PyErr_NoMemory();
+        }
+        bufobj_flags |= BUFOBJ_MEMFREE;
+    }
+    if (PyObject_GetBuffer(obj, self->view_p, flags)) {
+        if (bufobj_flags & BUFOBJ_MEMFREE) {
+            PyMem_Free(self->view_p);
+            self->view_p = 0;
+        }
+        return 0;
+    }
+    self->flags |= (bufobj_flags & BUFOBJ_MEMFREE) | BUFOBJ_FILLED;
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+buffer_release_buffer(BufferObject *self)
+{
+    int flags = self->flags;
+    Py_buffer *view_p = self->view_p;
+
+    if ((flags & BUFOBJ_FILLED) && (flags & BUFOBJ_MUTABLE)) {
+        self->flags = BUFOBJ_FILLED;  /* Guard against reentrant calls */
+        PyBuffer_Release(view_p);
+        self->flags = BUFOBJ_MUTABLE;
+        if (flags & BUFOBJ_MEMFREE) {
+            self->view_p = 0;
+            PyMem_Free(view_p);
+        }
+        else {
+            view_p->obj = 0;
+            view_p->buf = 0;
+            view_p->len = 0;
+            view_p->itemsize = 0;
+            view_p->readonly = 1;
+            view_p->format = 0;
+            view_p->ndim = 0;
+            view_p->shape = 0;
+            view_p->strides = 0;
+            view_p->suboffsets = 0;
+            view_p->internal = 0;
         }
     }
-    else if (arg != Py_None) {
-        PyErr_Format(PyExc_TypeError,
-                     "%400s() argument must be a Python integer, not '%400s'",
-                     Py_TYPE(subtype)->tp_name, Py_TYPE(arg)->tp_name);
-        return 0;
-    }
-    return BufferSubtype_New(subtype, view_p);
+    Py_RETURN_NONE;
 }
 
+static struct PyMethodDef buffer_methods[] = {
+    {"get_buffer", (PyCFunction)buffer_get_buffer, METH_VARARGS | METH_KEYWORDS,
+     "fill in Py_buffer fields with a PyObject_GetBuffer call"},
+    {"release_buffer", (PyCFunction)buffer_release_buffer, METH_NOARGS,
+     "release the Py_buffer with a PyBuffer_Release call"},
+    {0, 0, 0, 0}
+};
+
 static PyObject *
 buffer_get_obj(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->obj) {
 static int
 buffer_set_obj(BufferObject *self, PyObject *value, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    PyObject *tmp;
+
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (check_value(value, (const char *)closure)) {
         return -1;
     }
-    Py_XDECREF(self->view_p->obj);
+    tmp = self->view_p->obj;
     if (value != Py_None) {
         Py_INCREF(value);
         self->view_p->obj = value;
     else {
         self->view_p->obj = 0;
     }
+    if (tmp) {
+        Py_DECREF(tmp);
+    }
     return 0;
 }
 
 static PyObject *
 buffer_get_buf(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->buf) {
 static int
 buffer_set_buf(BufferObject *self, PyObject *value, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     return set_void_ptr(&self->view_p->buf, value, (const char *)closure);
 static PyObject *
 buffer_get_len(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     return PyLong_FromSsize_t(self->view_p->len);
 static int
 buffer_set_len(BufferObject *self, PyObject *value, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     return set_py_ssize_t(&self->view_p->len, value, (const char *)closure);
 static PyObject *
 buffer_get_readonly(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     return PyBool_FromLong((long)self->view_p->readonly);
 {
     int readonly = 1;
 
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (check_value(value, (const char *)closure)) {
 static PyObject *
 buffer_get_format(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->format) {
 {
     void *vp = 0;
 
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (set_void_ptr(&vp, value, (const char *)closure)) {
 static PyObject *
 buffer_get_ndim(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     return PyLong_FromLong(self->view_p->ndim);
 {
     Py_ssize_t ndim = 0;
 
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (set_py_ssize_t(&ndim, value, (const char *)closure)) {
 static PyObject *
 buffer_get_shape(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->shape) {
 {
     void *vp;
 
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (set_void_ptr(&vp, value, (const char *)closure)) {
 static PyObject *
 buffer_get_strides(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->strides) {
 {
     void *vp;
 
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (set_void_ptr(&vp, value, (const char *)closure)) {
 static PyObject *
 buffer_get_suboffsets(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->suboffsets) {
 {
     void *vp;
 
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     if (set_void_ptr(&vp, value, (const char *)closure)) {
 static PyObject *
 buffer_get_itemsize(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     return PyLong_FromSsize_t(self->view_p->itemsize);
 static int
 buffer_set_itemsize(BufferObject *self, PyObject *value, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     return set_py_ssize_t(&self->view_p->itemsize, value, (const char *)closure);
 static PyObject *
 buffer_get_internal(BufferObject *self, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_get(self, (const char *)closure)) {
         return 0;
     }
     if (!self->view_p->internal) {
 static int
 buffer_set_internal(BufferObject *self, PyObject *value, void *closure)
 {
-    if (check_view(self, (const char *)closure)) {
+    if (check_view_set(self, (const char *)closure)) {
         return -1;
     }
     return set_void_ptr(&self->view_p->internal, value, (const char *)closure);
 };
 
 #define BUFFER_TYPE_FULLNAME "newbuffer.Py_buffer"
-#define BUFFER_TPFLAGS (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE)
+#define BUFFER_TPFLAGS (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | \
+                        Py_TPFLAGS_HAVE_GC)
 
 static PyTypeObject Py_buffer_Type =
 {
     BUFFER_TYPE_FULLNAME,       /* tp_name */
     sizeof (BufferObject),      /* tp_basicsize */
     0,                          /* tp_itemsize */
-    0,                          /* tp_dealloc */
+    (destructor)buffer_dealloc, /* tp_dealloc */
     0,                          /* tp_print */
     0,                          /* tp_getattr */
     0,                          /* tp_setattr */
     0,                          /* tp_as_buffer */
     BUFFER_TPFLAGS,             /* tp_flags */
     "Python level Py_buffer struct wrapper\n",
-    0,                          /* tp_traverse */
+    (traverseproc)buffer_traverse,  /* tp_traverse */
     0,                          /* tp_clear */
     0,                          /* tp_richcompare */
     0,                          /* tp_weaklistoffset */
     0,                          /* tp_iter */
     0,                          /* tp_iternext */
-    0,                          /* tp_methods */
+    buffer_methods,             /* tp_methods */
     0,                          /* tp_members */
     buffer_getset,              /* tp_getset */
     0,                          /* tp_base */
     0,                          /* tp_descr_get */
     0,                          /* tp_descr_set */
     0,                          /* tp_dictoffset */
-    0,                          /* tp_init */
+    (initproc)buffer_init,      /* tp_init */
     PyType_GenericAlloc,        /* tp_alloc */
     buffer_new,                 /* tp_new */
-    PyObject_Del,               /* tp_free */
+    PyObject_GC_Del,            /* tp_free */
 };
 
 static PyObject *
-BufferSubtype_New(PyTypeObject *subtype, Py_buffer *view_p)
+BufferSubtype_New(PyTypeObject *subtype,
+                  Py_buffer *view_p,
+                  int filled,
+                  int preserve)
 {
-    PyObject *op = Py_buffer_Type.tp_alloc(subtype, 0);
-    if (!op) {
+    BufferObject *bp = (BufferObject *)Py_buffer_Type.tp_alloc(subtype, 0);
+    if (!bp) {
         return 0;
     }
-    ((BufferObject *)op)->view_p = view_p;
-    return op;
+    bp->view_p = view_p;
+    bp->flags = 0;
+    if (bp->view_p) {
+        if (filled) {
+            bp->flags |= BUFOBJ_FILLED;
+        }
+        else {
+            bp->view_p->obj = 0;
+        }
+        if (!preserve) {
+            bp->flags |= BUFOBJ_MUTABLE;
+        }
+    }
+    else {
+        bp->flags = BUFOBJ_MUTABLE;
+    }
+    return (PyObject *)bp;
 }
 
 static PyObject *
-Py_buffer_New(Py_buffer *view_p)
+Buffer_New(Py_buffer *view_p, int filled, int preserve)
 {
-    return BufferSubtype_New(&Py_buffer_Type, view_p);
+    return BufferSubtype_New(&Py_buffer_Type, view_p, filled, preserve);
 }
 
 static PyObject *
 static int
 mixin_getbuffer(PyObject *self, Py_buffer *view_p, int flags)
 {
-    BufferObject *py_view = (BufferObject *)Py_buffer_New(view_p);
+    PyObject *py_view = Buffer_New(view_p, 0, 1);
     PyObject *py_rval = 0;
     int rval = -1;
 
     if (py_view) {
         view_p->obj = 0;
-        py_rval = PyObject_CallMethod(self, "_get_buffer", "Oi",
-                                      (PyObject *)py_view, flags);
-        py_view->view_p = 0;
+        py_rval = PyObject_CallMethod(self, "_get_buffer", "(Oi)",
+                                      py_view, flags);
+        Buffer_Reset((BufferObject *)py_view);
         Py_DECREF(py_view);
         if (py_rval == Py_None) {
             rval = 0;
 static void
 mixin_releasebuffer(PyObject *self, Py_buffer *view_p)
 {
-    BufferObject *py_view = (BufferObject *)Py_buffer_New(view_p);
+    PyObject *py_view = Buffer_New(view_p, 1, 1);
     PyObject *py_rval = 0;
 
     if (py_view) {
-        py_rval = PyObject_CallMethod(self, "_release_buffer", "O",
-                                      (PyObject *)py_view);
+        py_rval = PyObject_CallMethod(self, "_release_buffer", "(O)", py_view);
         if (py_rval) {
             Py_DECREF(py_rval);
         }
         else {
             PyErr_Clear();
         }
-        py_view->view_p = 0;
+        Buffer_Reset((BufferObject *)py_view);
         Py_DECREF(py_view);
     }
     else {
 MODINIT_DEFINE(newbuffer)
 {
     PyObject *module;
-    PyObject *obj;
 
 #if PY3
     static struct PyModuleDef _module = {

newbuffer_test.py

                        PyBUF_RECORDS_RO, PyBUF_FULL, PyBUF_FULL_RO,
                        PyBUF_CONTIG, PyBUF_CONTIG_RO)
 from ctypes import *
+import gc
 
 class Py_bufferTest(unittest.TestCase):
     """Py_buffer type verification"""
+
+    class Exporter(BufferMixin):
+        raw = 'Some stuff.'.encode('latin_1')
+        content = create_string_buffer(raw)
+        shape = (c_ssize_t * 1)(len(raw))
+        strides = (c_ssize_t * 1)(1)
+        format = create_string_buffer('B')
+        def __init__(self):
+            self.state = "initial"
+            self.flags = None
+        def _get_buffer(self, view, flags):
+            self.state = "get"
+            self.flags = flags
+            if (flags & PyBUF_WRITABLE) == PyBUF_WRITABLE:
+                raise BufferError("buffer is read-only.")
+            view.obj = self
+            view.buf = addressof(self.content)
+            view.readonly = True
+            view.len = len(self.raw)
+            if flags == PyBUF_SIMPLE:
+                view.itemsize = view.len
+            else:
+                view.itemsize = 1
+            if (flags & PyBUF_FORMAT) == PyBUF_FORMAT:
+                view.format = addressof(self.format)
+            else:
+                view.format = None
+            if (flags & PyBUF_ND) == PyBUF_ND:
+                view.ndim = 1
+                view.shape = addressof(self.shape)
+            else:
+                view.shape = None
+            if (flags & PyBUF_STRIDES) == PyBUF_STRIDES:
+                view.strides = addressof(self.strides)
+            else:
+                view.strides = None
+            view.suboffsets = None
+        def _release_buffer(self, view):
+            self.state = "release"
+        def __len__(self):
+            return len(self.raw)
+
+    # Allocation Py_buffer memory globally so it is not garbage
+    # collected until after all the Py_buffer objects are freed.
+    # Otherwise one may be faced with view_allocation being garbage
+    # collected before the Py_buffer object that references it, leading
+    # to a segment fault.
+    view_allocation = create_string_buffer(PyBUFFER_SIZEOF)
+    view_addr = addressof(view_allocation)
+
+    def __del__(self):
+        # Make sure any remaining Py_buffer instances are garbage collected
+        # before the self.view_allocation memory is freed. This step is
+        # unnecessary for reference counting CPython, but does not hurt either.
+        gc.collect()
+
     def test_null(self):
-        """Verify properties are disabled with a NULL Py_buffer pointer"""
+        """verify properties are disabled with a NULL Py_buffer pointer"""
         b = Py_buffer()
         self.assertFalse(b)
         self.assertRaises(AttributeError, getattr, b, 'obj')
         self.assertRaises(AttributeError, setattr, b, 'itemsize', 0)
         self.assertRaises(AttributeError, setattr, b, 'internal', None)
 
+        b = Py_buffer(None)
+        self.assertFalse(b)
+
+        b = Py_buffer(0)
+        self.assertFalse(b)
+
+        self.assertRaises(ValueError, Py_buffer, None, True)
+        self.assertRaises(ValueError, Py_buffer, None, False, True)
+        self.assertRaises(ValueError, Py_buffer, None, True, True)
+        self.assertRaises(ValueError, Py_buffer, 0, True)
+        self.assertRaises(ValueError, Py_buffer, 0, False, True)
+        self.assertRaises(ValueError, Py_buffer, 0, True, True)
+
     def test_not_null(self):
-        """Verify the properties when Py_buffer memory is allocated"""
+        """verify the properties when Py_buffer memory is allocated"""
         import weakref
         import gc
 
             pass
 
         self.assertTrue(PyBUFFER_SIZEOF > 0)
-        allocation = create_string_buffer(PyBUFFER_SIZEOF)
-        b = Py_buffer(addressof(allocation))
+        memset(self.view_addr, 0xdb, PyBUFFER_SIZEOF)
+        b = Py_buffer(self.view_addr)
         self.assertTrue(b)
-        b.obj = None
-        b.buf = None
-        b.len = 0
-        b.readonly = False
-        b.format = None
-        b.ndim = 0
-        b.shape = None
-        b.strides = None
-        b.suboffsets = None
-        b.itemsize = 0
-        b.internal = None
         self.assertTrue(b.obj is None)
         self.assertTrue(b.buf is None)
         self.assertEqual(b.len, 0)
-        self.assertFalse(b.readonly)
+        self.assertTrue(b.readonly)
         self.assertTrue(b.format is None)
         self.assertEqual(b.ndim, 0)
         self.assertTrue(b.shape is None)
         self.assertRaises(AttributeError, getattr, b, 'wrong')
 
     def test_property_del(self):
-        """Verify Py_buffer properties can not be deleted"""
-        allocation = create_string_buffer(PyBUFFER_SIZEOF)
-        b = Py_buffer(addressof(allocation))
+        """verify Py_buffer properties can not be deleted"""
+        b = Py_buffer(self.view_addr)
         self.assertRaises(AttributeError, delattr, b, 'obj')
         b.obj = []
         try:
         self.assertRaises(AttributeError, delattr, b, 'itemsize')
         self.assertRaises(AttributeError, delattr, b, 'internal')
 
+    def test_get_buffer(self):
+        """verify Py_buffer type get_buffer method"""
+        b = Py_buffer(self.view_addr)
+        self.assertTrue(b.obj is None)
+        b.len = 99
+        ba = bytearray([1, 10, 100, 0, 86, 12])
+        b.get_buffer(ba, (PyBUF_RECORDS))
+        self.assertTrue(b)
+        self.assertTrue(b.obj is ba)
+        self.assertEqual(cast(b.format, c_char_p).value, 'B'.encode('latin_1'))
+        self.assertFalse(b.readonly)
+        self.assertEqual(b.len, len(ba))
+        self.assertEqual(b.ndim, 1)
+        self.assertEqual(b.itemsize, sizeof(c_byte))
+        self.assertEqual(cast(b.shape, POINTER(c_ssize_t))[0], len(ba))
+        self.assertEqual(cast(b.strides, POINTER(c_ssize_t))[0], sizeof(c_byte))
+        self.assertEqual(string_at(b.buf, b.len), ba)
+        self.assertRaises(AttributeError, setattr, b, 'len', 1)
+
+        b = Py_buffer(self.view_addr)
+        e = self.Exporter()
+        self.assertTrue(e.state, "initial")
+        self.assertRaises(BufferError, b.get_buffer, e, PyBUF_WRITABLE)
+        b.get_buffer(e, PyBUF_RECORDS_RO)
+        self.assertTrue(e.state, "get")
+        self.assertTrue(e.flags, PyBUF_RECORDS_RO)
+        self.assertTrue(b)
+        self.assertTrue(b.obj is e)
+        self.assertEqual(cast(b.format, c_char_p).value, 'B'.encode('latin_1'))
+        self.assertTrue(b.readonly)
+        self.assertEqual(b.len, len(e))
+        self.assertEqual(b.ndim, 1)
+        self.assertEqual(b.itemsize, sizeof(c_byte))
+        self.assertEqual(cast(b.shape, POINTER(c_ssize_t))[0], len(e))
+        self.assertEqual(cast(b.strides, POINTER(c_ssize_t))[0], sizeof(c_byte))
+        self.assertEqual(string_at(b.buf, b.len), e.raw)
+
+        b = Py_buffer()
+        self.assertFalse(b)
+        self.assertRaises(AttributeError, getattr, b, 'obj')
+        self.assertRaises(AttributeError, getattr, b, 'len')
+        iba = '1234567890'.encode('latin_1')
+        b.get_buffer(iba, PyBUF_RECORDS_RO)
+        self.assertTrue(b)
+        self.assertTrue(b.obj is iba)
+        self.assertEqual(cast(b.format, c_char_p).value, 'B'.encode('latin_1'))
+        self.assertTrue(b.readonly)
+        self.assertEqual(b.len, len(iba))
+        self.assertEqual(b.ndim, 1)
+        self.assertEqual(b.itemsize, sizeof(c_byte))
+        self.assertEqual(cast(b.shape, POINTER(c_ssize_t))[0], len(iba))
+        self.assertEqual(cast(b.strides, POINTER(c_ssize_t))[0], sizeof(c_byte))
+        self.assertEqual(string_at(b.buf, b.len), iba)
+
+        b = Py_buffer(self.view_addr, preserve=1)
+        iba = 'abcde'.encode('latin_1')
+        b.get_buffer(iba, PyBUF_STRIDES)
+        b2 = Py_buffer(self.view_addr, filled=1, preserve=1)
+        b3 = Py_buffer(self.view_addr, filled=1)
+        self.assertTrue(b2)
+        self.assertTrue(b2.obj is iba)
+        self.assertTrue(b3)
+        self.assertTrue(b3.obj is iba)
+        b2 = None
+        gc.collect()
+        self.assertTrue(b3.obj is iba)
+        b3 = None
+        gc.collect()
+        self.assertTrue(b.obj is None)
+
+    def test_release_buffer(self):
+        """verify Py_buffer type release_buffer method"""
+        e = self.Exporter()
+
+        b = Py_buffer()
+        b.release_buffer()
+        self.assertFalse(b)
+        self.assertRaises(AttributeError, delattr, b, 'obj')
+        self.assertRaises(AttributeError, delattr, b, 'buf')
+        b.get_buffer(e, PyBUF_ND)
+        self.assertTrue(b.obj is e)
+        b.release_buffer()
+        self.assertEqual(e.state, "release")
+        self.assertFalse(b)
+        self.assertRaises(AttributeError, getattr, b, 'obj')
+
+        e.__init__()
+        self.assertEqual(e.state, "initial")
+        b = Py_buffer(self.view_addr)
+        b.release_buffer()
+        self.assertTrue(b)
+        self.assertTrue(b.obj is None)
+        self.assertTrue(b.buf is None)
+        self.assertEqual(b.len, 0)
+        self.assertTrue(b.readonly)
+        self.assertTrue(b.format is None)
+        self.assertEqual(b.ndim, 0)
+        self.assertTrue(b.shape is None)
+        self.assertTrue(b.strides is None)
+        self.assertTrue(b.suboffsets is None)
+        self.assertEqual(b.itemsize, 0)
+        self.assertTrue(b.internal is None)
+        b.get_buffer(e, PyBUF_ND)
+        self.assertEqual(e.state, "get")
+        self.assertTrue(b.obj is e)
+        b.release_buffer()
+        self.assertEqual(e.state, "release")
+        self.assertTrue(b.obj is None)
+
+    def test___init__(self):
+        """verify the __init__ function can be called more than one"""
+        e = self.Exporter()
+
+        b = Py_buffer()
+        b.get_buffer(e, PyBUF_ND)
+        self.assertEqual(e.state, "get")
+        self.assertTrue(b.obj is e)
+        b.__init__(self.view_addr, preserve=True)
+        self.assertEqual(e.state, "release")
+        self.assertTrue(b.obj is None)
+        e.__init__()
+        self.assertEqual(e.state, "initial")
+        b.get_buffer(e, PyBUF_ND)
+        b.__init__(self.view_addr, filled=True)
+        self.assertEqual(e.state, "get")
+        self.assertTrue(b.obj is e)
+        b.__init__(self.view_addr)
+        self.assertEqual(e.state, "release")
+        self.assertTrue(b.obj is None)
+        b.__init__()
+        self.assertFalse(b)
+
+    def test_fillfields(self):
+        """verify the fill_fields method"""
+        self.fail()
+
+    def test_gc(self):
+        """verify object cleanup at garbage collection time"""
+        e = self.Exporter()
+
+        b = Py_buffer()
+        b.get_buffer(e, PyBUF_ND)
+        self.assertEqual(e.state, "get")
+        b = None
+        gc.collect()
+        self.assertEqual(e.state, "release")
+
+        b = Py_buffer(self.view_addr, preserve=1)
+        b.get_buffer(e, PyBUF_ND | PyBUF_FORMAT)
+        self.assertEqual(e.state, "get")
+        self.assertEqual(e.flags, PyBUF_ND | PyBUF_FORMAT)
+        b = None
+        gc.collect()
+        self.assertEqual(e.state, "get")
+        b = Py_buffer(self.view_addr, filled=1)
+        try:
+            self.assertEqual(e.state, "get")
+        finally:
+            b.release_buffer()
+        self.assertEqual(e.state, "release")
+
 class BufferMixinGetBufferTest(unittest.TestCase):
     """BufferMixin._get_buffer verification"""
     class B(BufferMixin):
             raise BufferError()
 
     def test_is_called(self):
-        """Verify the _get_buffer method is properly called"""
+        """verify the _get_buffer method is properly called"""
         b = self.B()
+        m = Py_buffer()
         self.assertFalse(b.is_called)
-        self.assertRaises(BufferError, memoryview, b)
+        self.assertRaises(BufferError, m.get_buffer, b, PyBUF_RECORDS)
         self.assertTrue(b.is_called)
         self.assertTrue(isinstance(b.view, Py_buffer))
         self.assertTrue(b.is_view_alive)
         self.assertTrue(b.is_view_obj_null)
         self.assertFalse(b.view)
         self.assertTrue(isinstance(b.flags, int))
+        self.assertEqual(b.flags, PyBUF_RECORDS)
 
     def test_wrong_return_type(self):
-        """Verify exception raised for wrong return value"""
+        """verify exception raised for wrong return value"""
         class B(BufferMixin):
             def _get_buffer(self, view, flags):
                 return 0
-        self.assertRaises(ValueError, memoryview, B())
+        m = Py_buffer()
+        self.assertRaises(ValueError, m.get_buffer, B())
 
     def test_default_method(self):
-        """Verify the default _get_buffer method is abstract"""
+        """verify the default _get_buffer method is abstract"""
+        m = Py_buffer()
         self.assertTrue(hasattr(BufferMixin, '_get_buffer'))
-        self.assertRaises(NotImplementedError, memoryview, BufferMixin())
+        self.assertRaises(NotImplementedError, m.get_buffer, BufferMixin())
 
 class BufferMixinReleaseBufferTest(unittest.TestCase):
     """BufferMixin._release_buffer verification"""
     def test_default_method(self):
-        """Verify the default _release_buffer method exists"""
+        """verify the default _release_buffer method exists"""
         self.assertTrue(hasattr(BufferMixin, '_release_buffer'))
         # It just returns without doing anything
         b = BufferMixin()
             self.is_view_obj_self = view.obj is self
 
     def test_is_called(self):
-        """Verify the _release_buffer method actually gets called"""
+        """verify the _release_buffer method actually gets called"""
         import gc
 
         b = self.B()
         self.assertFalse(b.is_called)
-        m = memoryview(b)
+        m = Py_buffer()
+        m.get_buffer(b)
         self.assertFalse(b.is_called)
-        m = None
-        gc.collect()
+        m.release_buffer()
         self.assertTrue(b.is_called)
         self.assertTrue(isinstance(b.view, Py_buffer))
         self.assertFalse(b.view)
         self.assertTrue(b.is_view_obj_self)
 
 class BufferMixinTest(unittest.TestCase):
-    """BufferMixin._release_buffer verification"""
+    """overall BufferMixin verification"""
     class B(BufferMixin):
         width = 3
         height = 8
         shape = c_shape_arr_t(width, height)
         strides = c_shape_arr_t(height * itemsize, itemsize)
         format = create_string_buffer("=I".encode('ascii'))
+        length = sizeof(c_int32) * shape[0] * shape[1]
         c_int32_arr_t = c_int32 * (width * height)
         def __init__(self):
             nitems = self.width * self.height
             self.allocation = self.c_int32_arr_t(*range(1, nitems + 1))
+            self.released = False
         def _get_buffer(self, view, flags):
             view.buf = addressof(self.allocation)
-            view.len = sizeof(c_int32) * self.shape[0] * self.shape[1]
+            view.len = self.length
             view.readonly = False
             view.format = addressof(self.format)
             view.ndim = 2
             view.itemsize = self.itemsize
             view.internal = None
             view.obj = self
+        def _release_buffer(self, view):
+            view.release_buffer()
+            self.released = True
 
     def test_operation(self):
-        """check that the pieces come together"""
+        """verify the buffer info given by _get_buffer() reaches the consumer"""
         b = self.B()
-        m = memoryview(b)
+        m = Py_buffer()
+        m.get_buffer(b, PyBUF_RECORDS_RO)
+        self.assertTrue(m.obj is b)
+        self.assertEqual(m.buf, addressof(b.allocation))
+        self.assertEqual(m.len, b.length)
+        self.assertFalse(m.readonly)
+        self.assertEqual(string_at(m.format).encode('latin_1'),
+                         b.format.value)
         self.assertEqual(m.ndim, 2)
-        self.assertEqual(m.format.encode('ascii'), b.format.value)
-        self.assertEqual(m.shape, (b.width, b.height))
-        self.assertEqual(m.strides, (b.height * b.itemsize, b.itemsize))
-        self.assertFalse(m.readonly)
+        self.assertEqual(cast(m.shape, POINTER(c_ssize_t))[0:2],
+                         [b.width, b.height])
+        self.assertEqual(cast(m.strides, POINTER(c_ssize_t))[0:2],
+                         [b.height * b.itemsize, b.itemsize])
+        self.assertTrue(m.suboffsets is None)
+        self.assertEqual(m.itemsize, b.itemsize)
+        self.assertTrue(m.internal is None)
+        m.release_buffer()
+        self.assertTrue(b.released)
 
 class ModuleConstantsTest(unittest.TestCase):
-    """Verify module constants"""
+    """verify module constants"""
     def test_PyBUF_x(self):
         """the PyBUF_<flag> PyObject_GetBuffer flags"""
         self.assertEqual(PyBUF_SIMPLE, 0)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.