Kirill Simonov committed 664c38c

Fixed str/bytes issues with Python 3 in _yaml.pyx.


Files changed (10)
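The diff below applies one pattern throughout the extension: C strings coming back from libyaml are returned unchanged on Python 2, where str is a byte string, and decoded to text on Python 3, where str is unicode, and message literals likewise become unicode literals. A minimal sketch of that pattern in plain Python (not the extension's actual Cython code), assuming UTF-8 data from the C layer:

    import sys

    def c_string_to_native_str(raw):
        # 'raw' stands in for a char* value such as parser.problem
        if sys.version_info[0] < 3:
            return raw                           # Python 2: str is bytes
        return raw.decode('utf-8', 'strict')     # Python 3: decode to text

    print(c_string_to_native_str(b"expected <block end>, but found '-'"))

In the extension itself the branch is taken on PY_MAJOR_VERSION and the decoding goes through the PyUnicode_FromString macro introduced in the first hunk below.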

 
 #include <yaml.h>
 
-#if PY_MAJOR_VERSION >= 3
+#if PY_MAJOR_VERSION < 3
+
+#define PyUnicode_FromString(s) PyUnicode_DecodeUTF8((s), strlen(s), "strict")
+
+#else
 
 #define PyString_CheckExact PyBytes_CheckExact
 #define PyString_AS_STRING  PyBytes_AS_STRING
     char *PyString_AS_STRING(object o)
     int PyString_GET_SIZE(object o)
     object PyString_FromStringAndSize(char *v, int l)
-    object PyUnicode_DecodeUTF8(char *s, int s, char *e)
+    object PyUnicode_FromString(char *u)
+    object PyUnicode_DecodeUTF8(char *u, int s, char *e)
     object PyUnicode_AsUTF8String(object o)
+    int PY_MAJOR_VERSION
 
     ctypedef enum:
         SIZEOF_VOID_P
 import yaml
 
 def get_version_string():
-    return yaml_get_version_string()
+    cdef char *value
+    value = yaml_get_version_string()
+    if PY_MAJOR_VERSION < 3:
+        return value
+    else:
+        return PyUnicode_FromString(value)
 
 def get_version():
     cdef int major, minor, patch
     yaml_get_version(&major, &minor, &patch)
     return (major, minor, patch)
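A hedged usage sketch of the two wrappers above, assuming the compiled extension is importable under the _yaml name that PyYAML's setup builds:

    import _yaml   # the libyaml binding; requires PyYAML built --with-libyaml

    print(_yaml.get_version_string())   # native str on both 2 and 3, e.g. '0.1.4'
    print(_yaml.get_version())          # unchanged: a tuple of ints, e.g. (0, 1, 4)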
 
-#Mark = yaml.error.Mark
+Mark = yaml.error.Mark
 YAMLError = yaml.error.YAMLError
 ReaderError = yaml.reader.ReaderError
 ScannerError = yaml.scanner.ScannerError
 SequenceNode = yaml.nodes.SequenceNode
 MappingNode = yaml.nodes.MappingNode
 
-cdef class Mark:
-    cdef readonly object name
-    cdef readonly int index
-    cdef readonly int line
-    cdef readonly int column
-    cdef readonly buffer
-    cdef readonly pointer
-
-    def __init__(self, object name, int index, int line, int column,
-            object buffer, object pointer):
-        self.name = name
-        self.index = index
-        self.line = line
-        self.column = column
-        self.buffer = buffer
-        self.pointer = pointer
-
-    def get_snippet(self):
-        return None
-
-    def __str__(self):
-        where = "  in \"%s\", line %d, column %d"   \
-                % (self.name, self.line+1, self.column+1)
-        return where
-
+#cdef class Mark:
+#    cdef readonly object name
+#    cdef readonly int index
+#    cdef readonly int line
+#    cdef readonly int column
+#    cdef readonly buffer
+#    cdef readonly pointer
+#
+#    def __init__(self, object name, int index, int line, int column,
+#            object buffer, object pointer):
+#        self.name = name
+#        self.index = index
+#        self.line = line
+#        self.column = column
+#        self.buffer = buffer
+#        self.pointer = pointer
+#
+#    def get_snippet(self):
+#        return None
+#
+#    def __str__(self):
+#        where = "  in \"%s\", line %d, column %d"   \
+#                % (self.name, self.line+1, self.column+1)
+#        return where
+#
 #class YAMLError(Exception):
 #    pass
 #
             try:
                 self.stream_name = stream.name
             except AttributeError:
-                self.stream_name = '<file>'
+                if PY_MAJOR_VERSION < 3:
+                    self.stream_name = '<file>'
+                else:
+                    self.stream_name = u'<file>'
             self.stream_cache = None
             self.stream_cache_len = 0
             self.stream_cache_pos = 0
         else:
             if PyUnicode_CheckExact(stream) != 0:
                 stream = PyUnicode_AsUTF8String(stream)
-                self.stream_name = '<unicode string>'
+                if PY_MAJOR_VERSION < 3:
+                    self.stream_name = '<unicode string>'
+                else:
+                    self.stream_name = u'<unicode string>'
                 self.unicode_source = 1
             else:
-                self.stream_name = '<byte string>'
+                if PY_MAJOR_VERSION < 3:
+                    self.stream_name = '<byte string>'
+                else:
+                    self.stream_name = u'<byte string>'
             if PyString_CheckExact(stream) == 0:
-                raise TypeError("a string or stream input is required")
+                if PY_MAJOR_VERSION < 3:
+                    raise TypeError("a string or stream input is required")
+                else:
+                    raise TypeError(u"a string or stream input is required")
             self.stream = stream
             yaml_parser_set_input_string(&self.parser, PyString_AS_STRING(stream), PyString_GET_SIZE(stream))
         self.current_token = None
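The constructor hunk above still accepts the same three kinds of input; a hedged sketch of feeding each to the C loader, assuming the extension is exposed as yaml.CLoader:

    import io
    import yaml   # assumes PyYAML was built with the libyaml extension

    doc = u"- a\n- b\n"
    yaml.load(doc, Loader=yaml.CLoader)                   # unicode -> stream_name '<unicode string>'
    yaml.load(doc.encode('utf-8'), Loader=yaml.CLoader)   # bytes   -> stream_name '<byte string>'
    yaml.load(io.BytesIO(doc.encode('utf-8')),
              Loader=yaml.CLoader)                        # file-like -> its .name or '<file>'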
         if self.parser.error == YAML_MEMORY_ERROR:
             return MemoryError
         elif self.parser.error == YAML_READER_ERROR:
-            return ReaderError(self.stream_name, self.parser.problem_offset,
-                    self.parser.problem_value, '?', self.parser.problem)
+            if PY_MAJOR_VERSION < 3:
+                return ReaderError(self.stream_name, self.parser.problem_offset,
+                        self.parser.problem_value, '?', self.parser.problem)
+            else:
+                return ReaderError(self.stream_name, self.parser.problem_offset,
+                        self.parser.problem_value, u'?', PyUnicode_FromString(self.parser.problem))
         elif self.parser.error == YAML_SCANNER_ERROR    \
                 or self.parser.error == YAML_PARSER_ERROR:
             context_mark = None
                         self.parser.problem_mark.index,
                         self.parser.problem_mark.line,
                         self.parser.problem_mark.column, None, None)
+            context = None
+            if self.parser.context != NULL:
+                if PY_MAJOR_VERSION < 3:
+                    context = self.parser.context
+                else:
+                    context = PyUnicode_FromString(self.parser.context)
+            if PY_MAJOR_VERSION < 3:
+                problem = self.parser.problem
+            else:
+                problem = PyUnicode_FromString(self.parser.problem)
             if self.parser.error == YAML_SCANNER_ERROR:
-                if self.parser.context != NULL:
-                    return ScannerError(self.parser.context, context_mark,
-                            self.parser.problem, problem_mark)
-                else:
-                    return ScannerError(None, None,
-                            self.parser.problem, problem_mark)
+                return ScannerError(context, context_mark, problem, problem_mark)
             else:
-                if self.parser.context != NULL:
-                    return ParserError(self.parser.context, context_mark,
-                            self.parser.problem, problem_mark)
-                else:
-                    return ParserError(None, None,
-                            self.parser.problem, problem_mark)
-        raise ValueError("no parser error")
+                return ParserError(context, context_mark, problem, problem_mark)
+        if PY_MAJOR_VERSION < 3:
+            raise ValueError("no parser error")
+        else:
+            raise ValueError(u"no parser error")
 
     def raw_scan(self):
         cdef yaml_token_t token
                         token.data.version_directive.minor),
                     start_mark, end_mark)
         elif token.type == YAML_TAG_DIRECTIVE_TOKEN:
-            handle = PyUnicode_DecodeUTF8(token.data.tag_directive.handle,
-                    strlen(token.data.tag_directive.handle), 'strict')
-            prefix = PyUnicode_DecodeUTF8(token.data.tag_directive.prefix,
-                    strlen(token.data.tag_directive.prefix), 'strict')
+            handle = PyUnicode_FromString(token.data.tag_directive.handle)
+            prefix = PyUnicode_FromString(token.data.tag_directive.prefix)
             return DirectiveToken(u"TAG", (handle, prefix),
                     start_mark, end_mark)
         elif token.type == YAML_DOCUMENT_START_TOKEN:
         elif token.type == YAML_VALUE_TOKEN:
             return ValueToken(start_mark, end_mark)
         elif token.type == YAML_ALIAS_TOKEN:
-            value = PyUnicode_DecodeUTF8(token.data.alias.value,
-                    strlen(token.data.alias.value), 'strict')
+            value = PyUnicode_FromString(token.data.alias.value)
             return AliasToken(value, start_mark, end_mark)
         elif token.type == YAML_ANCHOR_TOKEN:
-            value = PyUnicode_DecodeUTF8(token.data.anchor.value,
-                    strlen(token.data.anchor.value), 'strict')
+            value = PyUnicode_FromString(token.data.anchor.value)
             return AnchorToken(value, start_mark, end_mark)
         elif token.type == YAML_TAG_TOKEN:
-            handle = PyUnicode_DecodeUTF8(token.data.tag.handle,
-                    strlen(token.data.tag.handle), 'strict')
-            suffix = PyUnicode_DecodeUTF8(token.data.tag.suffix,
-                    strlen(token.data.tag.suffix), 'strict')
+            handle = PyUnicode_FromString(token.data.tag.handle)
+            suffix = PyUnicode_FromString(token.data.tag.suffix)
             if not handle:
                 handle = None
             return TagToken((handle, suffix), start_mark, end_mark)
             style = None
             if token.data.scalar.style == YAML_PLAIN_SCALAR_STYLE:
                 plain = True
-                style = ''
+                style = u''
             elif token.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE:
-                style = '\''
+                style = u'\''
             elif token.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE:
-                style = '"'
+                style = u'"'
             elif token.data.scalar.style == YAML_LITERAL_SCALAR_STYLE:
-                style = '|'
+                style = u'|'
             elif token.data.scalar.style == YAML_FOLDED_SCALAR_STYLE:
-                style = '>'
+                style = u'>'
             return ScalarToken(value, plain,
                     start_mark, end_mark, style)
         else:
-            raise ValueError("unknown token type")
+            if PY_MAJOR_VERSION < 3:
+                raise ValueError("unknown token type")
+            else:
+                raise ValueError(u"unknown token type")
 
     def get_token(self):
         if self.current_token is not None:
             encoding = None
             if event.data.stream_start.encoding == YAML_UTF8_ENCODING:
                 if self.unicode_source == 0:
-                    encoding = "utf-8"
+                    encoding = u"utf-8"
             elif event.data.stream_start.encoding == YAML_UTF16LE_ENCODING:
-                encoding = "utf-16-le"
+                encoding = u"utf-16-le"
             elif event.data.stream_start.encoding == YAML_UTF16BE_ENCODING:
-                encoding = "utf-16-be"
+                encoding = u"utf-16-be"
             return StreamStartEvent(start_mark, end_mark, encoding)
         elif event.type == YAML_STREAM_END_EVENT:
             return StreamEndEvent(start_mark, end_mark)
-
         elif event.type == YAML_DOCUMENT_START_EVENT:
             explicit = False
             if event.data.document_start.implicit == 0:
                 tags = {}
                 tag_directive = event.data.document_start.tag_directives.start
                 while tag_directive != event.data.document_start.tag_directives.end:
-                    handle = PyUnicode_DecodeUTF8(tag_directive.handle,
-                            strlen(tag_directive.handle), 'strict')
-                    prefix = PyUnicode_DecodeUTF8(tag_directive.prefix,
-                            strlen(tag_directive.prefix), 'strict')
+                    handle = PyUnicode_FromString(tag_directive.handle)
+                    prefix = PyUnicode_FromString(tag_directive.prefix)
                     tags[handle] = prefix
                     tag_directive = tag_directive+1
             return DocumentStartEvent(start_mark, end_mark,
                 explicit = True
             return DocumentEndEvent(start_mark, end_mark, explicit)
         elif event.type == YAML_ALIAS_EVENT:
-            anchor = PyUnicode_DecodeUTF8(event.data.alias.anchor,
-                    strlen(event.data.alias.anchor), 'strict')
+            anchor = PyUnicode_FromString(event.data.alias.anchor)
             return AliasEvent(anchor, start_mark, end_mark)
         elif event.type == YAML_SCALAR_EVENT:
             anchor = None
             if event.data.scalar.anchor != NULL:
-                anchor = PyUnicode_DecodeUTF8(event.data.scalar.anchor,
-                        strlen(event.data.scalar.anchor), 'strict')
+                anchor = PyUnicode_FromString(event.data.scalar.anchor)
             tag = None
             if event.data.scalar.tag != NULL:
-                tag = PyUnicode_DecodeUTF8(event.data.scalar.tag,
-                        strlen(event.data.scalar.tag), 'strict')
+                tag = PyUnicode_FromString(event.data.scalar.tag)
             value = PyUnicode_DecodeUTF8(event.data.scalar.value,
                     event.data.scalar.length, 'strict')
             plain_implicit = False
                 quoted_implicit = True
             style = None
             if event.data.scalar.style == YAML_PLAIN_SCALAR_STYLE:
-                style = ''
+                style = u''
             elif event.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE:
-                style = '\''
+                style = u'\''
             elif event.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE:
-                style = '"'
+                style = u'"'
             elif event.data.scalar.style == YAML_LITERAL_SCALAR_STYLE:
-                style = '|'
+                style = u'|'
             elif event.data.scalar.style == YAML_FOLDED_SCALAR_STYLE:
-                style = '>'
+                style = u'>'
             return ScalarEvent(anchor, tag,
                     (plain_implicit, quoted_implicit),
                     value, start_mark, end_mark, style)
         elif event.type == YAML_SEQUENCE_START_EVENT:
             anchor = None
             if event.data.sequence_start.anchor != NULL:
-                anchor = PyUnicode_DecodeUTF8(event.data.sequence_start.anchor,
-                        strlen(event.data.sequence_start.anchor), 'strict')
+                anchor = PyUnicode_FromString(event.data.sequence_start.anchor)
             tag = None
             if event.data.sequence_start.tag != NULL:
-                tag = PyUnicode_DecodeUTF8(event.data.sequence_start.tag,
-                        strlen(event.data.sequence_start.tag), 'strict')
+                tag = PyUnicode_FromString(event.data.sequence_start.tag)
             implicit = False
             if event.data.sequence_start.implicit == 1:
                 implicit = True
         elif event.type == YAML_MAPPING_START_EVENT:
             anchor = None
             if event.data.mapping_start.anchor != NULL:
-                anchor = PyUnicode_DecodeUTF8(event.data.mapping_start.anchor,
-                        strlen(event.data.mapping_start.anchor), 'strict')
+                anchor = PyUnicode_FromString(event.data.mapping_start.anchor)
             tag = None
             if event.data.mapping_start.tag != NULL:
-                tag = PyUnicode_DecodeUTF8(event.data.mapping_start.tag,
-                        strlen(event.data.mapping_start.tag), 'strict')
+                tag = PyUnicode_FromString(event.data.mapping_start.tag)
             implicit = False
             if event.data.mapping_start.implicit == 1:
                 implicit = True
             return SequenceEndEvent(start_mark, end_mark)
         elif event.type == YAML_MAPPING_END_EVENT:
             return MappingEndEvent(start_mark, end_mark)
-
         else:
-            raise ValueError("unknown token type")
+            if PY_MAJOR_VERSION < 3:
+                raise ValueError("unknown event type")
+            else:
+                raise ValueError(u"unknown event type")
 
     def get_event(self):
         if self.current_event is not None:
                     self.parsed_event.start_mark.line,
                     self.parsed_event.start_mark.column,
                     None, None)
-            raise ComposerError("expected a single document in the stream",
-                    document.start_mark, "but found another document", mark)
+            if PY_MAJOR_VERSION < 3:
+                raise ComposerError("expected a single document in the stream",
+                        document.start_mark, "but found another document", mark)
+            else:
+                raise ComposerError(u"expected a single document in the stream",
+                        document.start_mark, u"but found another document", mark)
         return document
 
     cdef object _compose_document(self):
     cdef object _compose_node(self, object parent, object index):
         self._parse_next_event()
         if self.parsed_event.type == YAML_ALIAS_EVENT:
-            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.alias.anchor,
-                    strlen(self.parsed_event.data.alias.anchor), 'strict')
+            anchor = PyUnicode_FromString(self.parsed_event.data.alias.anchor)
             if anchor not in self.anchors:
                 mark = Mark(self.stream_name,
                         self.parsed_event.start_mark.index,
                         self.parsed_event.start_mark.line,
                         self.parsed_event.start_mark.column,
                         None, None)
-                raise ComposerError(None, None, "found undefined alias", mark)
+                if PY_MAJOR_VERSION < 3:
+                    raise ComposerError(None, None, "found undefined alias", mark)
+                else:
+                    raise ComposerError(None, None, u"found undefined alias", mark)
             yaml_event_delete(&self.parsed_event)
             return self.anchors[anchor]
         anchor = None
         if self.parsed_event.type == YAML_SCALAR_EVENT  \
                 and self.parsed_event.data.scalar.anchor != NULL:
-            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.anchor,
-                    strlen(self.parsed_event.data.scalar.anchor), 'strict')
+            anchor = PyUnicode_FromString(self.parsed_event.data.scalar.anchor)
         elif self.parsed_event.type == YAML_SEQUENCE_START_EVENT    \
                 and self.parsed_event.data.sequence_start.anchor != NULL:
-            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.sequence_start.anchor,
-                    strlen(self.parsed_event.data.sequence_start.anchor), 'strict')
+            anchor = PyUnicode_FromString(self.parsed_event.data.sequence_start.anchor)
         elif self.parsed_event.type == YAML_MAPPING_START_EVENT    \
                 and self.parsed_event.data.mapping_start.anchor != NULL:
-            anchor = PyUnicode_DecodeUTF8(self.parsed_event.data.mapping_start.anchor,
-                    strlen(self.parsed_event.data.mapping_start.anchor), 'strict')
+            anchor = PyUnicode_FromString(self.parsed_event.data.mapping_start.anchor)
         if anchor is not None:
             if anchor in self.anchors:
                 mark = Mark(self.stream_name,
                         self.parsed_event.start_mark.line,
                         self.parsed_event.start_mark.column,
                         None, None)
-                raise ComposerError("found duplicate anchor; first occurrence",
-                        self.anchors[anchor].start_mark, "second occurrence", mark)
+                if PY_MAJOR_VERSION < 3:
+                    raise ComposerError("found duplicate anchor; first occurrence",
+                            self.anchors[anchor].start_mark, "second occurrence", mark)
+                else:
+                    raise ComposerError(u"found duplicate anchor; first occurrence",
+                            self.anchors[anchor].start_mark, u"second occurrence", mark)
         self.descend_resolver(parent, index)
         if self.parsed_event.type == YAML_SCALAR_EVENT:
             node = self._compose_scalar_node(anchor)
                         and self.parsed_event.data.scalar.tag[1] == c'\0'):
             tag = self.resolve(ScalarNode, value, (plain_implicit, quoted_implicit))
         else:
-            tag = PyUnicode_DecodeUTF8(self.parsed_event.data.scalar.tag,
-                    strlen(self.parsed_event.data.scalar.tag), 'strict')
+            tag = PyUnicode_FromString(self.parsed_event.data.scalar.tag)
         style = None
         if self.parsed_event.data.scalar.style == YAML_PLAIN_SCALAR_STYLE:
-            style = ''
+            style = u''
         elif self.parsed_event.data.scalar.style == YAML_SINGLE_QUOTED_SCALAR_STYLE:
-            style = '\''
+            style = u'\''
         elif self.parsed_event.data.scalar.style == YAML_DOUBLE_QUOTED_SCALAR_STYLE:
-            style = '"'
+            style = u'"'
         elif self.parsed_event.data.scalar.style == YAML_LITERAL_SCALAR_STYLE:
-            style = '|'
+            style = u'|'
         elif self.parsed_event.data.scalar.style == YAML_FOLDED_SCALAR_STYLE:
-            style = '>'
+            style = u'>'
         node = ScalarNode(tag, value, start_mark, end_mark, style)
         if anchor is not None:
             self.anchors[anchor] = node
                         and self.parsed_event.data.sequence_start.tag[1] == c'\0'):
             tag = self.resolve(SequenceNode, None, implicit)
         else:
-            tag = PyUnicode_DecodeUTF8(self.parsed_event.data.sequence_start.tag,
-                    strlen(self.parsed_event.data.sequence_start.tag), 'strict')
+            tag = PyUnicode_FromString(self.parsed_event.data.sequence_start.tag)
         flow_style = None
         if self.parsed_event.data.sequence_start.style == YAML_FLOW_SEQUENCE_STYLE:
             flow_style = True
                         and self.parsed_event.data.mapping_start.tag[1] == c'\0'):
             tag = self.resolve(MappingNode, None, implicit)
         else:
-            tag = PyUnicode_DecodeUTF8(self.parsed_event.data.mapping_start.tag,
-                    strlen(self.parsed_event.data.mapping_start.tag), 'strict')
+            tag = PyUnicode_FromString(self.parsed_event.data.mapping_start.tag)
         flow_style = None
         if self.parsed_event.data.mapping_start.style == YAML_FLOW_MAPPING_STYLE:
             flow_style = True
             value = PyUnicode_AsUTF8String(value)
             parser.unicode_source = 1
         if PyString_CheckExact(value) == 0:
-            raise TypeError("a string value is expected")
+            if PY_MAJOR_VERSION < 3:
+                raise TypeError("a string value is expected")
+            else:
+                raise TypeError(u"a string value is expected")
         parser.stream_cache = value
         parser.stream_cache_pos = 0
         parser.stream_cache_len = PyString_GET_SIZE(value)
         if self.emitter.error == YAML_MEMORY_ERROR:
             return MemoryError
         elif self.emitter.error == YAML_EMITTER_ERROR:
-            return EmitterError(self.emitter.problem)
-        raise ValueError("no emitter error")
+            if PY_MAJOR_VERSION < 3:
+                problem = self.emitter.problem
+            else:
+                problem = PyUnicode_FromString(self.emitter.problem)
+            return EmitterError(problem)
+        if PY_MAJOR_VERSION < 3:
+            raise ValueError("no emitter error")
+        else:
+            raise ValueError(u"no emitter error")
 
     cdef int _object_to_event(self, object event_object, yaml_event_t *event) except 0:
         cdef yaml_encoding_t encoding
         event_class = event_object.__class__
         if event_class is StreamStartEvent:
             encoding = YAML_UTF8_ENCODING
-            if event_object.encoding == 'utf-16-le':
+            if event_object.encoding == u'utf-16-le' or event_object.encoding == 'utf-16-le':
                 encoding = YAML_UTF16LE_ENCODING
-            elif event_object.encoding == 'utf-16-be':
+            elif event_object.encoding == u'utf-16-be' or event_object.encoding == 'utf-16-be':
                 encoding = YAML_UTF16BE_ENCODING
             if event_object.encoding is None:
                 self.dump_unicode = 1
             tag_directives_end = NULL
             if event_object.tags:
                 if len(event_object.tags) > 128:
-                    raise ValueError("too many tags")
+                    if PY_MAJOR_VERSION < 3:
+                        raise ValueError("too many tags")
+                    else:
+                        raise ValueError(u"too many tags")
                 tag_directives_start = tag_directives_value
                 tag_directives_end = tag_directives_value
                 cache = []
                         handle = PyUnicode_AsUTF8String(handle)
                         cache.append(handle)
                     if not PyString_CheckExact(handle):
-                        raise TypeError("tag handle must be a string")
+                        if PY_MAJOR_VERSION < 3:
+                            raise TypeError("tag handle must be a string")
+                        else:
+                            raise TypeError(u"tag handle must be a string")
                     tag_directives_end.handle = PyString_AS_STRING(handle)
                     if PyUnicode_CheckExact(prefix):
                         prefix = PyUnicode_AsUTF8String(prefix)
                         cache.append(prefix)
                     if not PyString_CheckExact(prefix):
-                        raise TypeError("tag prefix must be a string")
+                        if PY_MAJOR_VERSION < 3:
+                            raise TypeError("tag prefix must be a string")
+                        else:
+                            raise TypeError(u"tag prefix must be a string")
                     tag_directives_end.prefix = PyString_AS_STRING(prefix)
                     tag_directives_end = tag_directives_end+1
             implicit = 1
             if PyUnicode_CheckExact(anchor_object):
                 anchor_object = PyUnicode_AsUTF8String(anchor_object)
             if not PyString_CheckExact(anchor_object):
-                raise TypeError("anchor must be a string")
+                if PY_MAJOR_VERSION < 3:
+                    raise TypeError("anchor must be a string")
+                else:
+                    raise TypeError(u"anchor must be a string")
             anchor = PyString_AS_STRING(anchor_object)
             if yaml_alias_event_initialize(event, anchor) == 0:
                 raise MemoryError
                 if PyUnicode_CheckExact(anchor_object):
                     anchor_object = PyUnicode_AsUTF8String(anchor_object)
                 if not PyString_CheckExact(anchor_object):
-                    raise TypeError("anchor must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("anchor must be a string")
+                    else:
+                        raise TypeError(u"anchor must be a string")
                 anchor = PyString_AS_STRING(anchor_object)
             tag = NULL
             tag_object = event_object.tag
                 if PyUnicode_CheckExact(tag_object):
                     tag_object = PyUnicode_AsUTF8String(tag_object)
                 if not PyString_CheckExact(tag_object):
-                    raise TypeError("tag must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("tag must be a string")
+                    else:
+                        raise TypeError(u"tag must be a string")
                 tag = PyString_AS_STRING(tag_object)
             value_object = event_object.value
             if PyUnicode_CheckExact(value_object):
                 value_object = PyUnicode_AsUTF8String(value_object)
             if not PyString_CheckExact(value_object):
-                raise TypeError("value must be a string")
+                if PY_MAJOR_VERSION < 3:
+                    raise TypeError("value must be a string")
+                else:
+                    raise TypeError(u"value must be a string")
             value = PyString_AS_STRING(value_object)
             length = PyString_GET_SIZE(value_object)
             plain_implicit = 0
                 quoted_implicit = event_object.implicit[1]
             style_object = event_object.style
             scalar_style = YAML_PLAIN_SCALAR_STYLE
-            if style_object == "'":
+            if style_object == "'" or style_object == u"'":
                 scalar_style = YAML_SINGLE_QUOTED_SCALAR_STYLE
-            elif style_object == "\"":
+            elif style_object == "\"" or style_object == u"\"":
                 scalar_style = YAML_DOUBLE_QUOTED_SCALAR_STYLE
-            elif style_object == "|":
+            elif style_object == "|" or style_object == u"|":
                 scalar_style = YAML_LITERAL_SCALAR_STYLE
-            elif style_object == ">":
+            elif style_object == ">" or style_object == u">":
                 scalar_style = YAML_FOLDED_SCALAR_STYLE
             if yaml_scalar_event_initialize(event, anchor, tag, value, length,
                     plain_implicit, quoted_implicit, scalar_style) == 0:
                 if PyUnicode_CheckExact(anchor_object):
                     anchor_object = PyUnicode_AsUTF8String(anchor_object)
                 if not PyString_CheckExact(anchor_object):
-                    raise TypeError("anchor must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("anchor must be a string")
+                    else:
+                        raise TypeError(u"anchor must be a string")
                 anchor = PyString_AS_STRING(anchor_object)
             tag = NULL
             tag_object = event_object.tag
                 if PyUnicode_CheckExact(tag_object):
                     tag_object = PyUnicode_AsUTF8String(tag_object)
                 if not PyString_CheckExact(tag_object):
-                    raise TypeError("tag must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("tag must be a string")
+                    else:
+                        raise TypeError(u"tag must be a string")
                 tag = PyString_AS_STRING(tag_object)
             implicit = 0
             if event_object.implicit:
                 if PyUnicode_CheckExact(anchor_object):
                     anchor_object = PyUnicode_AsUTF8String(anchor_object)
                 if not PyString_CheckExact(anchor_object):
-                    raise TypeError("anchor must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("anchor must be a string")
+                    else:
+                        raise TypeError(u"anchor must be a string")
                 anchor = PyString_AS_STRING(anchor_object)
             tag = NULL
             tag_object = event_object.tag
                 if PyUnicode_CheckExact(tag_object):
                     tag_object = PyUnicode_AsUTF8String(tag_object)
                 if not PyString_CheckExact(tag_object):
-                    raise TypeError("tag must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("tag must be a string")
+                    else:
+                        raise TypeError(u"tag must be a string")
                 tag = PyString_AS_STRING(tag_object)
             implicit = 0
             if event_object.implicit:
         elif event_class is MappingEndEvent:
             yaml_mapping_end_event_initialize(event)
         else:
-            raise TypeError("invalid event %s" % event_object)
+            if PY_MAJOR_VERSION < 3:
+                raise TypeError("invalid event %s" % event_object)
+            else:
+                raise TypeError(u"invalid event %s" % event_object)
         return 1
 
     def emit(self, event_object):
         cdef yaml_event_t event
         cdef yaml_encoding_t encoding
         if self.closed == -1:
-            if self.use_encoding == 'utf-16-le':
+            if self.use_encoding == u'utf-16-le' or self.use_encoding == 'utf-16-le':
                 encoding = YAML_UTF16LE_ENCODING
-            elif self.use_encoding == 'utf-16-be':
+            elif self.use_encoding == u'utf-16-be' or self.use_encoding == 'utf-16-be':
                 encoding = YAML_UTF16BE_ENCODING
             else:
                 encoding = YAML_UTF8_ENCODING
                 raise error
             self.closed = 0
         elif self.closed == 1:
-            raise SerializerError("serializer is closed")
+            if PY_MAJOR_VERSION < 3:
+                raise SerializerError("serializer is closed")
+            else:
+                raise SerializerError(u"serializer is closed")
         else:
-            raise SerializerError("serializer is already opened")
+            if PY_MAJOR_VERSION < 3:
+                raise SerializerError("serializer is already opened")
+            else:
+                raise SerializerError(u"serializer is already opened")
 
     def close(self):
         cdef yaml_event_t event
         if self.closed == -1:
-            raise SerializerError("serializer is not opened")
+            if PY_MAJOR_VERSION < 3:
+                raise SerializerError("serializer is not opened")
+            else:
+                raise SerializerError(u"serializer is not opened")
         elif self.closed == 0:
             yaml_stream_end_event_initialize(&event)
             if yaml_emitter_emit(&self.emitter, &event) == 0:
         cdef yaml_tag_directive_t *tag_directives_start
         cdef yaml_tag_directive_t *tag_directives_end
         if self.closed == -1:
-            raise SerializerError("serializer is not opened")
+            if PY_MAJOR_VERSION < 3:
+                raise SerializerError("serializer is not opened")
+            else:
+                raise SerializerError(u"serializer is not opened")
         elif self.closed == 1:
-            raise SerializerError("serializer is closed")
+            if PY_MAJOR_VERSION < 3:
+                raise SerializerError("serializer is closed")
+            else:
+                raise SerializerError(u"serializer is closed")
         cache = []
         version_directive = NULL
         if self.use_version:
         tag_directives_end = NULL
         if self.use_tags:
             if len(self.use_tags) > 128:
-                raise ValueError("too many tags")
+                if PY_MAJOR_VERSION < 3:
+                    raise ValueError("too many tags")
+                else:
+                    raise ValueError(u"too many tags")
             tag_directives_start = tag_directives_value
             tag_directives_end = tag_directives_value
             for handle in self.use_tags:
                     handle = PyUnicode_AsUTF8String(handle)
                     cache.append(handle)
                 if not PyString_CheckExact(handle):
-                    raise TypeError("tag handle must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("tag handle must be a string")
+                    else:
+                        raise TypeError(u"tag handle must be a string")
                 tag_directives_end.handle = PyString_AS_STRING(handle)
                 if PyUnicode_CheckExact(prefix):
                     prefix = PyUnicode_AsUTF8String(prefix)
                     cache.append(prefix)
                 if not PyString_CheckExact(prefix):
-                    raise TypeError("tag prefix must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("tag prefix must be a string")
+                    else:
+                        raise TypeError(u"tag prefix must be a string")
                 tag_directives_end.prefix = PyString_AS_STRING(prefix)
                 tag_directives_end = tag_directives_end+1
         if yaml_document_start_event_initialize(&event, version_directive,
                     if PyUnicode_CheckExact(tag_object):
                         tag_object = PyUnicode_AsUTF8String(tag_object)
                     if not PyString_CheckExact(tag_object):
-                        raise TypeError("tag must be a string")
+                        if PY_MAJOR_VERSION < 3:
+                            raise TypeError("tag must be a string")
+                        else:
+                            raise TypeError(u"tag must be a string")
                     tag = PyString_AS_STRING(tag_object)
                 value_object = node.value
                 if PyUnicode_CheckExact(value_object):
                     value_object = PyUnicode_AsUTF8String(value_object)
                 if not PyString_CheckExact(value_object):
-                    raise TypeError("value must be a string")
+                    if PY_MAJOR_VERSION < 3:
+                        raise TypeError("value must be a string")
+                    else:
+                        raise TypeError(u"value must be a string")
                 value = PyString_AS_STRING(value_object)
                 length = PyString_GET_SIZE(value_object)
                 style_object = node.style
                 scalar_style = YAML_PLAIN_SCALAR_STYLE
-                if style_object == "'":
+                if style_object == "'" or style_object == u"'":
                     scalar_style = YAML_SINGLE_QUOTED_SCALAR_STYLE
-                elif style_object == "\"":
+                elif style_object == "\"" or style_object == u"\"":
                     scalar_style = YAML_DOUBLE_QUOTED_SCALAR_STYLE
-                elif style_object == "|":
+                elif style_object == "|" or style_object == u"|":
                     scalar_style = YAML_LITERAL_SCALAR_STYLE
-                elif style_object == ">":
+                elif style_object == ">" or style_object == u">":
                     scalar_style = YAML_FOLDED_SCALAR_STYLE
                 if yaml_scalar_event_initialize(&event, anchor, tag, value, length,
                         plain_implicit, quoted_implicit, scalar_style) == 0:
                     if PyUnicode_CheckExact(tag_object):
                         tag_object = PyUnicode_AsUTF8String(tag_object)
                     if not PyString_CheckExact(tag_object):
-                        raise TypeError("tag must be a string")
+                        if PY_MAJOR_VERSION < 3:
+                            raise TypeError("tag must be a string")
+                        else:
+                            raise TypeError(u"tag must be a string")
                     tag = PyString_AS_STRING(tag_object)
                 sequence_style = YAML_BLOCK_SEQUENCE_STYLE
                 if node.flow_style:
                     if PyUnicode_CheckExact(tag_object):
                         tag_object = PyUnicode_AsUTF8String(tag_object)
                     if not PyString_CheckExact(tag_object):
-                        raise TypeError("tag must be a string")
+                        if PY_MAJOR_VERSION < 3:
+                            raise TypeError("tag must be a string")
+                        else:
+                            raise TypeError(u"tag must be a string")
                     tag = PyString_AS_STRING(tag_object)
                 mapping_style = YAML_BLOCK_MAPPING_STYLE
                 if node.flow_style:

tests/data/invalid-base64-data-2.loader-error

+--- !!binary
+    двоичные данные в base64

tests/data/invalid-python-bytes-2-py3.loader-error

+--- !!python/bytes
+    двоичные данные в base64

tests/data/invalid-python-bytes-py3.loader-error

+--- !!python/bytes
+    binary data encoded in base64 should be here.
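These fixtures are expected to fail because !!binary and, on Python 3, !!python/bytes both require a base64-encoded scalar, which the placeholder text above is not. For contrast, a hedged Python 3 sketch of a well-formed !!python/bytes document (the payload is made up):

    import base64
    import yaml

    doc = "--- !!python/bytes " + base64.b64encode(b"binary data").decode('ascii')
    print(yaml.load(doc, Loader=yaml.Loader))   # b'binary data'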

tests/lib/test_input_output.py

 def test_unicode_output(unicode_filename, verbose=False):
     data = open(unicode_filename, 'rb').read().decode('utf-8')
     value = ' '.join(data.split())
-    for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
-        for allow_unicode in [False, True]:
-            data1 = yaml.dump(value, allow_unicode=allow_unicode)
+    for allow_unicode in [False, True]:
+        data1 = yaml.dump(value, allow_unicode=allow_unicode)
+        for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
             stream = StringIO.StringIO()
             yaml.dump(value, _unicode_open(stream, 'utf-8'), encoding=encoding, allow_unicode=allow_unicode)
             data2 = stream.getvalue()
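The reshuffled loop above works because data1 depends only on allow_unicode, not on the encoding being iterated; a hedged stand-alone sketch of the same structure:

    import yaml

    value = u'caf\xe9'   # hypothetical non-ASCII sample
    for allow_unicode in [False, True]:
        # computed once per allow_unicode value; independent of the inner loop
        data1 = yaml.dump(value, allow_unicode=allow_unicode)
        for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
            data3 = yaml.dump(value, encoding=encoding, allow_unicode=allow_unicode)
            # data3 is bytes when an encoding is given, text when it is None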

tests/lib3/test_input_output.py

+
+import yaml
+import codecs, io
+
+def test_unicode_input(unicode_filename, verbose=False):
+    data = open(unicode_filename, 'rb').read().decode('utf-8')
+    value = ' '.join(data.split())
+    output = yaml.load(data)
+    assert output == value, (output, value)
+    output = yaml.load(io.StringIO(data))
+    assert output == value, (output, value)
+    for input in [data.encode('utf-8'),
+                    codecs.BOM_UTF8+data.encode('utf-8'),
+                    codecs.BOM_UTF16_BE+data.encode('utf-16-be'),
+                    codecs.BOM_UTF16_LE+data.encode('utf-16-le')]:
+        if verbose:
+            print("INPUT:", repr(input[:10]), "...")
+        output = yaml.load(input)
+        assert output == value, (output, value)
+        output = yaml.load(io.BytesIO(input))
+        assert output == value, (output, value)
+
+test_unicode_input.unittest = ['.unicode']
+
+def test_unicode_input_errors(unicode_filename, verbose=False):
+    data = open(unicode_filename, 'rb').read().decode('utf-8')
+    for input in [data.encode('latin1', 'ignore'),
+                    data.encode('utf-16-be'), data.encode('utf-16-le'),
+                    codecs.BOM_UTF8+data.encode('utf-16-be'),
+                    codecs.BOM_UTF16_BE+data.encode('utf-16-le'),
+                    codecs.BOM_UTF16_LE+data.encode('utf-8')+b'!']:
+        try:
+            yaml.load(input)
+        except yaml.YAMLError as exc:
+            if verbose:
+                print(exc)
+        else:
+            raise AssertionError("expected an exception")
+        try:
+            yaml.load(io.BytesIO(input))
+        except yaml.YAMLError as exc:
+            if verbose:
+                print(exc)
+        else:
+            raise AssertionError("expected an exception")
+
+test_unicode_input_errors.unittest = ['.unicode']
+
+def test_unicode_output(unicode_filename, verbose=False):
+    data = open(unicode_filename, 'rb').read().decode('utf-8')
+    value = ' '.join(data.split())
+    for allow_unicode in [False, True]:
+        data1 = yaml.dump(value, allow_unicode=allow_unicode)
+        for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
+            stream = io.StringIO()
+            yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode)
+            data2 = stream.getvalue()
+            data3 = yaml.dump(value, encoding=encoding, allow_unicode=allow_unicode)
+            stream = io.BytesIO()
+            if encoding is None:
+                try:
+                    yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode)
+                except TypeError as exc:
+                    if verbose:
+                        print(exc)
+                    data4 = None
+                else:
+                    raise AssertionError("expected an exception")
+            else:
+                yaml.dump(value, stream, encoding=encoding, allow_unicode=allow_unicode)
+                data4 = stream.getvalue()
+                if verbose:
+                    print("BYTES:", data4[:50])
+                data4 = data4.decode(encoding)
+            for copy in [data1, data2, data3, data4]:
+                if copy is None:
+                    continue
+                assert isinstance(copy, str)
+                if allow_unicode:
+                    try:
+                        copy[4:].encode('ascii')
+                    except UnicodeEncodeError as exc:
+                        if verbose:
+                            print(exc)
+                    else:
+                        raise AssertionError("expected an exception")
+                else:
+                    copy[4:].encode('ascii')
+            assert isinstance(data1, str), (type(data1), encoding)
+            assert isinstance(data2, str), (type(data2), encoding)
+
+test_unicode_output.unittest = ['.unicode']
+
+def test_unicode_transfer(unicode_filename, verbose=False):
+    data = open(unicode_filename, 'rb').read().decode('utf-8')
+    for encoding in [None, 'utf-8', 'utf-16-be', 'utf-16-le']:
+        input = data
+        if encoding is not None:
+            input = ('\ufeff'+input).encode(encoding)
+        output1 = yaml.emit(yaml.parse(input), allow_unicode=True)
+        if encoding is None:
+            stream = io.StringIO()
+        else:
+            stream = io.BytesIO()
+        yaml.emit(yaml.parse(input), stream, allow_unicode=True)
+        output2 = stream.getvalue()
+        assert isinstance(output1, str), (type(output1), encoding)
+        if encoding is None:
+            assert isinstance(output2, str), (type(output1), encoding)
+        else:
+            assert isinstance(output2, bytes), (type(output1), encoding)
+            output2.decode(encoding)
+
+test_unicode_transfer.unittest = ['.unicode']
+
+if __name__ == '__main__':
+    import test_appliance
+    test_appliance.run(globals())
+
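A hedged summary of the Python 3 behaviour this new test module pins down: dumping with no stream and no encoding yields str, dumping to a binary stream needs an explicit encoding, and omitting the encoding there is expected to raise TypeError:

    import io
    import yaml

    assert isinstance(yaml.dump(u'data'), str)           # text result by default
    out = io.BytesIO()
    yaml.dump(u'data', out, encoding='utf-8')            # bytes written to the stream
    try:
        yaml.dump(u'data', io.BytesIO())                 # no encoding for a byte stream
    except TypeError:
        pass                                             # matches the test's expectation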

tests/lib3/test_yaml.py

 from test_emitter import *
 from test_representer import *
 from test_recursive import *
+from test_input_output import *
 
 if __name__ == '__main__':
     import test_appliance

tests/lib3/test_yaml_ext.py

         globals()[function.__name__] = function
 
 import test_tokens, test_structure, test_errors, test_resolver, test_constructor,   \
-        test_emitter, test_representer, test_recursive
+        test_emitter, test_representer, test_recursive, test_input_output
 wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor,
-        test_emitter, test_representer, test_recursive])
+        test_emitter, test_representer, test_recursive, test_input_output])
 
 if __name__ == '__main__':
     import test_appliance