Lenard Lindstrom avatar Lenard Lindstrom committed 6bf8a4e

Implement the BufferProxy.write method and clean up the bufferproxy.c code

Fill in the write method function code. Improve error handling for the raw
and length properties. For all properties, fix a parent with a read-only
buffer. Remove the vestiges of development code.

Comments (0)

Files changed (2)

src/bufferproxy.c

   Python level - __array_interface__ - are exposed.
  */
 
+#define PY_SSIZE_T_CLEAN
 #define PYGAMEAPI_BUFPROXY_INTERNAL
 #include "pygame.h"
 #include "pgcompat.h"
 static int PgBufproxy_Trip(PyObject *);
 static Py_buffer *_proxy_get_view (PgBufproxyObject*);
 
-/* $$ Transitional stuff */
-
-#define NOTIMPLEMENTED(rcode) \
-    PyErr_Format(PyExc_NotImplementedError, \
-                 "Not ready yet. (line %i in %s)", __LINE__, __FILE__); \
-    return rcode
-
-/* Use Dict_AsView alternative with flags arg. */
 static void _release_buffer_from_dict(Py_buffer *);
 
 static int
             return 0;
         }
         view_p->consumer = (PyObject *)proxy;
-        if (proxy->get_buffer(proxy->obj, view_p, PyBUF_RECORDS)) {
+        if (proxy->get_buffer(proxy->obj, view_p, PyBUF_RECORDS_RO)) {
             PyMem_Free(view_p);
             return 0;
         }
 proxy_get_raw(PgBufproxyObject *self, PyObject *closure)
 {
     Py_buffer *view_p = _proxy_get_view(self);
+    PyObject *py_raw = 0;
 
     if (!view_p) {
         return 0;
     }
     if (!PyBuffer_IsContiguous(view_p, 'A')) {
+        _proxy_release_view(self);
         PyErr_SetString(PyExc_ValueError, "the bytes are not contiguous");
         return 0;
     }
-    return Bytes_FromStringAndSize((char *)view_p->buf, view_p->len);
+    py_raw = Bytes_FromStringAndSize((char *)view_p->buf, view_p->len);
+    if (!py_raw) {
+        _proxy_release_view(self);
+        return 0;
+    }
+    return py_raw;        
 }
 
 static PyObject *
 proxy_get_length(PgBufproxyObject *self, PyObject *closure)
 {
     Py_buffer *view_p = _proxy_get_view(self);
+    PyObject *py_length = 0;
 
-    return view_p ? PyInt_FromSsize_t(view_p->len) : 0;
+    if (view_p) {
+        py_length = PyInt_FromSsize_t(view_p->len);
+        if (!py_length) {
+            _proxy_release_view(self);
+        }
+    }
+    return py_length;
 }
 
 /**** Methods ****/
  * Writes raw data to the buffer.
  */
 static PyObject *
-proxy_write(PgBufproxyObject *buffer, PyObject *args, PyObject *kwds)
+proxy_write(PgBufproxyObject *self, PyObject *args, PyObject *kwds)
 {
-    NOTIMPLEMENTED(0);
+    Py_buffer view;
+    const char *buf = 0;
+    Py_ssize_t buflen = 0;
+    Py_ssize_t offset = 0;
+    char *keywords[] = {"buffer", "offset", 0};
+
+#if Py_VERSION_HEX >= 0x02050000
+#define ARG_FORMAT "s#|n"
+#else
+#define ARG_FORMAT "s#|i"  /* In this case Py_ssize_t is an int */
+#endif
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     ARG_FORMAT, keywords,
+                                     &buf, &buflen, &offset)) {
+        return 0;
+    }
+#undef ARG_FORMAT
+
+    if (PyObject_GetBuffer((PyObject *)self, &view, PyBUF_RECORDS)) {
+        return 0;
+    }
+    if (!PyBuffer_IsContiguous(&view, 'A')) {
+        PyBuffer_Release(&view);
+        PyErr_SetString(PyExc_ValueError,
+                        "the BufferProxy bytes are not contiguous");
+        return 0;
+    }
+    if (buflen > view.len) {
+        PyBuffer_Release(&view);
+        PyErr_SetString(PyExc_ValueError,
+                        "'buffer' object length is too large");
+        return 0;
+    }
+    if (offset < 0 || buflen + offset > view.len) {
+        PyBuffer_Release(&view);
+        PyErr_SetString(PyExc_IndexError,
+                        "'offset' is out of range");
+        return 0;
+    }
+    memcpy((char *)view.buf + offset, buf, (size_t)buflen);
+    PyBuffer_Release(&view);
+    Py_RETURN_NONE;
 }
 
 static struct PyMethodDef proxy_methods[] = {
     PyType_GenericAlloc,        /* tp_alloc */
     proxy_new,                  /* tp_new */
     PyObject_GC_Del,            /* tp_free */
-#ifndef __SYMBIAN32__
-    0,                          /* tp_is_gc */
-    0,                          /* tp_bases */
-    0,                          /* tp_mro */
-    0,                          /* tp_cache */
-    0,                          /* tp_subclasses */
-    0,                          /* tp_weaklist */
-    0                           /* tp_del */
-#endif
 };
 
 /**** Module methods ***/

test/bufferproxy_test.py

         self.assertEqual(r[-2:], '>*')
 
     if pygame.HAVE_NEWBUF:
-        def test_newbuf_arg(self):
-            self.NEWBUF_test_newbuf_arg()
+        def test_newbuf(self):
+            self.NEWBUF_test_newbuf()
 
-    def NEWBUF_test_newbuf_arg(self):
-        from pygame.newbuffer import BufferMixin
-        from ctypes import (c_char, c_short, c_ssize_t, addressof, sizeof,
-                            create_string_buffer, cast, POINTER)
-        
-        class Array(BufferMixin):
-            def __init__(self, format, init):
-                if format == 'B':
-                    c_itemtype = c_char
-                elif format == 'h':
-                    c_itemtype = c_short
-                else:
-                    raise ValueError("Unknown item format '" + format + "'")
-                self._format = create_string_buffer(format.encode('ascii'))
-                self.itemsize = sizeof(c_itemtype)
-                self._nitems = len(init)
-                self._len = self._nitems * self.itemsize
-                self.buffer = (c_itemtype * self._nitems)(*init)
-                self._len = sizeof(self.buffer)
-                self.ndim = 1
-                self.shape = (c_ssize_t * 1)(self._nitems)
-                self.strides = (c_ssize_t * 1)(self.itemsize)
-            def buffer_info(self):
-                return (addressof(self.buffer), self._nitems)
-            def tobytes(self):
-                return cast(self.buffer, POINTER(c_char))[0:self._len]
-            def __len__(self):
-                return self._nitems
-            def _get_buffer(self, view, flags):
-                view.buf = addressof(self.buffer)
-                view.len = self._len
-                view.readonly = False
-                view.format = addressof(self._format)
-                view.itemsize = self.itemsize
-                view.ndim = self.ndim
-                view.shape = addressof(self.shape)
-                view.strides = addressof(self.strides)
-                view.suboffsets = None
-                view.internal = None
-                view.obj = self
+    def NEWBUF_test_newbuf(self):
+        if is_pygame_pkg:
+            from pygame.tests.test_utils import array
+        else:
+            from test.test_utils import array
+        from ctypes import string_at
 
-        raw = as_bytes('abc\x00def')
-        b = Array('B', raw)
-        b_address, b_nitems = b.buffer_info()
-        v = BufferProxy(b)
-        self.assertEqual(v.length, len(b))
-        self.assertEqual(v.raw, raw)
-        d = v.__array_interface__
+        exp = array.BufferExporter((10,), 'B', readonly=True)
+        b = BufferProxy(exp)
+        self.assertEqual(b.length, exp.len)
+        self.assertEqual(b.raw, string_at(exp.buf, exp.len).encode('latin_1'))
+        d = b.__array_interface__
         try:
             self.assertEqual(d['typestr'], '|u1')
-            self.assertEqual(d['shape'], (b_nitems,))
-            self.assertEqual(d['strides'], (b.itemsize,))
-            self.assertEqual(d['data'], (b_address, False))
+            self.assertEqual(d['shape'], exp.shape)
+            self.assertEqual(d['strides'], exp.strides)
+            self.assertEqual(d['data'], (exp.buf, True))
         finally:
             d = None
-        b = Array('h', [-1, 0, 2])
-        v = BufferProxy(b)
-        b_address, b_nitems = b.buffer_info()
-        b_nbytes = b.itemsize * b_nitems
-        self.assertEqual(v.length, b_nbytes)
-        try:
-            s = b.tostring()
-        except AttributeError:
-            s = b.tobytes()
-        self.assertEqual(v.raw, s)
-        d = v.__array_interface__
+        exp = array.BufferExporter((3,), '=h')
+        b = BufferProxy(exp)
+        self.assertEqual(b.length, exp.len)
+        self.assertEqual(b.raw, string_at(exp.buf, exp.len))
+        d = b.__array_interface__
         try:
             lil_endian = pygame.get_sdl_byteorder() == pygame.LIL_ENDIAN
-            f = '{}i{}'.format('<' if lil_endian else '>', b.itemsize)
+            f = '{}i{}'.format('<' if lil_endian else '>', exp.itemsize)
             self.assertEqual(d['typestr'], f)
-            self.assertEqual(d['shape'], (b_nitems,))
-            self.assertEqual(d['strides'], (b.itemsize,))
-            self.assertEqual(d['data'], (b_address, False))
+            self.assertEqual(d['shape'], exp.shape)
+            self.assertEqual(d['strides'], exp.strides)
+            self.assertEqual(d['data'], (exp.buf, False))
         finally:
             d = None
 
+        exp = array.BufferExporter((10, 2), '=i')
+        b = BufferProxy(exp)
+        imp = array.BufferImporter(b, array.PyBUF_RECORDS)
+        self.assertTrue(imp.obj is b)
+        self.assertEqual(imp.buf, exp.buf)
+        self.assertEqual(imp.ndim, exp.ndim)
+        self.assertEqual(imp.format, exp.format)
+        self.assertEqual(imp.readonly, exp.readonly)
+        self.assertEqual(imp.itemsize, exp.itemsize)
+        self.assertEqual(imp.len, exp.len)
+        self.assertEqual(imp.shape, exp.shape)
+        self.assertEqual(imp.strides, exp.strides)
+        self.assertTrue(imp.suboffsets is None)
+
+        d = {'typestr': '|u1',
+             'shape': (10,),
+             'strides': (1,),
+             'data': (9, True)} # 9? Will not reading the data anyway.
+        b = BufferProxy(d)
+        imp = array.BufferImporter(b, array.PyBUF_SIMPLE)
+        self.assertTrue(imp.obj is b)
+        self.assertEqual(imp.buf, 9)
+        self.assertEqual(imp.len, 10)
+        self.assertEqual(imp.format, None)
+        self.assertEqual(imp.itemsize, 1)
+        self.assertEqual(imp.ndim, 0)
+        self.assertTrue(imp.readonly)
+        self.assertTrue(imp.shape is None)
+        self.assertTrue(imp.strides is None)
+        self.assertTrue(imp.suboffsets is None)
+
     try:
         pygame.bufferproxy.get_segcount
     except AttributeError:
                           'strides': (12, 4)})
         self.assertEqual(bf.length, 3*3*4)
 
-    def todo_test_raw(self):
+    def test_raw(self):
 
         # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.raw:
 
           # The raw buffer data as string. The string may contain NUL bytes.
 
-        self.fail()
+        bf = BufferProxy({'shape': (len(self.content),),
+                          'typestr': '|u1',
+                          'data': self.data})
+        self.assertEqual(bf.raw, self.content)
+        bf = BufferProxy({'shape': (3, 4),
+                          'typestr': '|u4',
+                          'data': self.data,
+                          'strides': (4, 12)})
+        self.assertEqual(bf.raw, self.content)
+        bf = BufferProxy({'shape': (3, 4),
+                          'typestr': '|u1',
+                          'data': self.data,
+                          'strides': (16, 4)})
+        self.assertRaises(ValueError, getattr, bf, 'raw')
 
     def todo_test_write(self):
 
           # If the length of the passed buffer exceeds the length of the
           # BufferProxy (reduced by the offset), an IndexError will be raised.
 
+        buf = ctypes.create_string_buffer(self.content)
         self.fail()
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.