1. Python CFFI
  2. Untitled project
  3. array-experiment


array-experiment / array.py

import cffi
ffi = cffi.FFI()
    typedef int wchar;

""") #XXX better

typecodes = {
    'c': 'char',
    'b': 'signed char',
    'B': 'unsigned char',
    'u': 'wchar',
    'h': 'signed short',
    'H': 'unsigned short',
    'i': 'signed int',
    'I': 'unsigned int',
    'l': 'signed long',
    'L': 'unsigned long',
    'f': 'float',
    'd': 'double',

def typecode2ctype(typecode):
        return typecodes[typecode]
    except KeyError:
        if not isinstance(typecode, str):
            raise TypeError('must be char, not %s' %(type(typecode).__name__,))
        if len(typecode) > 1:
            raise TypeError('must be char, not str')
        raise ValueError("bad typecode (must be c, b, B, u, h, H, i, I, l, L, f or d")

missing = object()

def buffer(array, ctype, start=0, items=None):
    if items is None:
        items = len(array)
    assert (start+items) <= len(array)
    return ffi.buffer(array+start, items*ffi.sizeof(ctype))

class cffi_array(object):
    __slots__ = [
        '_len', '_capacity', '_data',
        'ctype', 'arraytype',

    def __new__(cls, ctype):
        self = object.__new__(cls)
        self.ctype = ctype
        self.arraytype = ffi.getctype(self.ctype, '[]')
        self._len = 0
        self._capacity = 0
        self._data = ffi.NULL
        return self

    def _realloc(self, new_size):
        assert new_size > self._len
        new = ffi.new(self.arraytype, new_size)
        #XXX: memcopy
        #XXX: downsizing?

        if self._data is not ffi.NULL:
                target = buffer(new, self.ctype, items=self._len)
                target[:] = buffer(self._data, self.ctype, items=self._len)

        self._data = new
        self._capacity = new_size

    def __buffer__(self):
        return buffer(self._data, self.ctype, items=self._len)

    def __getitem__(self, index):
        if isinstance(index, (int, long)):
            if index >= self._len:
                raise IndexError(index)
            if index < 0:
                return self._data[len(self)+index]
                return self._data[index]
        elif isinstance(index, slice):
            copy = array(self.typecode)
            #XXX: negative step & co
            if index.step:
                for i in range(index.start or 0, index.stop or self._len, index.step):
                    return copy
                start = index.start or 0
                items = (index.stop or self._len) - start
                buf = buffer(self._data, self.ctype, start, items)
                copy._append_buffer(buf, items)
            return copy

    def itemsize(self):
        return ffi.sizeof(self.ctype)

    def __len__(self):
        return self._len

    def append(self, item):
        #XXX: checksize
        if self._len == self._capacity:
            self._realloc(self._capacity*2 or 4)
        self._data[self._len] = item
        self._len += 1

class array(cffi_array):
    __slots__ = [

    def __new__(cls, typecode, initializer=missing, **kw):
        ctype = typecode2ctype(typecode)
        self = cffi_array.__new__(cls, ctype)
        self.typecode = typecode
        if initializer is not missing:
            if isinstance(initializer, str):
        return self

    def buffer_info(self):
        return 0, self._len

    def byteswap(self):

    def _make_hole(self, start, size):
        if (self._capacity-self._len) < size:
            self._realloc(self._len + size*2)

        #XXX: memmove
        for i in range(self._len, start, -1):
            self._data[i+size] = self._data[i]

    def _prepare_assign(self, start, end, item_count):
        size = start - end
        delta = item_count - size
        if not delta:
        elif delta > 0:
            self._make_hole(end, item_count - size)
        elif delfa < 0:
            remove = size - item_count
            del self[end-size+item_count:end]

        self._len += delta

    def __setitem__(self, index, val):
        if val is None:
            raise TypeError
        if isinstance(index, (int, long)):
            if index < 0:
                self._data[len(self)+index] = val
            elif index < self._len:
                self._data[index] = val
                raise IndexError(index)
        elif isinstance(index, slice):
            #XXX: expensive, memmove/copy
            # also takes care of self assign
            val = array(self.typecode, val)
            if not index.step:
                self._prepare_assign(index.start or 0, index.stop or len(self), len(val))
                self._assign_buffer(index.start or 0, val.__buffer__())
                raise NotImplementedError

    def pop(self, index=missing):
        if index is missing:
            if self._len:
                return self._data[self._len]
            raise IndexError("cant pop empty array")
        elif isinstance(index, (int,long)):
            if index < self._len:
                data = self[index]
                del self[index]
                return data
            raise IndexError(index)
        raise TypeError("if given the index must be a integer") #XXX bad error

    def remove(self, value):
        for i, v in enumerate(self):
            if v == value:
                del self[i]

    def count(self, item):
        return sum(1 for x in self if x == item)

    def _swap(self, a, b):
        tmp = self[a]
        self[a] = self[b]
        self[b] = tmp

    def reverse(self):
        for i in range(len(self)/2):
            self._swap(i, -i-1)

    def __gt__(self, other):
        return list(self)>list(other) #XXX: evil

    def __lt__(self, other):
        return not self > other

    def __eq__(self, other):
        if not isinstance(other, array):
            return NotImplemented
        if len(self) != len(other):
            return False
        return all(a == b for a, b in zip(self, other))

    def extend(self, items):
        if isinstance(items, array):
            if items.typecode == self.typecode:
                count = len(items)
                buffer = ffi.buffer(items._data, self.itemsize*count)
                self._append_buffer(buffer, count)
                raise TypeError
        elif self.typecode=='c' and isinstance(items, str):
            self._append_buffer(items, len(items))
        elif isinstance(items, str):
            raise TypeError
        #XXX: speedups
            for item in items:

    def fromlist(self, items):
        old = self._len
            self._len = old

    def tolist(self):
        return list(self)

    def _tobuffer(self):
        return ffi.buffer(self._data, self.itemsize * self._len)

    def tostring(self):
        return str(self._tobuffer())

    def fromstring(self, data):
        #XXX: expensive
        if len(data) % self.itemsize:
            raise ValueError('string size not aligned with itemsize')
        count = len(data)/self.itemsize
        self._append_buffer(data, count)

    def _prepare_extend(self, count):
        if (self._len + count) > self._capacity:
            self._realloc(self._len + count)

    def _append_buffer(self, data, count):
        self._assign_buffer(self._len, data)
        self._len +=count

    def _assign_buffer(self, start, data):
        start = start*self.itemsize
        assign = ffi.buffer(self._data)
        assign[start:start+len(data)] = data

    def fromfile(self, fp, n):
        if not isinstance(fp, file):
            raise TypeError('arg must be open file')
        data = fp.read(n*self.itemsize)
        self._append_buffer(data, len(data)/self.itemsize)
        if len(data)!=n*self.itemsize:
            raise EOFError

    def tofile(self, fp):
        if not isinstance(fp, file):
            raise TypeError('arg must be open file')

    def __add__(self, other):
        new = array(self.typecode, self)
        return new

    def __mul__(self, other):
        #XXX int
        copy = array(self.typecode)
        for i in range(other):

    __rmul__ = __mul__

    def __repr__(self):
        if self._len:
            #XXX: expensive
            return 'array(%r, %r)' % (self.typecode, list(self))
            return 'array(%r)' % (self.typecode, )

    def __copy__(self):
        return array(self.typecode, self)

    def __deepcopy__(self, memo):
        return self.__copy__()

    def __delitem__(self, item):
        if isinstance(item, int):
            if item < 0:
                item = self._len-item
            self._len -= 1
            if item == self._len:
                return # shortcut del last item
            start = item * self.itemsize
            end = start + self.itemsize
            buffer = ffi.buffer(self._data)
            buffer[start:-self.itemsize] = buffer[end:]
        elif isinstance(item, slice):
            assert not slice.step

    def __imul__(self, number):
        evil = array.array(self.typecode, self) *number
        self._data = evil._data
        self._len = evil._len
        self._capacity = evil._capacity
        return self

    def __iadd__(self, other):
        return self