pyyaml / tests / test_emitter.py

import test_appliance, sys, StringIO

from yaml import *
import yaml

class TestEmitter(test_appliance.TestAppliance):

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        self._testEmitter(test_name, canonical_filename, True)

    def _testEmitter(self, test_name, filename, canonical=None):
        events = list(iter(Parser(Scanner(Reader(file(filename, 'rb'))))))
        if canonical is not None:
            events[0].canonical = canonical
        #self._dump(filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                #self.failUnlessEqual(event.implicit, new_event.implicit)
                if not event.implicit and not new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, filename, events):
        writer = sys.stdout
        emitter = Emitter(writer)
        print "="*30
        print "ORIGINAL DOCUMENT:"
        print file(filename, 'rb').read()
        print '-'*30
        print "EMITTED DOCUMENT:"
        for event in events:
            emitter.emit(event)
        
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')

class EventsConstructor(Constructor):

    def construct_event(self, node):
        if isinstance(node, ScalarNode):
            mapping = {}
        else:
            mapping = self.construct_mapping(node)
        class_name = str(node.tag[1:])+'Event'
        if class_name in ['AliasEvent', 'ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
            mapping.setdefault('anchor', None)
        if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
            mapping.setdefault('tag', None)
        if class_name == 'ScalarEvent':
            mapping.setdefault('value', '')
        value = getattr(yaml, class_name)(**mapping)
        return value

EventsConstructor.add_constructor(None, EventsConstructor.construct_event)

class TestEmitterEvents(test_appliance.TestAppliance):

    def _testEmitterEvents(self, test_name, events_filename):
        events = list(load_document(file(events_filename, 'rb'), Constructor=EventsConstructor))
        #self._dump(events_filename, events)
        writer = StringIO.StringIO()
        emitter = Emitter(writer)
        for event in events:
            emitter.emit(event)
        data = writer.getvalue()
        new_events = list(parse(data))
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)

    def _dump(self, events_filename, events):
        writer = sys.stdout
        emitter = Emitter(writer)
        print "="*30
        print "EVENTS:"
        print file(events_filename, 'rb').read()
        print '-'*30
        print "OUTPUT:"
        for event in events:
            emitter.emit(event)
        
TestEmitterEvents.add_tests('testEmitterEvents', '.events')
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.