# Source
#
# cffi / cffi / builder.py
#
# Full commit
import os

from .api import FFI
from . import model


# Header text of every generated module.  FFIBuilder uses it as the initial
# value of its module source and appends the generated loader functions and
# declarations after it.  The text creates a module-level `_ffi` and re-exports
# its most commonly used methods as module-level names.
# NOTE(review): the generated module imports `pickle`, but none of the
# templates visible in this file reference it -- confirm whether it is needed.
MODULE_BOILERPLATE = """
#####                                                                  #####
##### NOTE: This module is generated by cffi. DO NOT EDIT IT MANUALLY. #####
#####                                                                  #####

import pickle
from cffi import FFI, model


_ffi = FFI()


### Proxy `_ffi` methods to make things more convenient.


typeof = _ffi.typeof
sizeof = _ffi.sizeof
alignof = _ffi.alignof
offsetof = _ffi.offsetof
new = _ffi.new
cast = _ffi.cast
string = _ffi.string
buffer = _ffi.buffer
callback = _ffi.callback
getctype = _ffi.getctype
gc = _ffi.gc


# Can't have properties on modules. :-(

def get_errno():
    return _ffi.errno

def set_errno(errno):
    _ffi.errno = errno


addressof = _ffi.addressof
new_handle = _ffi.new_handle
from_handle = _ffi.from_handle


### The functions below are generated by cffi.
"""


# Source template for a generated `load_<libname>()` function that dlopen()s
# a library.  Filled in by FFIBuilder.add_dlopen with (libname, name, flags);
# the library name and the flags are embedded through %r.
DLOPEN_FUNC_TEMPLATE = """
def load_%s():
    return _ffi.dlopen(%r, flags=%r)
"""


# Source template for a generated `load_<libname>()` function that loads a
# library built by FFIBuilder.makelib.  Filled in with (libname, barefilename).
# The generated function points a Verifier at the directory containing the
# generated module, flags it as already having a module, and loads the library
# through it.
MAKELIB_FUNC_TEMPLATE = """
def load_%s():
    import os.path
    from cffi.verifier import Verifier
    module_path = os.path.dirname(__file__)
    verifier = Verifier(_ffi, None, module_path, %r, force_generic_engine=True)
    verifier._has_module = True
    return verifier._load_library()
"""


class NotReadyYet(Exception):
    """Raised when a declaration refers to one that is not emitted yet."""


class DeclarationBuilder(object):
    """Render one cffi model object as Python source that recreates it.

    The rendered text is a ``model.<ClassName>(...)`` constructor call.  It
    is meant to be written into generated code where a dict named
    ``declarations`` holds the declarations emitted so far.
    """

    def __init__(self, model_object, built_declarations, our_declarations):
        """Remember the object to render and the shared bookkeeping.

        :param model_object: the model instance to serialise.
        :param built_declarations: mapping from already emitted model objects
            to the key they are stored under in ``declarations``.  It is
            consulted on every ``build()`` call, so the caller may keep
            filling it in between calls.
        :param our_declarations: container of ``(type, name)`` pairs
            identifying the model objects that are emitted as declarations
            of their own rather than inline.
        """
        self._model_object = model_object
        self._built_declarations = built_declarations
        self._our_declarations = our_declarations

    def _format_param(self, param):
        """Return the source text for a single constructor argument.

        Tuples are rendered element by element, model objects either as a
        lookup in ``declarations`` or as an inline constructor call, and
        everything else through ``repr()``.

        :raises NotReadyYet: if ``param``, or anything nested inside it, is
            one of our declarations that has not been emitted yet.
        """
        # Tuples are handled first; this check needs nothing from `model`.
        if isinstance(param, tuple):
            items = [self._format_param(item) for item in param]
            if not items:
                # '(%s,)' around nothing would give '(,)', a syntax error.
                return '()'
            return '(%s,)' % ', '.join(items)
        if isinstance(param, model.BaseTypeByIdentity):
            od = (type(param), getattr(param, 'name', None))
            if od not in self._our_declarations:
                # Not a declaration of its own: render it inline.
                inline = DeclarationBuilder(
                    param, self._built_declarations,
                    self._our_declarations).build()
                if inline is None:
                    # Something inside the inline object is not ready.  Pass
                    # that on instead of letting None leak into the output.
                    raise NotReadyYet()
                return inline
            if param not in self._built_declarations:
                raise NotReadyYet()
            return "declarations[%r]" % self._built_declarations[param]
        return repr(param)

    def build(self):
        """Return the constructor-call source, or None if it must wait.

        None means that the object depends on a declaration that has not
        been emitted yet; the caller is expected to try again later.
        """
        obj = self._model_object
        if isinstance(obj, model.StructOrUnion):
            extra_attrs = ('fldnames', 'fldtypes', 'fldbitsize')
        elif isinstance(obj, model.EnumType):
            extra_attrs = ('enumerators', 'enumvalues', 'baseinttype')
        else:
            extra_attrs = ()

        try:
            params = [(key, self._format_param(value))
                      for key, value in obj._get_items()]
            # Struct/union and enum objects carry extra attributes that are
            # passed to the constructor in addition to _get_items().
            params.extend([(attr, self._format_param(getattr(obj, attr)))
                           for attr in extra_attrs])
        except NotReadyYet:
            return None

        return "model.%s(%s)" % (
            obj.__class__.__name__,
            ', '.join('%s=%s' % param for param in params))


class FFIBuilder(object):
    """Collect cdefs and libraries, then write them out as a Python module.

    The source of the generated module is accumulated in memory, starting
    from MODULE_BOILERPLATE, and is written to disk by write_ffi_module().
    Paths of all files produced are available from list_built_files().
    """

    def __init__(self, module_name, build_path, backend=None):
        """Set up an empty builder.

        :param module_name: name of the module to generate.  If it is
            dotted, everything before the last dot is taken as the package.
        :param build_path: directory below which generated files are placed.
        :param backend: passed through to ``FFI(backend=...)``.
        """
        # rpartition yields an empty package when there is no dot at all.
        module_package, _, bare_name = module_name.rpartition('.')
        self._module_package = module_package
        self._module_name = bare_name
        self._build_path = build_path
        self.ffi = FFI(backend=backend)
        self._built_files = []
        self._module_source = MODULE_BOILERPLATE

    def _filename(self, name, suffix='.py'):
        """Return the path of ``name + suffix`` inside the module's package.

        The path is relative: the package's dotted components become
        directories and nothing is prepended to them.
        """
        parts = self._module_package.split('.')
        parts.append(name + suffix)
        return os.path.join(*parts)

    def cdef(self, csource, override=False):
        """Forward C declarations to the underlying FFI object."""
        self.ffi.cdef(csource, override=override)

    def add_dlopen(self, libname, name, flags=0):
        """Open a library and add a matching loader to the generated module.

        :param libname: suffix of the generated ``load_<libname>()`` function.
        :param name: library name handed to ``dlopen``.
        :param flags: flags handed to ``dlopen``.
        :returns: the library object returned by ``self.ffi.dlopen``.
        """
        lib = self.ffi.dlopen(name, flags=flags)
        self._module_source += DLOPEN_FUNC_TEMPLATE % (libname, name, flags)
        return lib

    def makelib(self, libname, source='', **kwargs):
        """Build a library via the Verifier and add a loader for it.

        :param libname: suffix of the generated ``load_<libname>()`` function;
            also part of the library file name.
        :param source: source text handed to the Verifier.
        :param kwargs: further keyword arguments for the Verifier;
            ``force_generic_engine`` is always overridden with True.
        :returns: the result of the verifier's ``_load_library()``.
        """
        # XXX: We use force_generic_engine here because vengine_cpy collects
        #      types when it writes the source.
        kwargs['force_generic_engine'] = True
        from .verifier import Verifier, _get_so_suffix
        self.ffi.verifier = Verifier(self.ffi, source, **kwargs)
        barefilename = '_'.join([self._module_name, libname])
        libfile_path = self._filename(barefilename, _get_so_suffix())
        libfile_build_path = os.path.join(self._build_path, libfile_path)
        self.ffi.verifier.make_library(libfile_build_path)
        self._module_source += MAKELIB_FUNC_TEMPLATE % (libname, barefilename)
        self._built_files.append(libfile_path)
        return self.ffi.verifier._load_library()

    def _write_declarations(self):
        """Append code recreating the parser's declarations to the module.

        The generated code defines ``_make_declarations()`` and assigns its
        result to ``_ffi._parser._declarations``.  A declaration whose
        builder reports that it is not ready yet is moved to the back of the
        queue and retried later.

        :raises Exception: if the retry budget is used up before every
            declaration has been emitted.
        """
        self._module_source += "def _make_declarations():\n"
        self._module_source += "    declarations = {}\n"

        declarations = self.ffi._parser._declarations
        our_decls = set((type(obj), getattr(obj, 'name', None))
                        for obj in declarations.values())
        built_decls = {}
        decls = [(k, DeclarationBuilder(v, built_decls, our_decls))
                 for k, v in declarations.items()]

        # Integer bound on the number of attempts, so that declarations
        # which never become ready do not make this loop run forever.
        max_tries = (len(decls) + 1) ** 2 // 2

        tries = 0
        while decls:
            tries += 1
            if tries > max_tries:
                raise Exception("Problem serialising declarations.")
            name, dbuilder = decls.pop(0)
            instantiation = dbuilder.build()
            if instantiation is None:
                # Not ready yet: retry after the remaining declarations.
                decls.append((name, dbuilder))
                continue
            built_decls[dbuilder._model_object] = name
            self._module_source += "    declarations[%r] = %s\n" % (
                name, instantiation)
            if getattr(dbuilder._model_object, 'partial_resolved', None):
                # These two flags are set after construction rather than
                # being passed to the constructor.
                self._module_source += (
                    "    declarations[%r].partial = True\n" % (name,))
                self._module_source += (
                    "    declarations[%r].partial_resolved = True\n" % (
                        name,))

        self._module_source += "    return declarations\n\n"
        self._module_source += (
            "_ffi._parser._declarations = _make_declarations()\n")

    def write_ffi_module(self):
        """Write the generated module below the build path.

        The directory that will hold the module, including any package
        sub-directories, is created if it does not exist.

        :raises OSError: if that directory is missing and cannot be created.
        """
        self._write_declarations()

        module_path = self._filename(self._module_name)
        module_build_path = os.path.join(self._build_path, module_path)
        target_dir = os.path.dirname(module_build_path)
        if target_dir:
            try:
                os.makedirs(target_dir)
            except OSError:
                # An already existing directory is fine; anything else is a
                # real problem and must not be hidden.
                if not os.path.isdir(target_dir):
                    raise

        with open(module_build_path, 'w') as module_file:
            module_file.write(self._module_source)
        self._built_files.append(module_path)

    def list_built_files(self):
        """Return the list of files built so far, relative to the build path."""
        return self._built_files