Source

array-experiment / array.py

import cffi
# Module-wide FFI instance: the classes below use it to resolve ctype names,
# allocate storage (ffi.new) and expose raw memory (ffi.buffer).
ffi = cffi.FFI()
# Declare the `wchar` name that the 'u' typecode maps to.  Here it is a plain
# typedef of int, declared locally rather than taken from a platform header.
ffi.cdef("""
    typedef int wchar;
""") #XXX better

# Maps each supported one-character typecode to the C type name that is
# handed to cffi when the backing storage is declared.
typecodes = {
    'c': 'char',
    'b': 'signed char',
    'B': 'unsigned char',
    'u': 'wchar',
    'h': 'signed short',
    'H': 'unsigned short',
    'i': 'signed int',
    'I': 'unsigned int',
    'l': 'signed long',
    'L': 'unsigned long',
    'f': 'float',
    'd': 'double',
}


def typecode2ctype(typecode):
    """Return the C type name registered for ``typecode``.

    :param typecode: one-character key of ``typecodes``.
    :returns: the C type name as a string (e.g. ``'signed int'``).
    :raises TypeError: if ``typecode`` is not a string, or is a string whose
        length is not exactly one (this includes the empty string).
    :raises ValueError: if ``typecode`` is a single character that is not a
        known typecode.
    """
    try:
        return typecodes[typecode]
    except KeyError:
        if not isinstance(typecode, str):
            raise TypeError('must be char, not %s' % (type(typecode).__name__,))
        # Anything that is not exactly one character is a type error; the
        # empty string used to fall through to the ValueError below.
        if len(typecode) != 1:
            raise TypeError('must be char, not str')
        raise ValueError("bad typecode (must be c, b, B, u, h, H, i, I, l, L, f or d)")


# Sentinel used as the default of array's `initializer` argument; it is
# compared by identity to detect that no initializer was passed.
missing = object()


class cffi_array(object):
    """Growable, homogeneously typed array stored in cffi-allocated memory.

    Attributes (all slots):
        _ctype:     C type name of a single item.
        _arraytype: cffi name of the unsized array type ``_ctype[]``.
        _data:      cdata holding the items, or ``ffi.NULL`` before the first
                    allocation.
        _len:       number of items in use.
        _capacity:  number of items ``_data`` has room for.
    """
    __slots__ = [
        '_len', '_capacity', '_data',
        '_ctype', '_arraytype',
        '__weakref__',
    ]

    def __new__(cls, ctype):
        """Create an empty array of ``ctype`` items; nothing is allocated yet."""
        self = object.__new__(cls)
        self._ctype = ctype
        self._arraytype = ffi.getctype(self._ctype, '[]')
        self._len = 0
        self._capacity = 0
        self._data = ffi.NULL
        return self

    def _realloc(self, new_size):
        """Move the items into a fresh block with room for ``new_size`` items.

        ``new_size`` must be larger than the current length.
        """
        assert new_size > self._len
        new = ffi.new(self._arraytype, new_size)
        #XXX: memcopy
        #XXX: downsizing?

        # Copy only the items in use.  Copying the whole old block (its full
        # capacity) would write past the end of `new` whenever the new
        # capacity is smaller than the old one.
        if self._len:
            used = self._len * self.itemsize
            ffi.buffer(new, used)[:] = ffi.buffer(self._data, used)[:]

        self._data = new
        self._capacity = new_size

    def __buffer__(self):
        """Return a cffi buffer over the bytes of the items in use."""
        # ffi.buffer needs the cdata storage; `self` is not a cdata object.
        return ffi.buffer(self._data, self._len * self.itemsize)

    @property
    def itemsize(self):
        """Size in bytes of one item, as reported by cffi."""
        return ffi.sizeof(self._ctype)

    def __len__(self):
        """Return the number of items in use."""
        return self._len

    def append(self, item):
        """Append ``item``, growing the storage when it is full."""
        #XXX: checksize
        if self._len == self._capacity:
            # Double the capacity; start at 4 when nothing is allocated yet.
            self._realloc(self._capacity * 2 or 4)
        self._data[self._len] = item
        self._len += 1


class array(cffi_array):
    """``array.array`` look-alike whose items live in a ``cffi_array`` store.

    ``typecode`` is the one-character code the array was created with.  The
    code targets Python 2: it relies on the ``long`` and ``file`` builtins.
    """
    __slots__ = [
        'typecode',
    ]

    def __new__(cls, typecode, initializer=missing, **kw):
        """Create an array of ``typecode`` items, optionally pre-filled.

        A ``str`` initializer is loaded as raw item bytes (``fromstring``);
        any other initializer is handed to ``extend``.  Extra keyword
        arguments are accepted and ignored.
        """
        ctype = typecode2ctype(typecode)
        self = cffi_array.__new__(cls, ctype)
        self.typecode = typecode
        if initializer is not missing:
            if isinstance(initializer, str):
                self.fromstring(initializer)
            else:
                self.extend(initializer)
        return self

    def buffer_info(self):
        """Return ``(address, length)``.

        NOTE: the address slot is always 0; the real storage address is not
        exposed here.
        """
        return 0, self._len

    def byteswap(self):
        """Reverse the byte order of every item, in place."""
        size = self.itemsize
        if size == 1 or not self._len:
            return
        raw = self.tostring()
        swapped = b''.join(
            raw[pos:pos + size][::-1] for pos in range(0, len(raw), size)
        )
        self._assign_buffer(0, swapped)

    def _normalize_index(self, index):
        """Turn a possibly negative item index into a checked offset.

        :raises IndexError: if the index falls outside ``[-len, len)``.
        """
        offset = index + self._len if index < 0 else index
        if not 0 <= offset < self._len:
            raise IndexError(index)
        return offset

    def _rebuild(self, items):
        """Replace the whole content with the values in the list ``items``."""
        self._len = 0
        self.extend(items)

    def __getitem__(self, index):
        """Return one item, or a new array for a slice.

        :raises IndexError: if an integer index is out of range.
        """
        if isinstance(index, slice):
            picked = range(*index.indices(self._len))
            return array(self.typecode, [self._data[i] for i in picked])
        # The bounds check also covers negative indices, which would
        # otherwise read memory in front of the storage.
        return self._data[self._normalize_index(index)]

    def _prepare_assign(self, start, end, item_count):
        """Resize the gap ``[start, end)`` so it holds ``item_count`` items.

        Items from ``end`` onwards are moved so that they follow the resized
        gap, the storage is grown if needed and ``_len`` is updated.  The
        content of the gap itself is left for the caller to fill.
        """
        old_len = self._len
        new_len = old_len - (end - start) + item_count
        itemsize = self.itemsize
        tail = b''
        if end < old_len:
            # Slicing a cffi buffer copies, so the tail survives both the
            # reallocation and an overlapping move.
            tail = ffi.buffer(self._data, old_len * itemsize)[end * itemsize:]
        if new_len > self._capacity:
            self._realloc(new_len)
        self._len = new_len
        self._assign_buffer(start + item_count, tail)

    def __setitem__(self, index, val):
        """Store one item, or replace a slice with the items of ``val``.

        :raises TypeError: if ``val`` is None or ``index`` is neither an
            integer nor a slice.
        :raises IndexError: if an integer index is out of range.
        :raises ValueError: if an extended slice is assigned a sequence of a
            different size (same rule as for lists).
        """
        if val is None:
            raise TypeError
        if isinstance(index, slice):
            #XXX: expensive, memmove/copy
            # Converting first also takes care of assigning an array to a
            # slice of itself.
            replacement = array(self.typecode, val)
            start, stop, step = index.indices(self._len)
            if step != 1:
                # Extended slices go through a list to get list semantics.
                items = self.tolist()
                items[index] = replacement.tolist()
                self._rebuild(items)
                return
            stop = max(start, stop)
            payload = replacement.tostring()
            self._prepare_assign(start, stop, len(replacement))
            self._assign_buffer(start, payload)
        elif isinstance(index, (int, long)):
            self._data[self._normalize_index(index)] = val
        else:
            raise TypeError('array indices must be integers')

    def count(self, item):
        """Return how many items compare equal to ``item``."""
        return sum(1 for x in self if x == item)

    def _swap(self, a, b):
        """Exchange the items at indices ``a`` and ``b``."""
        self[a], self[b] = self[b], self[a]

    def reverse(self):
        """Reverse the order of the items in place."""
        # Floor division: only whole pairs are swapped.
        for i in range(len(self) // 2):
            self._swap(i, -i - 1)

    def __gt__(self, other):
        return list(self) > list(other) #XXX: evil

    def __ge__(self, other):
        return list(self) >= list(other)

    def __lt__(self, other):
        # `not self > other` would also be true for equal arrays.
        return list(self) < list(other)

    def __le__(self, other):
        return list(self) <= list(other)

    def __eq__(self, other):
        """Item-wise equality; only defined against other ``array`` objects."""
        if not isinstance(other, array):
            return NotImplemented
        if len(self) != len(other):
            return False
        return all(a == b for a, b in zip(self, other))

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def extend(self, items):
        """Append every item of ``items``.

        Arrays of the same typecode (and, for typecode ``'c'``, strings) are
        copied as raw bytes; other iterables are appended item by item.

        :raises TypeError: for an array of a different typecode, or for a
            string when the typecode is not ``'c'``.
        """
        if isinstance(items, array):
            if items.typecode != self.typecode:
                raise TypeError('can only extend with array of same kind')
            # Snapshot the bytes before growing: growing may replace
            # self._data, which matters when extending an array with itself.
            self._append_buffer(items.tostring(), len(items))
        elif isinstance(items, str):
            if self.typecode != 'c':
                raise TypeError('cannot extend a non-char array with a string')
            self._append_buffer(items, len(items))
        #XXX: speedups
        else:
            for item in items:
                self.append(item)

    def fromlist(self, items):
        """Append ``items``; on any error the array keeps its old length."""
        old = self._len
        try:
            self.extend(items)
        except BaseException:
            self._len = old
            raise

    def tolist(self):
        """Return the items as a list."""
        return list(self)

    def _tobuffer(self):
        """Return a cffi buffer over the bytes of the items in use."""
        return ffi.buffer(self._data, self.itemsize * self._len)

    def tostring(self):
        """Return a copy of the raw item bytes."""
        if not self._len:
            # No storage may have been allocated yet.
            return b''
        return self._tobuffer()[:]

    def fromstring(self, data):
        """Append items read from the raw bytes in ``data``.

        :raises ValueError: if ``len(data)`` is not a multiple of the item
            size.
        """
        #XXX: expensive
        if len(data) % self.itemsize:
            raise ValueError('string size not aligned with itemsize')
        self._append_buffer(data, len(data) // self.itemsize)

    def _prepare_extend(self, count):
        """Make sure there is room for ``count`` more items."""
        if (self._len + count) > self._capacity:
            self._realloc(self._len + count)

    def _append_buffer(self, data, count):
        """Append ``count`` items whose raw bytes are ``data``."""
        if not count:
            # Nothing to copy; also avoids touching unallocated storage.
            return
        self._prepare_extend(count)
        self._assign_buffer(self._len, data)
        self._len += count

    def _assign_buffer(self, start, data):
        """Copy the raw bytes ``data`` into storage, starting at item ``start``."""
        if not len(data):
            return
        offset = start * self.itemsize
        storage = ffi.buffer(self._data)
        storage[offset:offset + len(data)] = data

    def fromfile(self, fp, n):
        """Append ``n`` items read from the open file ``fp``.

        :raises TypeError: if ``fp`` is not a ``file`` object.
        :raises EOFError: if fewer than ``n`` items could be read; the items
            that were read completely are still appended.
        """
        if not isinstance(fp, file):
            raise TypeError('arg must be open file')
        itemsize = self.itemsize
        data = fp.read(n * itemsize)
        whole = len(data) // itemsize
        # Drop a trailing partial item so the copy stays within the space
        # reserved for `whole` items.
        self._append_buffer(data[:whole * itemsize], whole)
        if len(data) != n * itemsize:
            raise EOFError

    def tofile(self, fp):
        """Write the raw item bytes to the open file ``fp``.

        :raises TypeError: if ``fp`` is not a ``file`` object.
        """
        if not isinstance(fp, file):
            raise TypeError('arg must be open file')
        if self._len:
            fp.write(self._tobuffer())

    def __add__(self, other):
        """Return a new array holding the items of ``self`` then ``other``."""
        new = array(self.typecode, self)
        new.extend(other)
        return new

    def __mul__(self, other):
        """Return a new array with the items repeated ``other`` times.

        A count of zero or less gives an empty array.
        """
        if not isinstance(other, (int, long)):
            return NotImplemented
        result = array(self.typecode)
        total = self._len * other
        if total > 0:
            # Reserve the final size once instead of growing per repetition.
            result._realloc(total)
            for _ in range(other):
                result.extend(self)
        return result

    __rmul__ = __mul__

    def __repr__(self):
        if self._len:
            #XXX: expensive
            return 'array(%r, %r)' % (self.typecode, list(self))
        else:
            return 'array(%r)' % (self.typecode, )

    def __copy__(self):
        """Return a new array with the same typecode and items."""
        return array(self.typecode, self)

    def __deepcopy__(self, memo):
        return self.__copy__()

    def __delitem__(self, item):
        """Delete one item or a slice of items.

        :raises IndexError: if an integer index is out of range.
        :raises TypeError: if ``item`` is neither an integer nor a slice.
        """
        if isinstance(item, slice):
            start, stop, step = item.indices(self._len)
            if step == 1:
                self._prepare_assign(start, max(start, stop), 0)
            else:
                # Extended slices go through a list to get list semantics.
                items = self.tolist()
                del items[item]
                self._rebuild(items)
        elif isinstance(item, (int, long)):
            offset = self._normalize_index(item)
            # Close the one-item gap by moving the tail down.
            self._prepare_assign(offset, offset + 1, 0)
        else:
            raise TypeError('array indices must be integers')

    def __imul__(self, number):
        """Repeat the items ``number`` times in place."""
        product = self * number
        # Take over the storage of the freshly built product.
        self._data = product._data
        self._len = product._len
        self._capacity = product._capacity
        return self

    def __iadd__(self, other):
        """Extend in place with the items of ``other``."""
        self.extend(other)
        return self
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.