Source

tutagx / tutagx / meta / common.py

Full commit
import abc
import logging
import math
import contextlib
from tutagx.meta import process
from tutagx.meta.model import ModelMeta, Ref, Value

__all__ = ['FunctionCodeGen']

log = logging.getLogger(__name__)


def typeident(cls):
    """
    Create a Python identifier from a model class.
    The results are by no means unique, but they do depend heavily on the
    type passed in.
    Therefore, this process is an easy way to name functions in generated code
    that nevertheless leads to somewhat readable output.
    """
    def visit_float(node):
        return 'float'

    def visit_integer(node):
        return 'int'

    def visit_string(node):
        return 'str'

    def visit_list(node):
        return 'list_' + visit(node.items)

    def visit_maybe(node):
        return 'maybe_' + visit(node.t)

    def visit_ref(node):
        return visit(ModelMeta.struct_for(node))

    visit_value = visit_ref

    def visit_dict(node):
        return 'dict_' + visit(node.keys) + '_' + visit(node.values)

    def visit_struct(node):
        # Uses member names only for readability.
        # It also avoids excessive recursion and cycles - in fact, not
        # recursing here breaks ALL cycles (unless I'm horribly mistaken).
        # This saves us a seen set/dict and repeatedly checking it.
        return 'struct_' + node.name

    def visit_union(node):
        return 'union_' + '_'.join(name for name, t in node.alternatives)

    visit = process.process(locals())
    node = wrap_model(cls)
    return visit(node)


class FunctionCodeGen(process.ModelProcess):
    """
    Utilities to dynamically generate a bunch of functions as source code.
    Makes sure identifiers don't clash, as long as gensmy() is used and
    reserved identifers are passed to the constructor.

    #TODO document all the _CONSTANTS used
    """
    _PRELUDE = ""
    _ARGS = ()  # subclasses override this, e.g. (obj, seen) for serialization

    def __init__(self, cls, reserved):
        import builtins
        super().__init__()
        # To avoid generating symbols that clash with stuff the actual
        # subclass will need, we pre-fill the symbol table with
        # identifiers that the generated code may use
        self._symbols = set(reserved) | set(dir(builtins)) | set(self._ARGS)
        self._functions = {}
        self._buffer = [self._PRELUDE]
        self._indent = 0
        self._scheduled = []
        if cls.is_value_type:
            node = Value(cls)
        else:
            node = Ref(cls)
        self.toplevel_node = node
        entry_point = self.genfunc(node)
        # create functions that have been delayed (see genfunc)
        while self._scheduled:
            name, node = self._scheduled.pop()
            self._create_function(name, node)
        f = self.compile(entry_point)
        setattr(self, self._TARGET_FUNC_ATTR, f)

    def gensym(self, hint):
        """
        Choose an identifier that's not taken yet, based on a hint.
        It is guaranteed that the chosen identifier is either identical to the
        hint or starts with the hint.
        """
        if hint not in self._symbols:
            self._symbols.add(hint)
            return hint
        i = 1
        name = hint + '1'
        while name in self._symbols:
            i += 1
            name = hint + str(i)
        self._symbols.add(name)
        return name

    def genfunc(self, node):
        """
        If genfunc() was already called with ``node``, return the identifier
        of the function generated for it.
        Otherwise, create a new function for the model type, and return its
        identifier.
        """
        if node in self._functions:
            return self._functions[node]
        name = self.gensym(self._HINT_PREFIX + typeident(node))
        self._functions[node] = name
        # Delay creation of actual function definition.
        # Late binding(tm) allows us to get away with this, and it prevents
        # deeply nested functions and ugly ``global`` statements.
        self._scheduled.append((name, node))
        return name

    def _create_function(self, name, node):
        args = ', '.join(self._ARGS)
        line_count = len(self._buffer)
        with self.block('def {}({}):', name, args):
            self.visit(node)

    def line(self, line, *args):
        """
        Given a format string, format it with the further arguments, add
        the current indentation, and append it to the output.
        """
        line = ' ' * (4 * self._indent) + line.format(*args)
        self._buffer.append(line)

    @contextlib.contextmanager
    def block(self, line, *args):
        """
        Allow generating Python blocks via a context manager.
        Pass on the arguments to line(), starting a block with indent(),
        and closing it with dedent() after with statement's body finished.
        """
        self.line(line, *args)
        self.indent()
        yield
        self.dedent()

    def indent(self):
        """
        Add a level of indentation for future .line() calls
        """
        self._indent += 1

    def dedent(self):
        """
        Remove a level of indentation for future .line() calls.
        """
        if self._indent == 0:
            raise RuntimeError("Cannot dedent further")
        self._indent -= 1

    def compile(self, entry_point):
        """
        Compile the code output so far and return the object (most likely
        a function) called entry_point.
        Can be called multiple times without losing previously compiled code.
        """
        self._buffer = ['\n'.join(self._buffer)]
        source = self._buffer[0]
        namespace = self.make_namespace()
        line_count = source.count('\n')
        line_no_len = int(math.ceil(math.log10(line_count)))
        fmt = '{0:>' + str(line_no_len) + '}\t|{1}'
        src_with_lineno = '\n'.join(
            fmt.format(i, line)
            for i, line in enumerate(source.split('\n'), 1)
        )
        log.debug("compiling for %r {\n%s\n}", type(self), src_with_lineno)
        exec(source, namespace)
        return namespace[entry_point]

    @abc.abstractmethod
    def make_namespace(self):
        """
        Called by compile() to allow giving extra data to the compiled code.
        The return value, a dictionary, is used as namedspace for exec().
        Note that this alters the dictionary, returning a copy may be useful.
        """
        pass

def wrap_model(cls):
    if not isinstance(cls, ModelMeta):
        return cls
    if cls.is_value_type:
        return Value(cls)
    return Ref(cls)