Source

pyyaml / tests / test_emitter.py

Diff from to

tests/test_emitter.py

 
-import test_appliance, sys, StringIO
-
-from yaml import *
 import yaml
 
-class TestEmitter(test_appliance.TestAppliance):
+def _compare_events(events1, events2):
+    assert len(events1) == len(events2), (events1, events2)
+    for event1, event2 in zip(events1, events2):
+        assert event1.__class__ == event2.__class__, (event1, event2)
+        if isinstance(event1, yaml.NodeEvent):
+            assert event1.anchor == event2.anchor, (event1, event2)
+        if isinstance(event1, yaml.CollectionStartEvent):
+            assert event1.tag == event2.tag, (event1, event2)
+        if isinstance(event1, yaml.ScalarEvent):
+            if True not in event1.implicit+event2.implicit:
+                assert event1.tag == event2.tag, (event1, event2)
+            assert event1.value == event2.value, (event1, event2)
 
-    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
-        self._testEmitter(test_name, data_filename)
+def test_emitter_on_data(data_filename, canonical_filename, verbose=False):
+    events = list(yaml.parse(open(data_filename, 'rb')))
+    output = yaml.emit(events)
+    if verbose:
+        print "OUTPUT:"
+        print output
+    new_events = list(yaml.parse(output))
+    _compare_events(events, new_events)
 
-    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
-        self._testEmitter(test_name, canonical_filename, False)
+test_emitter_on_data.unittest = ['.data', '.canonical']
 
-    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
-        self._testEmitter(test_name, canonical_filename, True)
+def test_emitter_on_canonical(canonical_filename, verbose=False):
+    events = list(yaml.parse(open(canonical_filename, 'rb')))
+    for canonical in [False, True]:
+        output = yaml.emit(events, canonical=canonical)
+        if verbose:
+            print "OUTPUT (canonical=%s):" % canonical
+            print output
+        new_events = list(yaml.parse(output))
+        _compare_events(events, new_events)
 
-    def _testEmitter(self, test_name, filename, canonical=None):
-        events = list(parse(file(filename, 'rb')))
-        #self._dump(filename, events, canonical)
-        stream = StringIO.StringIO()
-        emit(events, stream, canonical=canonical)
-        data = stream.getvalue()
-        new_events = list(parse(data))
-        for event, new_event in zip(events, new_events):
-            self.failUnlessEqual(event.__class__, new_event.__class__)
-            if isinstance(event, NodeEvent):
-                self.failUnlessEqual(event.anchor, new_event.anchor)
-            if isinstance(event, CollectionStartEvent):
-                self.failUnlessEqual(event.tag, new_event.tag)
-            if isinstance(event, ScalarEvent):
-                #self.failUnlessEqual(event.implicit, new_event.implicit)
-                if True not in event.implicit+new_event.implicit:
-                    self.failUnlessEqual(event.tag, new_event.tag)
-                self.failUnlessEqual(event.value, new_event.value)
+test_emitter_on_canonical.unittest = ['.canonical']
 
-    def _testEmitterStyles(self, test_name, canonical_filename, data_filename):
-        for filename in [canonical_filename, data_filename]:
-            events = list(parse(file(filename, 'rb')))
-            for flow_style in [False, True]:
-                for style in ['|', '>', '"', '\'', '']:
-                    styled_events = []
-                    for event in events:
-                        if isinstance(event, ScalarEvent):
-                            event = ScalarEvent(event.anchor, event.tag,
-                                    event.implicit, event.value, style=style)
-                        elif isinstance(event, SequenceStartEvent):
-                            event = SequenceStartEvent(event.anchor, event.tag,
-                                    event.implicit, flow_style=flow_style)
-                        elif isinstance(event, MappingStartEvent):
-                            event = MappingStartEvent(event.anchor, event.tag,
-                                    event.implicit, flow_style=flow_style)
-                        styled_events.append(event)
-                    stream = StringIO.StringIO()
-                    emit(styled_events, stream)
-                    data = stream.getvalue()
-                    #print data
-                    new_events = list(parse(data))
-                    for event, new_event in zip(events, new_events):
-                        self.failUnlessEqual(event.__class__, new_event.__class__)
-                        if isinstance(event, NodeEvent):
-                            self.failUnlessEqual(event.anchor, new_event.anchor)
-                        if isinstance(event, CollectionStartEvent):
-                            self.failUnlessEqual(event.tag, new_event.tag)
-                        if isinstance(event, ScalarEvent):
-                            #self.failUnlessEqual(event.implicit, new_event.implicit)
-                            if True not in event.implicit+new_event.implicit:
-                                self.failUnlessEqual(event.tag, new_event.tag)
-                            self.failUnlessEqual(event.value, new_event.value)
+def test_emitter_styles(data_filename, canonical_filename, verbose=False):
+    for filename in [data_filename, canonical_filename]:
+        events = list(yaml.parse(open(filename, 'rb')))
+        for flow_style in [False, True]:
+            for style in ['|', '>', '"', '\'', '']:
+                styled_events = []
+                for event in events:
+                    if isinstance(event, yaml.ScalarEvent):
+                        event = yaml.ScalarEvent(event.anchor, event.tag,
+                                event.implicit, event.value, style=style)
+                    elif isinstance(event, yaml.SequenceStartEvent):
+                        event = yaml.SequenceStartEvent(event.anchor, event.tag,
+                                event.implicit, flow_style=flow_style)
+                    elif isinstance(event, yaml.MappingStartEvent):
+                        event = yaml.MappingStartEvent(event.anchor, event.tag,
+                                event.implicit, flow_style=flow_style)
+                    styled_events.append(event)
+                output = yaml.emit(styled_events)
+                if verbose:
+                    print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style)
+                    print output
+                new_events = list(yaml.parse(output))
+                _compare_events(events, new_events)
 
+test_emitter_styles.unittest = ['.data', '.canonical']
 
-    def _dump(self, filename, events, canonical):
-        print "="*30
-        print "ORIGINAL DOCUMENT:"
-        print file(filename, 'rb').read()
-        print '-'*30
-        print "EMITTED DOCUMENT:"
-        emit(events, sys.stdout, canonical=canonical)
-        
-TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
-TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
-TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
-TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data')
-
-class EventsLoader(Loader):
+class EventsLoader(yaml.Loader):
 
     def construct_event(self, node):
-        if isinstance(node, ScalarNode):
+        if isinstance(node, yaml.ScalarNode):
             mapping = {}
         else:
             mapping = self.construct_mapping(node)
 
 EventsLoader.add_constructor(None, EventsLoader.construct_event)
 
-class TestEmitterEvents(test_appliance.TestAppliance):
+def test_emitter_events(events_filename, verbose=False):
+    events = list(yaml.load(open(events_filename, 'rb'), Loader=EventsLoader))
+    output = yaml.emit(events)
+    if verbose:
+        print "OUTPUT:"
+        print output
+    new_events = list(yaml.parse(output))
+    _compare_events(events, new_events)
 
-    def _testEmitterEvents(self, test_name, events_filename):
-        events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
-        #self._dump(events_filename, events)
-        stream = StringIO.StringIO()
-        emit(events, stream)
-        data = stream.getvalue()
-        new_events = list(parse(data))
-        self.failUnlessEqual(len(events), len(new_events))
-        for event, new_event in zip(events, new_events):
-            self.failUnlessEqual(event.__class__, new_event.__class__)
-            if isinstance(event, NodeEvent):
-                self.failUnlessEqual(event.anchor, new_event.anchor)
-            if isinstance(event, CollectionStartEvent):
-                self.failUnlessEqual(event.tag, new_event.tag)
-            if isinstance(event, ScalarEvent):
-                self.failUnless(event.implicit == new_event.implicit
-                        or event.tag == new_event.tag)
-                self.failUnlessEqual(event.value, new_event.value)
+if __name__ == '__main__':
+    import test_appliance
+    test_appliance.run(globals())
 
-    def _dump(self, events_filename, events):
-        print "="*30
-        print "EVENTS:"
-        print file(events_filename, 'rb').read()
-        print '-'*30
-        print "OUTPUT:"
-        emit(events, sys.stdout)
-        
-TestEmitterEvents.add_tests('testEmitterEvents', '.events')
-