Commit 659ccb6 by Kirill Simonov

Refactored the test suite; updated include and library paths in setup.cfg.

Comments (0)

Files changed (34)

MANIFEST.in

 include README LICENSE setup.py
 recursive-include examples *.py *.cfg *.yaml
-#recursive-include tests *.py
-#recursive-include tests/data *
+recursive-include tests *.py
+recursive-include tests/data *
Makefile

 testext: buildext
 	${PYTHON} tests/test_build_ext.py ${TEST}
 
+testall:
+	${PYTHON} setup.py test
+
 dist:
 	${PYTHON} setup.py --with-libyaml sdist --formats=zip,gztar
 
 
setup.cfg

 # The following options are used to build PyYAML Windows installer
 # for Python 2.3 on my PC:
-#include_dirs=../../libyaml/branches/stable/include
-#library_dirs=../../libyaml/branches/stable/win32/vc6/output/release/lib
+#include_dirs=../../../libyaml/tags/0.1.2/include
+#library_dirs=../../../libyaml/tags/0.1.2/win32/vc6/output/release/lib
 #define=YAML_DECLARE_STATIC
 
 # The following options are used to build PyYAML Windows installer
 # for Python 2.4 and Python 2.5 on my PC:
-#include_dirs=../../libyaml/branches/stable/include
-#library_dirs=../../libyaml/branches/stable/win32/vs2003/output/release/lib
+#include_dirs=../../../libyaml/tags/0.1.2/include
+#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2003/output/release/lib
 #define=YAML_DECLARE_STATIC
 
 # The following options are used to build PyYAML Windows installer
 # for Python 2.6 on my PC:
-#include_dirs=../../libyaml/branches/stable/include
-#library_dirs=../../libyaml/branches/stable/win32/vs2008/output/release/lib
+#include_dirs=../../../libyaml/tags/0.1.2/include
+#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2008/output/release/lib
 #define=YAML_DECLARE_STATIC
+
setup.py

         return spec_file
 
 
+class test(Command):
+
+    user_options = []
+
+    def initialize_options(self):
+        pass
+
+    def finalize_options(self):
+        pass
+
+    def run(self):
+        build_cmd = self.get_finalized_command('build')
+        build_cmd.run()
+        sys.path.insert(0, build_cmd.build_lib)
+        sys.path.insert(0, 'tests')
+        import test_all
+        test_all.main([])
+
+
 if __name__ == '__main__':
 
     setup(
         cmdclass={
             'build_ext': build_ext,
             'bdist_rpm': bdist_rpm,
+            'test': test,
         },
     )
 

tests/canonical.py

+
+import yaml, yaml.composer, yaml.constructor, yaml.resolver
+
+class CanonicalError(yaml.YAMLError):
+    pass
+
+class CanonicalScanner:
+
+    def __init__(self, data):
+        try:
+            self.data = unicode(data, 'utf-8')+u'\0'
+        except UnicodeDecodeError:
+            raise CanonicalError("utf-8 stream is expected")
+        self.index = 0
+        self.tokens = []
+        self.scanned = False
+
+    def check_token(self, *choices):
+        if not self.scanned:
+            self.scan()
+        if self.tokens:
+            if not choices:
+                return True
+            for choice in choices:
+                if isinstance(self.tokens[0], choice):
+                    return True
+        return False
+
+    def peek_token(self):
+        if not self.scanned:
+            self.scan()
+        if self.tokens:
+            return self.tokens[0]
+
+    def get_token(self, choice=None):
+        if not self.scanned:
+            self.scan()
+        token = self.tokens.pop(0)
+        if choice and not isinstance(token, choice):
+            raise CanonicalError("unexpected token "+repr(token))
+        return token
+
+    def get_token_value(self):
+        token = self.get_token()
+        return token.value
+
+    def scan(self):
+        self.tokens.append(yaml.StreamStartToken(None, None))
+        while True:
+            self.find_token()
+            ch = self.data[self.index]
+            if ch == u'\0':
+                self.tokens.append(yaml.StreamEndToken(None, None))
+                break
+            elif ch == u'%':
+                self.tokens.append(self.scan_directive())
+            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
+                self.index += 3
+                self.tokens.append(yaml.DocumentStartToken(None, None))
+            elif ch == u'[':
+                self.index += 1
+                self.tokens.append(yaml.FlowSequenceStartToken(None, None))
+            elif ch == u'{':
+                self.index += 1
+                self.tokens.append(yaml.FlowMappingStartToken(None, None))
+            elif ch == u']':
+                self.index += 1
+                self.tokens.append(yaml.FlowSequenceEndToken(None, None))
+            elif ch == u'}':
+                self.index += 1
+                self.tokens.append(yaml.FlowMappingEndToken(None, None))
+            elif ch == u'?':
+                self.index += 1
+                self.tokens.append(yaml.KeyToken(None, None))
+            elif ch == u':':
+                self.index += 1
+                self.tokens.append(yaml.ValueToken(None, None))
+            elif ch == u',':
+                self.index += 1
+                self.tokens.append(yaml.FlowEntryToken(None, None))
+            elif ch == u'*' or ch == u'&':
+                self.tokens.append(self.scan_alias())
+            elif ch == u'!':
+                self.tokens.append(self.scan_tag())
+            elif ch == u'"':
+                self.tokens.append(self.scan_scalar())
+            else:
+                raise CanonicalError("invalid token")
+        self.scanned = True
+
+    DIRECTIVE = u'%YAML 1.1'
+
+    def scan_directive(self):
+        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
+                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
+            self.index += len(self.DIRECTIVE)
+            return yaml.DirectiveToken('YAML', (1, 1), None, None)
+        else:
+            raise CanonicalError("invalid directive")
+
+    def scan_alias(self):
+        if self.data[self.index] == u'*':
+            TokenClass = yaml.AliasToken
+        else:
+            TokenClass = yaml.AnchorToken
+        self.index += 1
+        start = self.index
+        while self.data[self.index] not in u', \n\0':
+            self.index += 1
+        value = self.data[start:self.index]
+        return TokenClass(value, None, None)
+
+    def scan_tag(self):
+        self.index += 1
+        start = self.index
+        while self.data[self.index] not in u' \n\0':
+            self.index += 1
+        value = self.data[start:self.index]
+        if not value:
+            value = u'!'
+        elif value[0] == u'!':
+            value = 'tag:yaml.org,2002:'+value[1:]
+        elif value[0] == u'<' and value[-1] == u'>':
+            value = value[1:-1]
+        else:
+            value = u'!'+value
+        return yaml.TagToken(value, None, None)
+
+    QUOTE_CODES = {
+        'x': 2,
+        'u': 4,
+        'U': 8,
+    }
+
+    QUOTE_REPLACES = {
+        u'\\': u'\\',
+        u'\"': u'\"',
+        u' ': u' ',
+        u'a': u'\x07',
+        u'b': u'\x08',
+        u'e': u'\x1B',
+        u'f': u'\x0C',
+        u'n': u'\x0A',
+        u'r': u'\x0D',
+        u't': u'\x09',
+        u'v': u'\x0B',
+        u'N': u'\u0085',
+        u'L': u'\u2028',
+        u'P': u'\u2029',
+        u'_': u'_',
+        u'0': u'\x00',
+
+    }
+
+    def scan_scalar(self):
+        self.index += 1
+        chunks = []
+        start = self.index
+        ignore_spaces = False
+        while self.data[self.index] != u'"':
+            if self.data[self.index] == u'\\':
+                ignore_spaces = False
+                chunks.append(self.data[start:self.index])
+                self.index += 1
+                ch = self.data[self.index]
+                self.index += 1
+                if ch == u'\n':
+                    ignore_spaces = True
+                elif ch in self.QUOTE_CODES:
+                    length = self.QUOTE_CODES[ch]
+                    code = int(self.data[self.index:self.index+length], 16)
+                    chunks.append(unichr(code))
+                    self.index += length
+                else:
+                    if ch not in self.QUOTE_REPLACES:
+                        raise CanonicalError("invalid escape code")
+                    chunks.append(self.QUOTE_REPLACES[ch])
+                start = self.index
+            elif self.data[self.index] == u'\n':
+                chunks.append(self.data[start:self.index])
+                chunks.append(u' ')
+                self.index += 1
+                start = self.index
+                ignore_spaces = True
+            elif ignore_spaces and self.data[self.index] == u' ':
+                self.index += 1
+                start = self.index
+            else:
+                ignore_spaces = False
+                self.index += 1
+        chunks.append(self.data[start:self.index])
+        self.index += 1
+        return yaml.ScalarToken(u''.join(chunks), False, None, None)
+
+    def find_token(self):
+        found = False
+        while not found:
+            while self.data[self.index] in u' \t':
+                self.index += 1
+            if self.data[self.index] == u'#':
+                while self.data[self.index] != u'\n':
+                    self.index += 1
+            if self.data[self.index] == u'\n':
+                self.index += 1
+            else:
+                found = True
+
+class CanonicalParser:
+
+    def __init__(self):
+        self.events = []
+        self.parsed = False
+
+    # stream: STREAM-START document* STREAM-END
+    def parse_stream(self):
+        self.get_token(yaml.StreamStartToken)
+        self.events.append(yaml.StreamStartEvent(None, None))
+        while not self.check_token(yaml.StreamEndToken):
+            if self.check_token(yaml.DirectiveToken, yaml.DocumentStartToken):
+                self.parse_document()
+            else:
+                raise CanonicalError("document is expected, got "+repr(self.tokens[0]))
+        self.get_token(yaml.StreamEndToken)
+        self.events.append(yaml.StreamEndEvent(None, None))
+
+    # document: DIRECTIVE? DOCUMENT-START node
+    def parse_document(self):
+        node = None
+        if self.check_token(yaml.DirectiveToken):
+            self.get_token(yaml.DirectiveToken)
+        self.get_token(yaml.DocumentStartToken)
+        self.events.append(yaml.DocumentStartEvent(None, None))
+        self.parse_node()
+        self.events.append(yaml.DocumentEndEvent(None, None))
+
+    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
+    def parse_node(self):
+        if self.check_token(yaml.AliasToken):
+            self.events.append(yaml.AliasEvent(self.get_token_value(), None, None))
+        else:
+            anchor = None
+            if self.check_token(yaml.AnchorToken):
+                anchor = self.get_token_value()
+            tag = None
+            if self.check_token(yaml.TagToken):
+                tag = self.get_token_value()
+            if self.check_token(yaml.ScalarToken):
+                self.events.append(yaml.ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
+            elif self.check_token(yaml.FlowSequenceStartToken):
+                self.events.append(yaml.SequenceStartEvent(anchor, tag, None, None))
+                self.parse_sequence()
+            elif self.check_token(yaml.FlowMappingStartToken):
+                self.events.append(yaml.MappingStartEvent(anchor, tag, None, None))
+                self.parse_mapping()
+            else:
+                raise CanonicalError("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[0]))
+
+    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
+    def parse_sequence(self):
+        self.get_token(yaml.FlowSequenceStartToken)
+        if not self.check_token(yaml.FlowSequenceEndToken):
+            self.parse_node()
+            while not self.check_token(yaml.FlowSequenceEndToken):
+                self.get_token(yaml.FlowEntryToken)
+                if not self.check_token(yaml.FlowSequenceEndToken):
+                    self.parse_node()
+        self.get_token(yaml.FlowSequenceEndToken)
+        self.events.append(yaml.SequenceEndEvent(None, None))
+
+    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
+    def parse_mapping(self):
+        self.get_token(yaml.FlowMappingStartToken)
+        if not self.check_token(yaml.FlowMappingEndToken):
+            self.parse_map_entry()
+            while not self.check_token(yaml.FlowMappingEndToken):
+                self.get_token(yaml.FlowEntryToken)
+                if not self.check_token(yaml.FlowMappingEndToken):
+                    self.parse_map_entry()
+        self.get_token(yaml.FlowMappingEndToken)
+        self.events.append(yaml.MappingEndEvent(None, None))
+
+    # map_entry: KEY node VALUE node
+    def parse_map_entry(self):
+        self.get_token(yaml.KeyToken)
+        self.parse_node()
+        self.get_token(yaml.ValueToken)
+        self.parse_node()
+
+    def parse(self):
+        self.parse_stream()
+        self.parsed = True
+
+    def get_event(self):
+        if not self.parsed:
+            self.parse()
+        return self.events.pop(0)
+
+    def check_event(self, *choices):
+        if not self.parsed:
+            self.parse()
+        if self.events:
+            if not choices:
+                return True
+            for choice in choices:
+                if isinstance(self.events[0], choice):
+                    return True
+        return False
+
+    def peek_event(self):
+        if not self.parsed:
+            self.parse()
+        return self.events[0]
+
+class CanonicalLoader(CanonicalScanner, CanonicalParser,
+        yaml.composer.Composer, yaml.constructor.Constructor, yaml.resolver.Resolver):
+
+    def __init__(self, stream):
+        if hasattr(stream, 'read'):
+            stream = stream.read()
+        CanonicalScanner.__init__(self, stream)
+        CanonicalParser.__init__(self)
+        yaml.composer.Composer.__init__(self)
+        yaml.constructor.Constructor.__init__(self)
+        yaml.resolver.Resolver.__init__(self)
+
+yaml.CanonicalLoader = CanonicalLoader
+
+def canonical_scan(stream):
+    return yaml.scan(stream, Loader=CanonicalLoader)
+
+yaml.canonical_scan = canonical_scan
+
+def canonical_parse(stream):
+    return yaml.parse(stream, Loader=CanonicalLoader)
+
+yaml.canonical_parse = canonical_parse
+
+def canonical_compose(stream):
+    return yaml.compose(stream, Loader=CanonicalLoader)
+
+yaml.canonical_compose = canonical_compose
+
+def canonical_compose_all(stream):
+    return yaml.compose_all(stream, Loader=CanonicalLoader)
+
+yaml.canonical_compose_all = canonical_compose_all
+
+def canonical_load(stream):
+    return yaml.load(stream, Loader=CanonicalLoader)
+
+yaml.canonical_load = canonical_load
+
+def canonical_load_all(stream):
+    return yaml.load_all(stream, Loader=CanonicalLoader)
+
+yaml.canonical_load_all = canonical_load_all
+

tests/data/construct-python-name-module.code

-[file, Loader, dump, abs, yaml.tokens]
+[file, yaml.Loader, yaml.dump, abs, yaml.tokens]
Add a comment to this file

tests/data/empty-document-bug.empty

Empty file added.

Add a comment to this file

tests/data/invalid-character.stream-error

Binary file modified.

tests/data/invalid-utf8-byte.stream-error

--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
 Invalid byte ('\xFF'): � <--
--------------------------------------------------------------------------------------------------------------------------------
+***************************************************************
Add a comment to this file

tests/data/odd-utf16.stream-error

Binary file modified.

tests/data/serializer-is-already-opened.dumper-error

-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
 dumper.open()
 dumper.open()

tests/data/serializer-is-closed-1.dumper-error

-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
 dumper.open()
 dumper.close()
 dumper.open()

tests/data/serializer-is-closed-2.dumper-error

-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
 dumper.open()
 dumper.close()
-dumper.serialize(ScalarNode(tag='!foo', value='bar'))
+dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar'))

tests/data/serializer-is-not-opened-1.dumper-error

-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
 dumper.close()

tests/data/serializer-is-not-opened-2.dumper-error

-dumper = Dumper(StringIO.StringIO())
-dumper.serialize(ScalarNode(tag='!foo', value='bar'))
+dumper = yaml.Dumper(StringIO.StringIO())
+dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar'))

tests/data/unknown.dumper-error

-safe_dump(object)
+yaml.safe_dump(object)

tests/test_all.py

 
-import unittest
+import sys, yaml, test_appliance
 
-def main():
-    import yaml
-    names = ['test_yaml']
-    if yaml.__libyaml__:
-        names.append('test_yaml_ext')
-    suite = unittest.defaultTestLoader.loadTestsFromNames(names)
-    runner = unittest.TextTestRunner()
-    runner.run(suite)
+def main(args=None):
+    collections = []
+    import test_yaml
+    collections.append(test_yaml)
+    if yaml.__with_libyaml__:
+        import test_yaml_ext
+        collections.append(test_yaml_ext)
+    test_appliance.run(collections, args)
 
 if __name__ == '__main__':
     main()

tests/test_appliance.py

 
-import unittest, os
+import sys, os, os.path, types, traceback, pprint
 
-from yaml import *
-from yaml.composer import *
-from yaml.constructor import *
-from yaml.resolver import *
+DATA = 'tests/data'
 
-class TestAppliance(unittest.TestCase):
+def find_test_functions(collections):
+    if not isinstance(collections, list):
+        collections = [collections]
+    functions = []
+    for collection in collections:
+        if not isinstance(collection, dict):
+            collection = vars(collection)
+        keys = collection.keys()
+        keys.sort()
+        for key in keys:
+            value = collection[key]
+            if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+                functions.append(value)
+    return functions
 
-    DATA = 'tests/data'
-    SKIP_EXT = '.skip'
+def find_test_filenames(directory):
+    filenames = {}
+    for filename in os.listdir(directory):
+        if os.path.isfile(os.path.join(directory, filename)):
+            base, ext = os.path.splitext(filename)
+            filenames.setdefault(base, []).append(ext)
+    filenames = filenames.items()
+    filenames.sort()
+    return filenames
 
-    all_tests = {}
-    for filename in os.listdir(DATA):
-        if os.path.isfile(os.path.join(DATA, filename)):
-            root, ext = os.path.splitext(filename)
-            all_tests.setdefault(root, []).append(ext)
+def parse_arguments(args):
+    if args is None:
+        args = sys.argv[1:]
+    verbose = False
+    if '-v' in args:
+        verbose = True
+        args.remove('-v')
+    if '--verbose' in args:
+        verbose = True
+    if 'YAML_TEST_VERBOSE' in os.environ:
+        verbose = True
+    include_functions = []
+    if args:
+        include_functions.append(args.pop(0))
+    if 'YAML_TEST_FUNCTIONS' in os.environ:
+        include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())
+    include_filenames = []
+    include_filenames.extend(args)
+    if 'YAML_TEST_FILENAMES' in os.environ:
+        include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())
+    return include_functions, include_filenames, verbose
 
-    def add_tests(cls, method_name, *extensions):
-        for test in cls.all_tests:
-            available_extensions = cls.all_tests[test]
-            if cls.SKIP_EXT in available_extensions:
-                continue
-            for ext in extensions:
-                if ext not in available_extensions:
-                    break
-            else:
-                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
-                def test_method(self, test=test, filenames=filenames):
-                    getattr(self, '_'+method_name)(test, *filenames)
-                test = test.replace('-', '_').replace('.', '_')
-                try:
-                    test_method.__name__ = '%s_%s' % (method_name, test)
-                except TypeError:
-                    import new
-                    test_method = new.function(test_method.func_code, test_method.func_globals,
-                            '%s_%s' % (method_name, test), test_method.func_defaults,
-                            test_method.func_closure)
-                setattr(cls, test_method.__name__, test_method)
-    add_tests = classmethod(add_tests)
+def execute(function, filenames, verbose):
+    if verbose:
+        sys.stdout.write('='*75+'\n')
+        sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames)))
+    try:
+        function(verbose=verbose, *filenames)
+    except Exception, exc:
+        info = sys.exc_info()
+        if isinstance(exc, AssertionError):
+            kind = 'FAILURE'
+        else:
+            kind = 'ERROR'
+        if verbose:
+            traceback.print_exc(limit=1, file=sys.stdout)
+        else:
+            sys.stdout.write(kind[0])
+            sys.stdout.flush()
+    else:
+        kind = 'SUCCESS'
+        info = None
+        if not verbose:
+            sys.stdout.write('.')
+    sys.stdout.flush()
+    return (function, filenames, kind, info)
 
-class Error(Exception):
-    pass
+def display(results, verbose):
+    if results and not verbose:
+        sys.stdout.write('\n')
+    total = len(results)
+    failures = 0
+    errors = 0
+    for function, filenames, kind, info in results:
+        if kind == 'SUCCESS':
+            continue
+        if kind == 'FAILURE':
+            failures += 1
+        if kind == 'ERROR':
+            errors += 1
+        sys.stdout.write('='*75+'\n')
+        sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind))
+        if kind == 'ERROR':
+            traceback.print_exception(file=sys.stdout, *info)
+        else:
+            sys.stdout.write('Traceback (most recent call last):\n')
+            traceback.print_tb(info[2], file=sys.stdout)
+            sys.stdout.write('%s: see below\n' % info[0].__name__)
+            sys.stdout.write('~'*75+'\n')
+            for arg in info[1].args:
+                pprint.pprint(arg, stream=sys.stdout, indent=2)
+        for filename in filenames:
+            sys.stdout.write('-'*75+'\n')
+            sys.stdout.write('%s:\n' % filename)
+            data = open(filename, 'rb').read()
+            sys.stdout.write(data)
+            if data and data[-1] != '\n':
+                sys.stdout.write('\n')
+    sys.stdout.write('='*75+'\n')
+    sys.stdout.write('TESTS: %s\n' % total)
+    if failures:
+        sys.stdout.write('FAILURES: %s\n' % failures)
+    if errors:
+        sys.stdout.write('ERRORS: %s\n' % errors)
 
-class CanonicalScanner:
+def run(collections, args=None):
+    test_functions = find_test_functions(collections)
+    test_filenames = find_test_filenames(DATA)
+    include_functions, include_filenames, verbose = parse_arguments(args)
+    results = []
+    for function in test_functions:
+        if include_functions and function.func_name not in include_functions:
+            continue
+        if function.unittest:
+            for base, exts in test_filenames:
+                if include_filenames and base not in include_filenames:
+                    continue
+                filenames = []
+                for ext in function.unittest:
+                    if ext not in exts:
+                        break
+                    filenames.append(os.path.join(DATA, base+ext))
+                else:
+                    skip_exts = getattr(function, 'skip', [])
+                    for skip_ext in skip_exts:
+                        if skip_ext in exts:
+                            break
+                    else:
+                        result = execute(function, filenames, verbose)
+                        results.append(result)
+        else:
+            result = execute(function, [], verbose)
+            results.append(result)
+    display(results, verbose=verbose)
 
-    def __init__(self, data):
-        self.data = unicode(data, 'utf-8')+u'\0'
-        self.index = 0
-        self.scan()
-
-    def check_token(self, *choices):
-        if self.tokens:
-            if not choices:
-                return True
-            for choice in choices:
-                if isinstance(self.tokens[0], choice):
-                    return True
-        return False
-
-    def peek_token(self):
-        if self.tokens:
-            return self.tokens[0]
-
-    def get_token(self, choice=None):
-        token = self.tokens.pop(0)
-        if choice and not isinstance(token, choice):
-            raise Error("unexpected token "+repr(token))
-        return token
-
-    def get_token_value(self):
-        token = self.get_token()
-        return token.value
-
-    def scan(self):
-        self.tokens = []
-        self.tokens.append(StreamStartToken(None, None))
-        while True:
-            self.find_token()
-            ch = self.data[self.index]
-            if ch == u'\0':
-                self.tokens.append(StreamEndToken(None, None))
-                break
-            elif ch == u'%':
-                self.tokens.append(self.scan_directive())
-            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
-                self.index += 3
-                self.tokens.append(DocumentStartToken(None, None))
-            elif ch == u'[':
-                self.index += 1
-                self.tokens.append(FlowSequenceStartToken(None, None))
-            elif ch == u'{':
-                self.index += 1
-                self.tokens.append(FlowMappingStartToken(None, None))
-            elif ch == u']':
-                self.index += 1
-                self.tokens.append(FlowSequenceEndToken(None, None))
-            elif ch == u'}':
-                self.index += 1
-                self.tokens.append(FlowMappingEndToken(None, None))
-            elif ch == u'?':
-                self.index += 1
-                self.tokens.append(KeyToken(None, None))
-            elif ch == u':':
-                self.index += 1
-                self.tokens.append(ValueToken(None, None))
-            elif ch == u',':
-                self.index += 1
-                self.tokens.append(FlowEntryToken(None, None))
-            elif ch == u'*' or ch == u'&':
-                self.tokens.append(self.scan_alias())
-            elif ch == u'!':
-                self.tokens.append(self.scan_tag())
-            elif ch == u'"':
-                self.tokens.append(self.scan_scalar())
-            else:
-                raise Error("invalid token")
-
-    DIRECTIVE = u'%YAML 1.1'
-
-    def scan_directive(self):
-        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
-                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
-            self.index += len(self.DIRECTIVE)
-            return DirectiveToken('YAML', (1, 1), None, None)
-
-    def scan_alias(self):
-        if self.data[self.index] == u'*':
-            TokenClass = AliasToken
-        else:
-            TokenClass = AnchorToken
-        self.index += 1
-        start = self.index
-        while self.data[self.index] not in u', \n\0':
-            self.index += 1
-        value = self.data[start:self.index]
-        return TokenClass(value, None, None)
-
-    def scan_tag(self):
-        self.index += 1
-        start = self.index
-        while self.data[self.index] not in u' \n\0':
-            self.index += 1
-        value = self.data[start:self.index]
-        if value[0] == u'!':
-            value = 'tag:yaml.org,2002:'+value[1:]
-        elif value[0] == u'<' and value[-1] == u'>':
-            value = value[1:-1]
-        else:
-            value = u'!'+value
-        return TagToken(value, None, None)
-
-    QUOTE_CODES = {
-        'x': 2,
-        'u': 4,
-        'U': 8,
-    }
-
-    QUOTE_REPLACES = {
-        u'\\': u'\\',
-        u'\"': u'\"',
-        u' ': u' ',
-        u'a': u'\x07',
-        u'b': u'\x08',
-        u'e': u'\x1B',
-        u'f': u'\x0C',
-        u'n': u'\x0A',
-        u'r': u'\x0D',
-        u't': u'\x09',
-        u'v': u'\x0B',
-        u'N': u'\u0085',
-        u'L': u'\u2028',
-        u'P': u'\u2029',
-        u'_': u'_',
-        u'0': u'\x00',
-
-    }
-
-    def scan_scalar(self):
-        self.index += 1
-        chunks = []
-        start = self.index
-        ignore_spaces = False
-        while self.data[self.index] != u'"':
-            if self.data[self.index] == u'\\':
-                ignore_spaces = False
-                chunks.append(self.data[start:self.index])
-                self.index += 1
-                ch = self.data[self.index]
-                self.index += 1
-                if ch == u'\n':
-                    ignore_spaces = True
-                elif ch in self.QUOTE_CODES:
-                    length = self.QUOTE_CODES[ch]
-                    code = int(self.data[self.index:self.index+length], 16)
-                    chunks.append(unichr(code))
-                    self.index += length
-                else:
-                    chunks.append(self.QUOTE_REPLACES[ch])
-                start = self.index
-            elif self.data[self.index] == u'\n':
-                chunks.append(self.data[start:self.index])
-                chunks.append(u' ')
-                self.index += 1
-                start = self.index
-                ignore_spaces = True
-            elif ignore_spaces and self.data[self.index] == u' ':
-                self.index += 1
-                start = self.index
-            else:
-                ignore_spaces = False
-                self.index += 1
-        chunks.append(self.data[start:self.index])
-        self.index += 1
-        return ScalarToken(u''.join(chunks), False, None, None)
-
-    def find_token(self):
-        found = False
-        while not found:
-            while self.data[self.index] in u' \t':
-                self.index += 1
-            if self.data[self.index] == u'#':
-                while self.data[self.index] != u'\n':
-                    self.index += 1
-            if self.data[self.index] == u'\n':
-                self.index += 1
-            else:
-                found = True
-
-class CanonicalParser:
-
-    def __init__(self):
-        self.events = []
-        self.parse()
-
-    # stream: STREAM-START document* STREAM-END
-    def parse_stream(self):
-        self.get_token(StreamStartToken)
-        self.events.append(StreamStartEvent(None, None))
-        while not self.check_token(StreamEndToken):
-            if self.check_token(DirectiveToken, DocumentStartToken):
-                self.parse_document()
-            else:
-                raise Error("document is expected, got "+repr(self.tokens[self.index]))
-        self.get_token(StreamEndToken)
-        self.events.append(StreamEndEvent(None, None))
-
-    # document: DIRECTIVE? DOCUMENT-START node
-    def parse_document(self):
-        node = None
-        if self.check_token(DirectiveToken):
-            self.get_token(DirectiveToken)
-        self.get_token(DocumentStartToken)
-        self.events.append(DocumentStartEvent(None, None))
-        self.parse_node()
-        self.events.append(DocumentEndEvent(None, None))
-
-    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
-    def parse_node(self):
-        if self.check_token(AliasToken):
-            self.events.append(AliasEvent(self.get_token_value(), None, None))
-        else:
-            anchor = None
-            if self.check_token(AnchorToken):
-                anchor = self.get_token_value()
-            tag = None
-            if self.check_token(TagToken):
-                tag = self.get_token_value()
-            if self.check_token(ScalarToken):
-                self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
-            elif self.check_token(FlowSequenceStartToken):
-                self.events.append(SequenceStartEvent(anchor, tag, None, None))
-                self.parse_sequence()
-            elif self.check_token(FlowMappingStartToken):
-                self.events.append(MappingStartEvent(anchor, tag, None, None))
-                self.parse_mapping()
-            else:
-                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
-
-    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
-    def parse_sequence(self):
-        self.get_token(FlowSequenceStartToken)
-        if not self.check_token(FlowSequenceEndToken):
-            self.parse_node()
-            while not self.check_token(FlowSequenceEndToken):
-                self.get_token(FlowEntryToken)
-                if not self.check_token(FlowSequenceEndToken):
-                    self.parse_node()
-        self.get_token(FlowSequenceEndToken)
-        self.events.append(SequenceEndEvent(None, None))
-
-    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
-    def parse_mapping(self):
-        self.get_token(FlowMappingStartToken)
-        if not self.check_token(FlowMappingEndToken):
-            self.parse_map_entry()
-            while not self.check_token(FlowMappingEndToken):
-                self.get_token(FlowEntryToken)
-                if not self.check_token(FlowMappingEndToken):
-                    self.parse_map_entry()
-        self.get_token(FlowMappingEndToken)
-        self.events.append(MappingEndEvent(None, None))
-
-    # map_entry: KEY node VALUE node
-    def parse_map_entry(self):
-        self.get_token(KeyToken)
-        self.parse_node()
-        self.get_token(ValueToken)
-        self.parse_node()
-
-    def parse(self):
-        self.parse_stream()
-
-    def get_event(self):
-        return self.events.pop(0)
-
-    def check_event(self, *choices):
-        if self.events:
-            if not choices:
-                return True
-            for choice in choices:
-                if isinstance(self.events[0], choice):
-                    return True
-        return False
-
-    def peek_event(self):
-        return self.events[0]
-
-class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
-
-    def __init__(self, stream):
-        if hasattr(stream, 'read'):
-            stream = stream.read()
-        CanonicalScanner.__init__(self, stream)
-        CanonicalParser.__init__(self)
-        Composer.__init__(self)
-        Constructor.__init__(self)
-        Resolver.__init__(self)
-
-def canonical_scan(stream):
-    return scan(stream, Loader=CanonicalLoader)
-
-def canonical_parse(stream):
-    return parse(stream, Loader=CanonicalLoader)
-
-def canonical_compose(stream):
-    return compose(stream, Loader=CanonicalLoader)
-
-def canonical_compose_all(stream):
-    return compose_all(stream, Loader=CanonicalLoader)
-
-def canonical_load(stream):
-    return load(stream, Loader=CanonicalLoader)
-
-def canonical_load_all(stream):
-    return load_all(stream, Loader=CanonicalLoader)
-

tests/test_build.py

 
-def main():
+if __name__ == '__main__':
     import sys, os, distutils.util
     build_lib = 'build/lib'
     build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]))
     sys.path.insert(0, build_lib)
     sys.path.insert(0, build_lib_ext)
-    import test_yaml
-    test_yaml.main('test_yaml')
+    import test_yaml, test_appliance
+    test_appliance.run(test_yaml)
 
-if __name__ == '__main__':
-    main()
-

tests/test_build_ext.py

 
 
-def main():
+if __name__ == '__main__':
     import sys, os, distutils.util
     build_lib = 'build/lib'
     build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]))
     sys.path.insert(0, build_lib)
     sys.path.insert(0, build_lib_ext)
-    import test_yaml_ext
-    test_yaml_ext.main('test_yaml_ext')
+    import test_yaml_ext, test_appliance
+    test_appliance.run(test_yaml_ext)
 
-if __name__ == '__main__':
-    main()
-

tests/test_canonical.py

 
-import test_appliance
+import yaml, canonical
 
-class TestCanonicalAppliance(test_appliance.TestAppliance):
+def test_canonical_scanner(canonical_filename, verbose=False):
+    data = open(canonical_filename, 'rb').read()
+    tokens = list(yaml.canonical_scan(data))
+    assert tokens, tokens
+    if verbose:
+        for token in tokens:
+            print token
 
-    def _testCanonicalScanner(self, test_name, canonical_filename):
-        data = file(canonical_filename, 'rb').read()
-        tokens = list(test_appliance.canonical_scan(data))
-        #for token in tokens:
-        #    print token
+test_canonical_scanner.unittest = ['.canonical']
 
-    def _testCanonicalParser(self, test_name, canonical_filename):
-        data = file(canonical_filename, 'rb').read()
-        event = list(test_appliance.canonical_parse(data))
-        #for event in events:
-        #    print event
+def test_canonical_parser(canonical_filename, verbose=False):
+    data = open(canonical_filename, 'rb').read()
+    events = list(yaml.canonical_parse(data))
+    assert events, events
+    if verbose:
+        for event in events:
+            print event
 
-TestCanonicalAppliance.add_tests('testCanonicalScanner', '.canonical')
-TestCanonicalAppliance.add_tests('testCanonicalParser', '.canonical')
+test_canonical_parser.unittest = ['.canonical']
 
+def test_canonical_error(data_filename, canonical_filename, verbose=False):
+    data = open(data_filename, 'rb').read()
+    try:
+        output = list(yaml.canonical_load_all(data))
+    except yaml.YAMLError, exc:
+        if verbose:
+            print exc
+    else:
+        raise AssertionError("expected an exception")
+
+test_canonical_error.unittest = ['.data', '.canonical']
+test_canonical_error.skip = ['.empty']
+
+if __name__ == '__main__':
+    import test_appliance
+    test_appliance.run(globals())
+

tests/test_constructor.py

 
-import test_appliance
+import yaml
+import pprint
 
 import datetime
 try:
     set
 except NameError:
     from sets import Set as set
-
-from yaml import *
-
 import yaml.tokens
 
-class MyLoader(Loader):
-    pass
-class MyDumper(Dumper):
-    pass
-
-class MyTestClass1:
-
-    def __init__(self, x, y=0, z=0):
-        self.x = x
-        self.y = y
-        self.z = z
-
-    def __eq__(self, other):
-        if isinstance(other, MyTestClass1):
-            return self.__class__, self.__dict__ == other.__class__, other.__dict__
-        else:
-            return False
-
-def construct1(constructor, node):
-    mapping = constructor.construct_mapping(node)
-    return MyTestClass1(**mapping)
-def represent1(representer, native):
-    return representer.represent_mapping("!tag1", native.__dict__)
-
-add_constructor("!tag1", construct1, Loader=MyLoader)
-add_representer(MyTestClass1, represent1, Dumper=MyDumper)
-
-class MyTestClass2(MyTestClass1, YAMLObject):
-
-    yaml_loader = MyLoader
-    yaml_dumper = MyDumper
-    yaml_tag = "!tag2"
-
-    def from_yaml(cls, constructor, node):
-        x = constructor.construct_yaml_int(node)
-        return cls(x=x)
-    from_yaml = classmethod(from_yaml)
-
-    def to_yaml(cls, representer, native):
-        return representer.represent_scalar(cls.yaml_tag, str(native.x))
-    to_yaml = classmethod(to_yaml)
-
-class MyTestClass3(MyTestClass2):
-
-    yaml_tag = "!tag3"
-
-    def from_yaml(cls, constructor, node):
-        mapping = constructor.construct_mapping(node)
-        if '=' in mapping:
-            x = mapping['=']
-            del mapping['=']
-            mapping['x'] = x
-        return cls(**mapping)
-    from_yaml = classmethod(from_yaml)
-
-    def to_yaml(cls, representer, native):
-        return representer.represent_mapping(cls.yaml_tag, native.__dict__)
-    to_yaml = classmethod(to_yaml)
-
-class YAMLObject1(YAMLObject):
-
-    yaml_loader = MyLoader
-    yaml_dumper = MyDumper
-    yaml_tag = '!foo'
-
-    def __init__(self, my_parameter=None, my_another_parameter=None):
-        self.my_parameter = my_parameter
-        self.my_another_parameter = my_another_parameter
-
-    def __eq__(self, other):
-        if isinstance(other, YAMLObject1):
-            return self.__class__, self.__dict__ == other.__class__, other.__dict__
-        else:
-            return False
-
-class YAMLObject2(YAMLObject):
-
-    yaml_loader = MyLoader
-    yaml_dumper = MyDumper
-    yaml_tag = '!bar'
-
-    def __init__(self, foo=1, bar=2, baz=3):
-        self.foo = foo
-        self.bar = bar
-        self.baz = baz
-
-    def __getstate__(self):
-        return {1: self.foo, 2: self.bar, 3: self.baz}
-
-    def __setstate__(self, state):
-        self.foo = state[1]
-        self.bar = state[2]
-        self.baz = state[3]
-
-    def __eq__(self, other):
-        if isinstance(other, YAMLObject2):
-            return self.__class__, self.__dict__ == other.__class__, other.__dict__
-        else:
-            return False
-
-class AnObject(object):
-
-    def __new__(cls, foo=None, bar=None, baz=None):
-        self = object.__new__(cls)
-        self.foo = foo
-        self.bar = bar
-        self.baz = baz
-        return self
-
-    def __cmp__(self, other):
-        return cmp((type(self), self.foo, self.bar, self.baz),
-                (type(other), other.foo, other.bar, other.baz))
-
-    def __eq__(self, other):
-        return type(self) is type(other) and    \
-                (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
-
-class AnInstance:
-
-    def __init__(self, foo=None, bar=None, baz=None):
-        self.foo = foo
-        self.bar = bar
-        self.baz = baz
-
-    def __cmp__(self, other):
-        return cmp((type(self), self.foo, self.bar, self.baz),
-                (type(other), other.foo, other.bar, other.baz))
-
-    def __eq__(self, other):
-        return type(self) is type(other) and    \
-                (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
-
-class AState(AnInstance):
-
-    def __getstate__(self):
-        return {
-            '_foo': self.foo,
-            '_bar': self.bar,
-            '_baz': self.baz,
-        }
-
-    def __setstate__(self, state):
-        self.foo = state['_foo']
-        self.bar = state['_bar']
-        self.baz = state['_baz']
-
-class ACustomState(AnInstance):
-
-    def __getstate__(self):
-        return (self.foo, self.bar, self.baz)
-
-    def __setstate__(self, state):
-        self.foo, self.bar, self.baz = state
-
-class InitArgs(AnInstance):
-
-    def __getinitargs__(self):
-        return (self.foo, self.bar, self.baz)
-
-    def __getstate__(self):
-        return {}
-
-class InitArgsWithState(AnInstance):
-
-    def __getinitargs__(self):
-        return (self.foo, self.bar)
-
-    def __getstate__(self):
-        return self.baz
-
-    def __setstate__(self, state):
-        self.baz = state
-
-class NewArgs(AnObject):
-
-    def __getnewargs__(self):
-        return (self.foo, self.bar, self.baz)
-
-    def __getstate__(self):
-        return {}
-
-class NewArgsWithState(AnObject):
-
-    def __getnewargs__(self):
-        return (self.foo, self.bar)
-
-    def __getstate__(self):
-        return self.baz
-
-    def __setstate__(self, state):
-        self.baz = state
-
-class Reduce(AnObject):
-
-    def __reduce__(self):
-        return self.__class__, (self.foo, self.bar, self.baz)
-
-class ReduceWithState(AnObject):
-
-    def __reduce__(self):
-        return self.__class__, (self.foo, self.bar), self.baz
-
-    def __setstate__(self, state):
-        self.baz = state
-
-class MyInt(int):
-
-    def __eq__(self, other):
-        return type(self) is type(other) and int(self) == int(other)
-
-class MyList(list):
-
-    def __init__(self, n=1):
-        self.extend([None]*n)
-
-    def __eq__(self, other):
-        return type(self) is type(other) and list(self) == list(other)
-
-class MyDict(dict):
-
-    def __init__(self, n=1):
-        for k in range(n):
-            self[k] = None
-
-    def __eq__(self, other):
-        return type(self) is type(other) and dict(self) == dict(other)
-
-class FixedOffset(datetime.tzinfo):
-
-    def __init__(self, offset, name):
-        self.__offset = datetime.timedelta(minutes=offset)
-        self.__name = name
-
-    def utcoffset(self, dt):
-        return self.__offset
-
-    def tzname(self, dt):
-        return self.__name
-
-    def dst(self, dt):
-        return datetime.timedelta(0)
-
-
 def execute(code):
     exec code
     return value
 
-class TestConstructorTypes(test_appliance.TestAppliance):
+def _make_objects():
+    global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLObject1, YAMLObject2,  \
+            AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState,    \
+            NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict,  \
+            FixedOffset, execute
 
-    def _testTypes(self, test_name, data_filename, code_filename):
-        data1 = None
-        data2 = None
+    class MyLoader(yaml.Loader):
+        pass
+    class MyDumper(yaml.Dumper):
+        pass
+
+    class MyTestClass1:
+        def __init__(self, x, y=0, z=0):
+            self.x = x
+            self.y = y
+            self.z = z
+        def __eq__(self, other):
+            if isinstance(other, MyTestClass1):
+                return self.__class__, self.__dict__ == other.__class__, other.__dict__
+            else:
+                return False
+
+    def construct1(constructor, node):
+        mapping = constructor.construct_mapping(node)
+        return MyTestClass1(**mapping)
+    def represent1(representer, native):
+        return representer.represent_mapping("!tag1", native.__dict__)
+
+    yaml.add_constructor("!tag1", construct1, Loader=MyLoader)
+    yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper)
+
+    class MyTestClass2(MyTestClass1, yaml.YAMLObject):
+        yaml_loader = MyLoader
+        yaml_dumper = MyDumper
+        yaml_tag = "!tag2"
+        def from_yaml(cls, constructor, node):
+            x = constructor.construct_yaml_int(node)
+            return cls(x=x)
+        from_yaml = classmethod(from_yaml)
+        def to_yaml(cls, representer, native):
+            return representer.represent_scalar(cls.yaml_tag, str(native.x))
+        to_yaml = classmethod(to_yaml)
+
+    class MyTestClass3(MyTestClass2):
+        yaml_tag = "!tag3"
+        def from_yaml(cls, constructor, node):
+            mapping = constructor.construct_mapping(node)
+            if '=' in mapping:
+                x = mapping['=']
+                del mapping['=']
+                mapping['x'] = x
+            return cls(**mapping)
+        from_yaml = classmethod(from_yaml)
+        def to_yaml(cls, representer, native):
+            return representer.represent_mapping(cls.yaml_tag, native.__dict__)
+        to_yaml = classmethod(to_yaml)
+
+    class YAMLObject1(yaml.YAMLObject):
+        yaml_loader = MyLoader
+        yaml_dumper = MyDumper
+        yaml_tag = '!foo'
+        def __init__(self, my_parameter=None, my_another_parameter=None):
+            self.my_parameter = my_parameter
+            self.my_another_parameter = my_another_parameter
+        def __eq__(self, other):
+            if isinstance(other, YAMLObject1):
+                return self.__class__, self.__dict__ == other.__class__, other.__dict__
+            else:
+                return False
+
+    class YAMLObject2(yaml.YAMLObject):
+        yaml_loader = MyLoader
+        yaml_dumper = MyDumper
+        yaml_tag = '!bar'
+        def __init__(self, foo=1, bar=2, baz=3):
+            self.foo = foo
+            self.bar = bar
+            self.baz = baz
+        def __getstate__(self):
+            return {1: self.foo, 2: self.bar, 3: self.baz}
+        def __setstate__(self, state):
+            self.foo = state[1]
+            self.bar = state[2]
+            self.baz = state[3]
+        def __eq__(self, other):
+            if isinstance(other, YAMLObject2):
+                return self.__class__, self.__dict__ == other.__class__, other.__dict__
+            else:
+                return False
+
+    class AnObject(object):
+        def __new__(cls, foo=None, bar=None, baz=None):
+            self = object.__new__(cls)
+            self.foo = foo
+            self.bar = bar
+            self.baz = baz
+            return self
+        def __cmp__(self, other):
+            return cmp((type(self), self.foo, self.bar, self.baz),
+                    (type(other), other.foo, other.bar, other.baz))
+        def __eq__(self, other):
+            return type(self) is type(other) and    \
+                    (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
+
+    class AnInstance:
+        def __init__(self, foo=None, bar=None, baz=None):
+            self.foo = foo
+            self.bar = bar
+            self.baz = baz
+        def __cmp__(self, other):
+            return cmp((type(self), self.foo, self.bar, self.baz),
+                    (type(other), other.foo, other.bar, other.baz))
+        def __eq__(self, other):
+            return type(self) is type(other) and    \
+                    (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
+
+    class AState(AnInstance):
+        def __getstate__(self):
+            return {
+                '_foo': self.foo,
+                '_bar': self.bar,
+                '_baz': self.baz,
+            }
+        def __setstate__(self, state):
+            self.foo = state['_foo']
+            self.bar = state['_bar']
+            self.baz = state['_baz']
+
+    class ACustomState(AnInstance):
+        def __getstate__(self):
+            return (self.foo, self.bar, self.baz)
+        def __setstate__(self, state):
+            self.foo, self.bar, self.baz = state
+
+    class InitArgs(AnInstance):
+        def __getinitargs__(self):
+            return (self.foo, self.bar, self.baz)
+        def __getstate__(self):
+            return {}
+
+    class InitArgsWithState(AnInstance):
+        def __getinitargs__(self):
+            return (self.foo, self.bar)
+        def __getstate__(self):
+            return self.baz
+        def __setstate__(self, state):
+            self.baz = state
+
+    class NewArgs(AnObject):
+        def __getnewargs__(self):
+            return (self.foo, self.bar, self.baz)
+        def __getstate__(self):
+            return {}
+
+    class NewArgsWithState(AnObject):
+        def __getnewargs__(self):
+            return (self.foo, self.bar)
+        def __getstate__(self):
+            return self.baz
+        def __setstate__(self, state):
+            self.baz = state
+
+    class Reduce(AnObject):
+        def __reduce__(self):
+            return self.__class__, (self.foo, self.bar, self.baz)
+
+    class ReduceWithState(AnObject):
+        def __reduce__(self):
+            return self.__class__, (self.foo, self.bar), self.baz
+        def __setstate__(self, state):
+            self.baz = state
+
+    class MyInt(int):
+        def __eq__(self, other):
+            return type(self) is type(other) and int(self) == int(other)
+
+    class MyList(list):
+        def __init__(self, n=1):
+            self.extend([None]*n)
+        def __eq__(self, other):
+            return type(self) is type(other) and list(self) == list(other)
+
+    class MyDict(dict):
+        def __init__(self, n=1):
+            for k in range(n):
+                self[k] = None
+        def __eq__(self, other):
+            return type(self) is type(other) and dict(self) == dict(other)
+
+    class FixedOffset(datetime.tzinfo):
+        def __init__(self, offset, name):
+            self.__offset = datetime.timedelta(minutes=offset)
+            self.__name = name
+        def utcoffset(self, dt):
+            return self.__offset
+        def tzname(self, dt):
+            return self.__name
+        def dst(self, dt):
+            return datetime.timedelta(0)
+
+def _load_code(expression):
+    return eval(expression)
+
+def _serialize_value(data):
+    if isinstance(data, list):
+        return '[%s]' % ', '.join(map(_serialize_value, data))
+    elif isinstance(data, dict):
+        items = []
+        for key, value in data.items():
+            key = _serialize_value(key)
+            value = _serialize_value(value)
+            items.append("%s: %s" % (key, value))
+        items.sort()
+        return '{%s}' % ', '.join(items)
+    elif isinstance(data, datetime.datetime):
+        return repr(data.utctimetuple())
+    elif isinstance(data, unicode):
+        return data.encode('utf-8')
+    else:
+        return str(data)
+
+def test_constructor_types(data_filename, code_filename, verbose=False):
+    _make_objects()
+    native1 = None
+    native2 = None
+    try:
+        native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader))
+        if len(native1) == 1:
+            native1 = native1[0]
+        native2 = _load_code(open(code_filename, 'rb').read())
         try:
-            data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader))
-            if len(data1) == 1:
-                data1 = data1[0]
-            data2 = eval(file(code_filename, 'rb').read())
-            self.failUnlessEqual(type(data1), type(data2))
-            try:
-                self.failUnlessEqual(data1, data2)
-            except (AssertionError, TypeError):
-                if isinstance(data1, dict):
-                    data1 = [(repr(key), value) for key, value in data1.items()]
-                    data1.sort()
-                    data1 = repr(data1)
-                    data2 = [(repr(key), value) for key, value in data2.items()]
-                    data2.sort()
-                    data2 = repr(data2)
-                    if data1 != data2:
-                        raise
-                elif isinstance(data1, list):
-                    self.failUnlessEqual(type(data1), type(data2))
-                    self.failUnlessEqual(len(data1), len(data2))
-                    for item1, item2 in zip(data1, data2):
-                        if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and  \
-                                (item2 != item2 or (item2 == 0.0 and item2 == 1.0)):
-                            continue
-                        if isinstance(item1, datetime.datetime) \
-                                and isinstance(item2, datetime.datetime):
-                            self.failUnlessEqual(item1.microsecond,
-                                    item2.microsecond)
-                        if isinstance(item1, datetime.datetime):
-                            item1 = item1.utctimetuple()
-                        if isinstance(item2, datetime.datetime):
-                            item2 = item2.utctimetuple()
-                        self.failUnlessEqual(item1, item2)
-                else:
-                    raise
-        except:
-            print
-            print "DATA:"
-            print file(data_filename, 'rb').read()
-            print "CODE:"
-            print file(code_filename, 'rb').read()
-            print "NATIVES1:", data1
-            print "NATIVES2:", data2
-            raise
+            if native1 == native2:
+                return
+        except TypeError:
+            pass
+        if verbose:
+            print "SERIALIZED NATIVE1:"
+            print _serialize_value(native1)
+            print "SERIALIZED NATIVE2:"
+            print _serialize_value(native2)
+        assert _serialize_value(native1) == _serialize_value(native2), (native1, native2)
+    finally:
+        if verbose:
+            print "NATIVE1:"
+            pprint.pprint(native1)
+            print "NATIVE2:"
+            pprint.pprint(native2)
 
-TestConstructorTypes.add_tests('testTypes', '.data', '.code')
+test_constructor_types.unittest = ['.data', '.code']
 
+if __name__ == '__main__':
+    import sys, test_constructor
+    sys.modules['test_constructor'] = sys.modules['__main__']
+    import test_appliance
+    test_appliance.run(globals())
+

tests/test_emitter.py

 
-import test_appliance, sys, StringIO
-
-from yaml import *
 import yaml
 
-class TestEmitter(test_appliance.TestAppliance):
+def _compare_events(events1, events2):
+    assert len(events1) == len(events2), (events1, events2)
+    for event1, event2 in zip(events1, events2):
+        assert event1.__class__ == event2.__class__, (event1, event2)
+        if isinstance(event1, yaml.NodeEvent):
+            assert event1.anchor == event2.anchor, (event1, event2)
+        if isinstance(event1, yaml.CollectionStartEvent):
+            assert event1.tag == event2.tag, (event1, event2)
+        if isinstance(event1, yaml.ScalarEvent):
+            if True not in event1.implicit+event2.implicit:
+                assert event1.tag == event2.tag, (event1, event2)
+            assert event1.value == event2.value, (event1, event2)
 
-    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
-        self._testEmitter(test_name, data_filename)
+def test_emitter_on_data(data_filename, canonical_filename, verbose=False):
+    events = list(yaml.parse(open(data_filename, 'rb')))
+    output = yaml.emit(events)
+    if verbose:
+        print "OUTPUT:"
+        print output
+    new_events = list(yaml.parse(output))
+    _compare_events(events, new_events)
 
-    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
-        self._testEmitter(test_name, canonical_filename, False)
+test_emitter_on_data.unittest = ['.data', '.canonical']
 
-    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
-        self._testEmitter(test_name, canonical_filename, True)
+def test_emitter_on_canonical(canonical_filename, verbose=False):
+    events = list(yaml.parse(open(canonical_filename, 'rb')))
+    for canonical in [False, True]:
+        output = yaml.emit(events, canonical=canonical)
+        if verbose:
+            print "OUTPUT (canonical=%s):" % canonical
+            print output
+        new_events = list(yaml.parse(output))
+        _compare_events(events, new_events)
 
-    def _testEmitter(self, test_name, filename, canonical=None):
-        events = list(parse(file(filename, 'rb')))
-        #self._dump(filename, events, canonical)
-        stream = StringIO.StringIO()
-        emit(events, stream, canonical=canonical)
-        data = stream.getvalue()
-        new_events = list(parse(data))
-        for event, new_event in zip(events, new_events):
-            self.failUnlessEqual(event.__class__, new_event.__class__)
-            if isinstance(event, NodeEvent):
-                self.failUnlessEqual(event.anchor, new_event.anchor)
-            if isinstance(event, CollectionStartEvent):
-                self.failUnlessEqual(event.tag, new_event.tag)
-            if isinstance(event, ScalarEvent):
-                #self.failUnlessEqual(event.implicit, new_event.implicit)
-                if True not in event.implicit+new_event.implicit:
-                    self.failUnlessEqual(event.tag, new_event.tag)
-                self.failUnlessEqual(event.value, new_event.value)
+test_emitter_on_canonical.unittest = ['.canonical']
 
-    def _testEmitterStyles(self, test_name, canonical_filename, data_filename):
-        for filename in [canonical_filename, data_filename]:
-            events = list(parse(file(filename, 'rb')))
-            for flow_style in [False, True]:
-                for style in ['|', '>', '"', '\'', '']:
-                    styled_events = []
-                    for event in events:
-                        if isinstance(event, ScalarEvent):
-                            event = ScalarEvent(event.anchor, event.tag,
-                                    event.implicit, event.value, style=style)
-                        elif isinstance(event, SequenceStartEvent):
-                            event = SequenceStartEvent(event.anchor, event.tag,
-                                    event.implicit, flow_style=flow_style)
-                        elif isinstance(event, MappingStartEvent):
-                            event = MappingStartEvent(event.anchor, event.tag,
-                                    event.implicit, flow_style=flow_style)
-                        styled_events.append(event)
-                    stream = StringIO.StringIO()
-                    emit(styled_events, stream)
-                    data = stream.getvalue()
-                    #print data
-                    new_events = list(parse(data))
-                    for event, new_event in zip(events, new_events):
-                        self.failUnlessEqual(event.__class__, new_event.__class__)
-                        if isinstance(event, NodeEvent):
-                            self.failUnlessEqual(event.anchor, new_event.anchor)
-                        if isinstance(event, CollectionStartEvent):
-                            self.failUnlessEqual(event.tag, new_event.tag)
-                        if isinstance(event, ScalarEvent):
-                            #self.failUnlessEqual(event.implicit, new_event.implicit)
-                            if True not in event.implicit+new_event.implicit:
-                                self.failUnlessEqual(event.tag, new_event.tag)
-                            self.failUnlessEqual(event.value, new_event.value)
+def test_emitter_styles(data_filename, canonical_filename, verbose=False):
+    for filename in [data_filename, canonical_filename]:
+        events = list(yaml.parse(open(filename, 'rb')))
+        for flow_style in [False, True]:
+            for style in ['|', '>', '"', '\'', '']:
+                styled_events = []
+                for event in events:
+                    if isinstance(event, yaml.ScalarEvent):
+                        event = yaml.ScalarEvent(event.anchor, event.tag,
+                                event.implicit, event.value, style=style)
+                    elif isinstance(event, yaml.SequenceStartEvent):
+                        event = yaml.SequenceStartEvent(event.anchor, event.tag,
+                                event.implicit, flow_style=flow_style)
+                    elif isinstance(event, yaml.MappingStartEvent):
+                        event = yaml.MappingStartEvent(event.anchor, event.tag,
+                                event.implicit, flow_style=flow_style)
+                    styled_events.append(event)
+                output = yaml.emit(styled_events)
+                if verbose:
+                    print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style)
+                    print output
+                new_events = list(yaml.parse(output))
+                _compare_events(events, new_events)
 
+test_emitter_styles.unittest = ['.data', '.canonical']
 
-    def _dump(self, filename, events, canonical):
-        print "="*30
-        print "ORIGINAL DOCUMENT:"
-        print file(filename, 'rb').read()
-        print '-'*30
-        print "EMITTED DOCUMENT:"
-        emit(events, sys.stdout, canonical=canonical)
-        
-TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
-TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
-TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
-TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data')
-
-class EventsLoader(Loader):
+class EventsLoader(yaml.Loader):
 
     def construct_event(self, node):
-        if isinstance(node, ScalarNode):
+        if isinstance(node, yaml.ScalarNode):
             mapping = {}
         else:
             mapping = self.construct_mapping(node)
 
 EventsLoader.add_constructor(None, EventsLoader.construct_event)
 
-class TestEmitterEvents(test_appliance.TestAppliance):
+def test_emitter_events(events_filename, verbose=False):
+    events = list(yaml.load(open(events_filename, 'rb'), Loader=EventsLoader))
+    output = yaml.emit(events)
+    if verbose:
+        print "OUTPUT:"
+        print output
+    new_events = list(yaml.parse(output))
+    _compare_events(events, new_events)
 
-    def _testEmitterEvents(self, test_name, events_filename):
-        events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
-        #self._dump(events_filename, events)
-        stream = StringIO.StringIO()
-        emit(events, stream)
-        data = stream.getvalue()
-        new_events = list(parse(data))
-        self.failUnlessEqual(len(events), len(new_events))
-        for event, new_event in zip(events, new_events):
-            self.failUnlessEqual(event.__class__, new_event.__class__)
-            if isinstance(event, NodeEvent):
-                self.failUnlessEqual(event.anchor, new_event.anchor)
-            if isinstance(event, CollectionStartEvent):
-                self.failUnlessEqual(event.tag, new_event.tag)
-            if isinstance(event, ScalarEvent):
-                self.failUnless(event.implicit == new_event.implicit
-                        or event.tag == new_event.tag)
-                self.failUnlessEqual(event.value, new_event.value)
+if __name__ == '__main__':
+    import test_appliance
+    test_appliance.run(globals())
 
-    def _dump(self, events_filename, events):
-        print "="*30
-        print "EVENTS:"
-        print file(events_filename, 'rb').read()
-        print '-'*30
-        print "OUTPUT:"
-        emit(events, sys.stdout)
-        
-TestEmitterEvents.add_tests('testEmitterEvents', '.events')
-

tests/test_errors.py

 
-import test_appliance
-import test_emitter
+import yaml, test_emitter
 
-import StringIO
+def test_loader_error(error_filename, verbose=False):
+    try:
+        list(yaml.load_all(open(error_filename, 'rb')))
+    except yaml.YAMLError, exc:
+        if verbose:
+            print "%s:" % exc.__class__.__name__, exc
+    else:
+        raise AssertionError("expected an exception")
 
-from yaml import *
+test_loader_error.unittest = ['.loader-error']
 
-class TestErrors(test_appliance.TestAppliance):
+def test_loader_error_string(error_filename, verbose=False):
+    try:
+        list(yaml.load_all(open(error_filename, 'rb').read()))
+    except yaml.YAMLError, exc:
+        if verbose:
+            print "%s:" % exc.__class__.__name__, exc
+    else:
+        raise AssertionError("expected an exception")
 
-    def _testLoaderErrors(self, test_name, invalid_filename):
-        #self._load(invalid_filename)
-        self.failUnlessRaises(YAMLError, lambda: self._load(invalid_filename))
+test_loader_error_string.unittest = ['.loader-error']
 
-    def _testLoaderStringErrors(self, test_name, invalid_filename):
-        #self._load_string(invalid_filename)
-        self.failUnlessRaises(YAMLError, lambda: self._load_string(invalid_filename))
+def test_loader_error_single(error_filename, verbose=False):
+    try:
+        yaml.load(open(error_filename, 'rb').read())
+    except yaml.YAMLError, exc:
+        if verbose:
+            print "%s:" % exc.__class__.__name__, exc
+    else:
+        raise AssertionError("expected an exception")
 
-    def _testLoaderSingleErrors(self, test_name, invalid_filename):
-        #self._load_single(invalid_filename)
-        self.failUnlessRaises(YAMLError, lambda: self._load_single(invalid_filename))
+test_loader_error_single.unittest = ['.single-loader-error']
 
-    def _testEmitterErrors(self, test_name, invalid_filename):
-        events = list(load(file(invalid_filename, 'rb').read(),
-            Loader=test_emitter.EventsLoader))
-        self.failUnlessRaises(YAMLError, lambda: self._emit(events))
+def test_emitter_error(error_filename, verbose=False):
+    events = list(yaml.load(open(error_filename, 'rb'),
+                    Loader=test_emitter.EventsLoader))
+    try:
+        yaml.emit(events)
+    except yaml.YAMLError, exc:
+        if verbose:
+            print "%s:" % exc.__class__.__name__, exc
+    else:
+        raise AssertionError("expected an exception")
 
-    def _testDumperErrors(self, test_name, invalid_filename):
-        code = file(invalid_filename, 'rb').read()
-        self.failUnlessRaises(YAMLError, lambda: self._dump(code))
+test_emitter_error.unittest = ['.emitter-error']
 
-    def _dump(self, code):
-        try:
-            exec code
-        except YAMLError, exc:
-            #print '.'*70
-            #print "%s:" % exc.__class__.__name__, exc
-            raise
+def test_dumper_error(error_filename, verbose=False):
+    code = open(error_filename, 'rb').read()
+    try:
+        import yaml, StringIO
+        exec code
+    except yaml.YAMLError, exc:
+        if verbose:
+            print "%s:" % exc.__class__.__name__, exc
+    else:
+        raise AssertionError("expected an exception")
 
-    def _emit(self, events):
-        try:
-            emit(events)
-        except YAMLError, exc:
-            #print '.'*70
-            #print "%s:" % exc.__class__.__name__, exc
-            raise
+test_dumper_error.unittest = ['.dumper-error']
 
-    def _load(self, filename):
-        try:
-            return list(load_all(file(filename, 'rb')))
-        except YAMLError, exc:
-        #except ScannerError, exc:
-        #except ParserError, exc:
-        #except ComposerError, exc:
-        #except ConstructorError, exc:
-            #print '.'*70
-            #print "%s:" % exc.__class__.__name__, exc
-            raise
+if __name__ == '__main__':
+    import test_appliance
+    test_appliance.run(globals())
 
-    def _load_string(self, filename):
-        try:
-            return list(load_all(file(filename, 'rb').read()))
-        except YAMLError, exc:
-        #except ScannerError, exc:
-        #except ParserError, exc:
-        #except ComposerError, exc:
-        #except ConstructorError, exc:
-            #print '.'*70
-            #print "%s:" % filename
-            #print "%s:" % exc.__class__.__name__, exc
-            raise
-
-    def _load_single(self, filename):
-        try:
-            return load(file(filename, 'rb').read())
-        except YAMLError, exc:
-        #except ScannerError, exc:
-        #except ParserError, exc:
-        #except ComposerError, exc:
-        #except ConstructorError, exc:
-            #print '.'*70
-            #print "%s:" % filename
-            #print "%s:" % exc.__class__.__name__, exc
-            raise
-
-TestErrors.add_tests('testLoaderErrors', '.loader-error')
-TestErrors.add_tests('testLoaderStringErrors', '.loader-error')
-TestErrors.add_tests('testLoaderSingleErrors', '.single-loader-error')
-TestErrors.add_tests('testEmitterErrors', '.emitter-error')
-TestErrors.add_tests('testDumperErrors', '.dumper-error')
-

tests/test_mark.py

 
-import test_appliance
+import yaml
 
-from yaml.reader import Mark
+def test_marks(marks_filename, verbose=False):
+    inputs = open(marks_filename, 'rb').read().split('---\n')[1:]
+    for input in inputs:
+        index = 0
+        line = 0
+        column = 0
+        while input[index] != '*':
+            if input[index] == '\n':
+                line += 1
+                column = 0
+            else:
+                column += 1
+            index += 1
+        mark = yaml.Mark(marks_filename, index, line, column, unicode(input), index)
+        snippet = mark.get_snippet(indent=2, max_length=79)
+        if verbose:
+            print snippet
+        assert isinstance(snippet, str), type(snippet)
+        assert snippet.count('\n') == 1, snippet.count('\n')
+        data, pointer = snippet.split('\n')
+        assert len(data) < 82, len(data)
+        assert data[len(pointer)-1] == '*', data[len(pointer)-1]
 
-class TestMark(test_appliance.TestAppliance):
+test_marks.unittest = ['.marks']
 
-    def _testMarks(self, test_name, marks_filename):
-        inputs = file(marks_filename, 'rb').read().split('---\n')[1:]
-        for input in inputs:
-            index = 0
-            line = 0
-            column = 0
-            while input[index] != '*':
-                if input[index] == '\n':
-                    line += 1
-                    column = 0
-                else:
-                    column += 1
-                index += 1
-            mark = Mark(test_name, index, line, column, unicode(input), index)
-            snippet = mark.get_snippet(indent=2, max_length=79)
-            #print "INPUT:"
-            #print input
-            #print "SNIPPET:"
-            #print snippet
-            self.failUnless(isinstance(snippet, str))
-            self.failUnlessEqual(snippet.count('\n'), 1)
-            data, pointer = snippet.split('\n')
-            self.failUnless(len(data) < 82)
-            self.failUnlessEqual(data[len(pointer)-1], '*')
+if __name__ == '__main__':
+    import test_appliance
+    test_appliance.run(globals())
 
-TestMark.add_tests('testMarks', '.marks')
-

tests/test_reader.py

 
-import test_appliance
-from yaml.reader import Reader, ReaderError
-
+import yaml.reader
 import codecs
 
-class TestReaderErrors(test_appliance.TestAppliance):
-
-    def _testReaderUnicodeErrors(self, test_name, stream_filename):
-        for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
-            try:
-                data = unicode(file(stream_filename, 'rb').read(), encoding)
-                break
-            except:
-                pass
-        else:
-            return
-        #self._load(data)
-        self.failUnlessRaises(ReaderError,
-                lambda: self._load(data))
-        #self._load(codecs.open(stream_filename, encoding=encoding))
-        self.failUnlessRaises(ReaderError,
-                lambda: self._load(codecs.open(stream_filename, encoding=encoding)))
-
-    def _testReaderStringErrors(self, test_name, stream_filename):
-        data = file(stream_filename, 'rb').read()
-        #self._load(data)
-        self.failUnlessRaises(ReaderError, lambda: self._load(data))
-
-    def _testReaderFileErrors(self, test_name, stream_filename):
-        data = file(stream_filename, 'rb')
-        #self._load(data)
-        self.failUnlessRaises(ReaderError, lambda: self._load(data))
-
-    def _load(self, data):
-        stream = Reader(data)
+def _run_reader(data, verbose):
+    try:
+        stream = yaml.reader.Reader(data)
         while stream.peek() != u'\0':
             stream.forward()
+    except yaml.reader.ReaderError, exc:
+        if verbose:
+            print exc
+    else:
+        raise AssertionError("expected an exception")
 
-TestReaderErrors.add_tests('testReaderUnicodeErrors', '.stream-error')
-TestReaderErrors.add_tests('testReaderStringErrors', '.stream-error')
-TestReaderErrors.add_tests('testReaderFileErrors', '.stream-error')
+def test_stream_error(error_filename, verbose=False):
+    _run_reader(open(error_filename, 'rb'), verbose)
+    _run_reader(open(error_filename, 'rb').read(), verbose)
+    for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
+        try:
+            data = unicode(open(error_filename, 'rb').read(), encoding)