Commits

rkruppe  committed d0cbbf1

Split primitives -> objects decoding and yaml -> primitives conversion

  • Participants
  • Parent commits 639cb6a
  • Branches experiments

Comments (0)

Files changed (3)

File tutagx/meta/__init__.py

-from . import encode, model, process, to_yaml, from_yaml, query, validate
+from . import (
+    decode, encode, model, process, to_yaml, from_yaml, query, validate
+)
 

File tutagx/meta/decode.py

+import abc
+from functools import lru_cache
+
+from tutagx.meta import model, process
+from tutagx.meta.model import ModelMeta, Value
+from tutagx.meta.common import typeident, wrap_model
+import tutagx.util.codegen as cg
+
+
+class ObjectCreationStrategy(metaclass=abc.ABCMeta):
+    def __init__(self, emit):
+        self._emit = emit
+
+    @abc.abstractmethod
+    def construct(self, target, cls):
+        pass
+
+    @abc.abstractmethod
+    def add_attributes(self, target, params):
+        pass
+
+    @abc.abstractmethod
+    def finish(self, obj):
+        pass
+
+    @property
+    def extra_args(self):
+        return ()
+
+
+class PlainObjectCreation(ObjectCreationStrategy):
+    def construct(self, target, cls):
+        self._emit.linef(
+            '{} = classes[{!r}, {!r}](already_complete=False)',
+            target, cls.__module__, cls.__name__
+        )
+
+    def add_attributes(self, target, params):
+        for name, expr in params:
+            self._emit.linef('{}.{} = {}', target, name, expr)
+
+    def finish(self, obj):
+        self._emit.line(obj, '._on_loaded()')
+
+
+def _ocs(cls, emit):
+    ocs_class = getattr(cls, 'CONSTRUCTION_STRATEGY', PlainObjectCreation)
+    ocs = ocs_class(emit)
+    return ocs
+
+
+class _Placeholder:
+    def __init__(self):
+        self._locations = []
+
+    def add_location(self, obj, attr):
+        self._locations.append((obj, attr))
+
+    def replace(self, actual_object):
+        for obj, attr in self._locations:
+            setattr(obj, attr, actual_object)
+        del self._locations[:]
+
+    # N.B. A __del__ method that warns on non-empty _locations would be useful
+    # if not for the fact that cycles with __del__ methods are not collected
+    # (Since the whole point of _Placeholder is replacing object attributes
+    # which refer to the placeholder, cycles would be inevitable.)
+
+
+@lru_cache()
+def make_decoder(cls):
+    RESERVED = ('classes', 'result', 'oid', 'json_val', 'k', '__Placeholder')
+    emit = cg.CodeEmitter()
+    ocs = _ocs(cls, emit)
+    sig = cg.Signature('$obj', 'seen', 'unfinished', *ocs.extra_args)
+    symtab = cg.SymbolTable(RESERVED, sig)
+    classes = {}
+
+    def conversion(expr, t):
+        if isinstance(t, (model.Integer, model.Float, model.String)):
+            return expr
+        if isinstance(t, model.List):
+            conv = yield from list_conv(t, expr)
+            return conv
+        if isinstance(t, model.Dict):
+            conv = yield from dict_conv(t, expr)
+            return conv
+        if isinstance(t, model.Maybe):
+            t_conv = yield from conversion(expr, t.t)
+            if t_conv == expr:
+                return expr
+            return '(None if {0} is None else ({1}))'.format(expr, t_conv)
+        else:
+            func = yield t
+            return sig.call(func, expr)
+
+    def list_conv(node, list_expr):
+        item_expr = yield from conversion('item', node.items)
+        return '[{} for item in {}]'.format(item_expr, list_expr)
+
+    def dict_conv(node, dict_expr):
+        key_expr = yield from conversion('k', node.keys)
+        val_expr = yield from conversion(str(dict_expr) + '[k]', node.values)
+        return '{{{}: {} for k in {}}}'.format(key_expr, val_expr, dict_expr)
+
+    def visit_toplevel(node):
+        if isinstance(node, Value):
+            yield from visit_value(node)
+            return
+        cls = ModelMeta.model_for(node)
+        struct = ModelMeta.struct_for(node)
+        classes[cls.__module__, cls.__name__] = cls
+        member_names = tuple(name for name, _ in struct.members)
+        ocs = _ocs(cls, emit)
+        loaders = []
+        for name, t in struct.members:
+            conv = yield from conversion('obj[{!r}]'.format(name), t)
+            loaders.append(conv)
+
+        emit.line('oid = obj["$id"]')
+        emit.line('if oid in unfinished: result = seen[oid]')
+        with emit.block('else'):
+            ocs.construct('result', cls)
+            emit.line('seen[oid] = result')
+        emit.line('unfinished.discard(oid)')
+        emit.line('result.object_id = oid')
+        ocs.add_attributes('result', zip(member_names, loaders))
+        ocs.finish('result')
+        emit.line('return result')
+
+    def visit_ref(node):
+        cls = ModelMeta.model_for(node)
+        struct_node = ModelMeta.struct_for(node)
+        classes[cls.__module__, cls.__name__] = cls
+        emit.line('oid = obj')
+        emit.line(
+            'assert isinstance(oid, str), repr(oid) + " is not an ID"'
+        )
+        emit.line('if oid in seen: return seen[oid]')
+        ocs = _ocs(cls, emit)
+        ocs.construct('obj', cls)
+        emit.line('seen[oid] = obj')
+        emit.line('unfinished.add(oid)')
+        emit.line('return obj')
+
+    def visit_value(node):
+        cls = ModelMeta.model_for(node)
+        struct = ModelMeta.struct_for(node)
+        classes[cls.__module__, cls.__name__] = cls
+        member_names = tuple(name for name, _ in struct.members)
+        loaders = []
+        for name, t in struct.members:
+            conv = yield from conversion('obj[{!r}]'.format(name), t)
+            loaders.append(conv)
+        ocs = _ocs(cls, emit)
+        ocs.construct('result', cls)
+        ocs.add_attributes('result', zip(member_names, loaders))
+        ocs.finish('result')
+        emit.line('return result')
+
+    visit_integer = visit_string = visit_float = NotImplemented
+    visit_list = visit_dict = NotImplemented
+    visit_struct = visit_maybe = NotImplemented
+
+    def visit_union(node):
+        with emit.block('if isinstance(obj, str)'):
+            emit.line('return __Placeholder()')
+        for tag, t in node.alternatives:
+            with emit.block('if obj["$type"] == ', repr(tag)):
+                conv = yield from conversion('obj', t)
+                emit.line('return ', conv)
+
+    code_ns = {'classes': classes, '__Placeholder': _Placeholder}
+
+    gen = cg.generate_code(
+        locals(), typeident, sig, emit, symtab,
+        entry_point=visit_toplevel, code_ns=code_ns
+    )
+    return gen(wrap_model(cls))
+

File tutagx/meta/from_yaml.py

-import abc
 from collections import namedtuple
-import itertools
 import yaml
 
-from tutagx.meta import model, validate, process
-from tutagx.meta.model import ModelMeta, Value
-from tutagx.meta.common import typeident, wrap_model
-import tutagx.util.codegen as cg
+from tutagx.meta import validate, decode
+from tutagx.meta.decode import ObjectCreationStrategy
 
 _ReaderNamespace = namedtuple('_ReaderNamespace', 'seen unfinished extra')
 
 
-class _Placeholder:
-    def __init__(self):
-        self._locations = []
-
-    def add_location(self, obj, attr):
-        self._locations.append((obj, attr))
-
-    def replace(self, actual_object):
-        for obj, attr in self._locations:
-            setattr(obj, attr, actual_object)
-        del self._locations[:]
-
-    # N.B. A __del__ method that warns on non-empty _locations would be useful
-    # if not for the fact that cycles with __del__ methods are not collected
-    # (Since the whole point of _Placeholder is replacing object attributes
-    # which refer to the placeholder, cycles would be inevitable.)
-
-
-class ObjectCreationStrategy(metaclass=abc.ABCMeta):
-    def __init__(self, emit):
-        self._emit = emit
-
-    @abc.abstractmethod
-    def construct(self, target, cls):
-        pass
-
-    @abc.abstractmethod
-    def add_attributes(self, target, params):
-        pass
-
-    @abc.abstractmethod
-    def finish(self, obj):
-        pass
-
-    @property
-    def extra_args(self):
-        return ()
-
-
-class PlainObjectCreation(ObjectCreationStrategy):
-    def construct(self, target, cls):
-        self._emit.linef(
-            '{} = classes[{!r}, {!r}](already_complete=False)',
-            target, cls.__module__, cls.__name__
-        )
-
-    def add_attributes(self, target, params):
-        for name, expr in params:
-            self._emit.linef('{}.{} = {}', target, name, expr)
-
-    def finish(self, obj):
-        self._emit.line(obj, '._on_loaded()')
-
-
-def _decoder(cls):
-    RESERVED = ('classes', 'result', 'oid', 'json_val', 'k', '__Placeholder')
-    emit = cg.CodeEmitter()
-    ocs_class = getattr(cls, 'CONSTRUCTION_STRATEGY', PlainObjectCreation)
-    ocs = ocs_class(emit)
-    sig = cg.Signature('$obj', 'seen', 'unfinished', *ocs.extra_args)
-    symtab = cg.SymbolTable(RESERVED, sig)
-
-    classes = {}
-
-    def conversion(expr, t):
-        if isinstance(t, (model.Integer, model.Float, model.String)):
-            return expr
-        if isinstance(t, model.List):
-            conv = yield from list_conv(t, expr)
-            return conv
-        if isinstance(t, model.Dict):
-            conv = yield from dict_conv(t, expr)
-            return conv
-        if isinstance(t, model.Maybe):
-            t_conv = yield from conversion(expr, t.t)
-            if t_conv == expr:
-                return expr
-            return '(None if {0} is None else ({1}))'.format(expr, t_conv)
-        else:
-            func = yield t
-            return sig.call(func, expr)
-
-    def list_conv(node, list_expr):
-        item_expr = yield from conversion('item', node.items)
-        return '[{} for item in {}]'.format(item_expr, list_expr)
-
-    def dict_conv(node, dict_expr):
-        key_expr = yield from conversion('k', node.keys)
-        val_expr = yield from conversion(str(dict_expr) + '[k]', node.values)
-        return '{{{}: {} for k in {}}}'.format(key_expr, val_expr, dict_expr)
-
-    def visit_toplevel(node):
-        if isinstance(node, Value):
-            yield from visit_value(node)
-            return
-        cls = ModelMeta.model_for(node)
-        struct = ModelMeta.struct_for(node)
-        classes[cls.__module__, cls.__name__] = cls
-        member_names = tuple(name for name, _ in struct.members)
-        loaders = []
-        for name, t in struct.members:
-            conv = yield from conversion('obj[{!r}]'.format(name), t)
-            loaders.append(conv)
-
-        emit.line('oid = obj["$id"]')
-        emit.line('if oid in unfinished: result = seen[oid]')
-        with emit.block('else'):
-            ocs.construct('result', cls)
-            emit.line('seen[oid] = result')
-        emit.line('unfinished.discard(oid)')
-        emit.line('result.object_id = oid')
-        ocs.add_attributes('result', zip(member_names, loaders))
-        ocs.finish('result')
-        emit.line('return result')
-
-    def visit_ref(node):
-        cls = ModelMeta.model_for(node)
-        struct_node = ModelMeta.struct_for(node)
-        classes[cls.__module__, cls.__name__] = cls
-        emit.line('oid = obj')
-        emit.line(
-            'assert isinstance(oid, str), repr(oid) + " is not an ID"'
-        )
-        emit.line('if oid in seen: return seen[oid]')
-        ocs.construct('obj', cls)
-        emit.line('seen[oid] = obj')
-        emit.line('unfinished.add(oid)')
-        emit.line('return obj')
-
-    def visit_value(node):
-        cls = ModelMeta.model_for(node)
-        struct = ModelMeta.struct_for(node)
-        classes[cls.__module__, cls.__name__] = cls
-        member_names = tuple(name for name, _ in struct.members)
-        loaders = []
-        for name, t in struct.members:
-            conv = yield from conversion('obj[{!r}]'.format(name), t)
-            loaders.append(conv)
-
-        ocs.construct('result', cls)
-        ocs.add_attributes('result', zip(member_names, loaders))
-        ocs.finish('result')
-        emit.line('return result')
-
-    visit_integer = visit_string = visit_float = NotImplemented
-    visit_list = visit_dict = NotImplemented
-    visit_struct = visit_maybe = NotImplemented
-
-    def visit_union(node):
-        with emit.block('if isinstance(obj, str)'):
-            emit.line('return __Placeholder()')
-        for tag, t in node.alternatives:
-            with emit.block('if obj["$type"] == ', repr(tag)):
-                conv = yield from conversion('obj', t)
-                emit.line('return ', conv)
-
-    code_ns = {'classes': classes, '__Placeholder': _Placeholder}
-
-    gen = cg.generate_code(
-        locals(), typeident, sig, emit, symtab,
-        entry_point=visit_toplevel, code_ns=code_ns
-    )
-    return gen(wrap_model(cls))
-
-
 #TODO replace this API with something better
 class YAMLReader:
     def __init__(self, model):
         self._mcls = type(model)
         self._cls = model
-        self._decoder = _decoder(model)
+        self._decoder = decode.make_decoder(model)
 
     def decode(self, val, namespace=None, validator_ns=None, source=None):
         results, errors = self.decode_many(