Commits

Anonymous committed 1f7f1db

parse/dump nbt w00p
yaml dumping

Comments (0)

Files changed (7)

nbt2yaml/__init__.py

-from parse import parse_nbt
+from parse import parse_nbt, dump_nbt
 from yamlgen import to_yaml

nbt2yaml/parse.py

 
     def __init__(self, name, id):
         self.name = name
+        self.id = id
         Tag._tags[id] = self
 
     @classmethod
     def from_stream(cls, stream):
         return cls._tags[struct.unpack('>b', stream.read(1))[0]]
 
+    def to_stream(self, stream):
+        stream.write(struct.pack('>b', self.id))
+
     def parse(self, stream):
         name = TAG_String._parse_impl(stream)
         data = self._parse_impl(stream)
         return Tag._tuple(self, name, data)
 
-    def _parse_impl(self):
+    def dump(self, element, stream):
+        type_, name, data = element
+        type_.to_stream(stream)
+        TAG_String._dump_impl(name, stream)
+        self._dump_impl(data, stream)
+
+    def _parse_impl(self, stream):
+        raise NotImplementedError()
+
+    def _dump_impl(self, data, stream):
         raise NotImplementedError()
 
     def __repr__(self):
     def _parse_impl(self, stream):
         return struct.unpack(">" + self.format, stream.read(self.size))[0]
 
+    def _dump_impl(self, data, stream):
+        stream.write(
+            struct.pack(">" + self.format, data)
+        )
+
 class EndTag(Tag):
     ""
 
             data = data.decode(self.encoding)
         return data
 
+    def _dump_impl(self, data, stream):
+        if self.encoding:
+            data = data.encode(self.encoding)
+        _dump_length(self.length_tag, len(data), stream)
+        stream.write(data)
+
 class ListTag(Tag):
     def _parse_impl(self, stream):
         element_type = Tag.from_stream(stream)
         length = _data_length(TAG_Int, stream)
 
-        return [
+        return (element_type, [
             element_type._parse_impl(stream) for i in xrange(length)
-        ]
+        ])
+
+    def _dump_impl(self, data, stream):
+        element_type, data = data
+        element_type.to_stream(stream)
+        _dump_length(TAG_Int, len(data), stream)
+        for elem in data:
+            element_type._dump_impl(elem, stream)
 
 class CompoundTag(Tag):
     def _parse_impl(self, stream):
             data.append(c_type_.parse(stream))
         return data
 
+    def _dump_impl(self, data, stream):
+        for elem in data:
+            elem[0].dump(elem, stream)
+        TAG_End.to_stream(stream)
+
 TAG_End = EndTag('end', 0)
 TAG_Byte = FixedTag('byte', 1, 1, 'b')
 TAG_Short = FixedTag('short', 2, 2, 'h')
 def _data_length(length_type, stream):
     return struct.unpack(">" + length_type.format, stream.read(length_type.size))[0]
 
+def _dump_length(length_type, length, stream):
+    stream.write(
+        struct.pack(">" + length_type.format, length)
+    )
 def parse_nbt(stream, gzipped=True):
     if gzipped:
         stream = gzip.GzipFile(fileobj=stream)
     type_ = Tag.from_stream(stream)
     return type_.parse(stream)
 
+def dump_nbt(nbt, stream, gzipped=True):
+    type_ = nbt[0]
+    type_.dump(nbt, stream)

nbt2yaml/yamlgen.py

 import yaml
 from nbt2yaml import parse
 
-class YamlSerialize(object):
-    def __init__(self, struct):
-        self.tag, self.name, self.data = struct
+class ForceType(object):
+    """Represent a data value with an explicit type.
+    
+    This is used to output 'short', 'long', 'double', 'byte'
+    explicitly, so that we can differentiate on the
+    yaml parsing side what specific NBT form to use.
+    
+    """
+    def __init__(self, type_, value):
+        self.type = type_
+        self.value = value
 
+def _type_representer(dumper, struct):
+    return dumper.represent_scalar(u'!%s' % struct.type, repr(struct.value), style='""')
 
-def _tag_representer(dumper, struct):
-    tag, name, data = struct.tag, struct.name, struct.data
-    if tag is parse.TAG_Compound:
-        data = [YamlSerialize(elem) for elem in data]
-    elif tag is parse.TAG_String:
-        data = data.encode('utf-8')
+yaml.add_representer(ForceType, _type_representer)
 
-    return dumper.represent_mapping(
-        u'!%s' % repr(tag), {
-            name.encode('utf-8'):data
-        },
-    )
+def yaml_serialize(struct):
+    tag, name, data = struct.type, struct.name, struct.data
+    name = name.encode('utf-8')
+    return {name:_value_as_yaml(tag, data)}
 
+def _value_as_yaml(type_, value):
+    if type_ is parse.TAG_Compound:
+        return [yaml_serialize(s) for s in value]
+    elif type_ is parse.TAG_String:
+        return value.encode('utf-8')
+    elif type_ in (parse.TAG_Short, parse.TAG_Long, parse.TAG_Double, parse.TAG_Byte):
+        return ForceType(type_.name, value)
+    elif type_ is parse.TAG_List:
+        element_type, data = value
+        return [_value_as_yaml(element_type, d) for d in data]
+    else:
+        return value
 
-yaml.add_representer(YamlSerialize, _tag_representer)
-
-def to_yaml(struct):
-    print yaml.dump(YamlSerialize(struct))
+def to_yaml(struct, canonical=False, default_flow_style=False):
+    return yaml.dump(
+                yaml_serialize(struct), 
+                default_flow_style=default_flow_style, 
+                canonical=canonical)

tests/__init__.py

 import os
 
 def datafile(name):
-    return open(os.path.join(os.path.dirname(__file__), 'files', name))
+    return open(os.path.join(os.path.dirname(__file__), 'files', name))
+
+def eq_(a, b):
+    assert a == b, "%r != %r" % (a, b)

tests/test_dump_nbt.py

+import unittest
+from nbt2yaml import parse_nbt, dump_nbt
+from tests import datafile, eq_
+import gzip
+import StringIO
+
+class DumpNBTTest(unittest.TestCase):
+    def _assert_data(self, fname):
+        unzipped_data = gzip.GzipFile(fileobj=datafile(fname)).read()
+        parsed = parse_nbt(datafile(fname))
+        out = StringIO.StringIO()
+        dump_nbt(parsed, out)
+        eq_(unzipped_data, out.getvalue())
+
+    def test_basic(self):
+        self._assert_data("test.nbt")
+
+    def test_large(self):
+        self._assert_data("bigtest.nbt")

tests/test_parse.py

-import unittest
-from nbt2yaml import parse_nbt
-from tests import datafile
-
-class ParseNBTTest(unittest.TestCase):
-    def test_basic(self):
-        print parse_nbt(datafile("test.nbt"))
-
-    def test_large(self):
-        print parse_nbt(datafile("bigtest.nbt"))

tests/test_parse_nbt.py

+import unittest
+from nbt2yaml import parse_nbt
+from tests import datafile
+
+class ParseNBTTest(unittest.TestCase):
+    def test_basic(self):
+        print parse_nbt(datafile("test.nbt"))
+
+    def test_large(self):
+        print parse_nbt(datafile("bigtest.nbt"))