Kirill Simonov avatar Kirill Simonov committed 26ef251

Emitter is done!!!

Comments (0)

Files changed (15)

lib/yaml/__init__.py

 from composer import *
 from resolver import *
 from constructor import *
+from emitter import *
 
 from tokens import *
 from events import *

lib/yaml/composer.py

 
     def compose_scalar_node(self):
         event = self.parser.get()
-        return ScalarNode(event.tag, event.value,
+        return ScalarNode(event.tag, event.value, event.implicit,
                 event.start_mark, event.end_mark)
 
     def compose_sequence_node(self):

lib/yaml/emitter.py

 class EmitterError(YAMLError):
     pass
 
class ScalarAnalysis:
    # Result of Emitter.analyze_scalar: the scalar text plus flags telling
    # which output styles are able to represent it.
    def __init__(self, scalar, empty, multiline,
            allow_flow_plain, allow_block_plain,
            allow_single_quoted, allow_double_quoted, allow_block):
        self.scalar = scalar                        # the scalar text itself
        self.empty = empty                          # is the text empty?
        self.multiline = multiline                  # does it contain line breaks?
        self.allow_flow_plain = allow_flow_plain    # plain style, flow context
        self.allow_block_plain = allow_block_plain  # plain style, block context
        self.allow_single_quoted = allow_single_quoted
        self.allow_double_quoted = allow_double_quoted
        self.allow_block = allow_block              # literal/folded block styles
 class Emitter:
 
     DEFAULT_TAG_PREFIXES = {
 
         # Characteristics of the last emitted character:
         #  - current position.
-        #  - is it a line break?
         #  - is it a whitespace?
         #  - is it an indention character
         #    (indentation space, '-', '?', or ':')?
         self.best_width = 80
         self.tag_prefixes = None
 
-        # Scalar analysis.
-        self.analysis = None
+        # Analyses cache.
+        self.anchor_text = None
+        self.tag_text = None
+        self.scalar_analysis = None
+        self.scalar_style = None
 
     def emit(self, event):
-        if self.events:
-            self.events.append(event)
-            event = self.events.pop(0)
-        self.event = event
-        if self.need_more_events():
-            self.event.insert(0, event)
-            return
-        self.state()
-        self.event = None
+        self.events.append(event)
+        while not self.need_more_events():
+            self.event = self.events.pop(0)
+            self.state()
+            self.event = None
 
     # In some cases, we wait for a few next events before emitting.
 
     def need_more_events(self):
-        if isinstance(self.event, DocumentStartEvent):
+        if not self.events:
+            return True
+        event = self.events[0]
+        if isinstance(event, DocumentStartEvent):
             return self.need_events(1)
-        elif isinstance(self.event, SequenceStartEvent):
+        elif isinstance(event, SequenceStartEvent):
             return self.need_events(2)
-        elif isinstance(self.event, MappingStartEvent):
+        elif isinstance(event, MappingStartEvent):
             return self.need_events(3)
         else:
             return False
 
     def need_events(self, count):
         level = 0
-        for event in self.events:
-            if isinstance(event, (DocumentStart, CollectionStart)):
+        for event in self.events[1:]:
+            if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
                 level += 1
-            elif isinstance(event, (DocumentEnd, CollectionEnd)):
+            elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)):
                 level -= 1
-            elif isinstance(event, StreamEnd):
+            elif isinstance(event, StreamEndEvent):
                 level = -1
             if level < 0:
                 return False
-        return (len(self.events) < count)
+        return (len(self.events) < count+1)
 
     def increase_indent(self, flow=False, indentless=False):
         self.indents.append(self.indent)
 
     def expect_stream_start(self):
         if isinstance(self.event, StreamStartEvent):
-            self.encoding = event.encoding
-            self.canonical = event.canonical
+            self.encoding = self.event.encoding
+            self.canonical = self.event.canonical
             if self.event.indent and self.event.indent > 1:
                 self.best_indent = self.event.indent
             if self.event.width and self.event.width > self.best_indent:
     def expect_document_start(self, first=False):
         if isinstance(self.event, DocumentStartEvent):
             if self.event.version:
-                self.write_version_directive(self.event.version)
+                version_text = self.analyze_version(self.event.version)
+                self.write_version_directive(version_text)
             self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
             if self.event.tags:
-                for handle in self.event.tags:
+                handles = self.event.tags.keys()
+                handles.sort()
+                for handle in handles:
                     prefix = self.event.tags[handle]
                     self.tag_prefixes[prefix] = handle
-                    self.write_tag_directive(handle, prefix)
-            implicit = (first and self.event.implicit and not self.canonical
+                    handle_text = self.analyze_tag_handle(handle)
+                    prefix_text = self.analyze_tag_prefix(prefix)
+                    self.write_tag_directive(handle_text, prefix_text)
+            implicit = (first and not self.event.explicit and not self.canonical
                     and not self.event.version and not self.event.tags
-                    and not self.check_next_empty_scalar())
+                    and not self.check_empty_document())
             if not implicit:
                 self.write_indent()
                 self.write_indicator(u'---', True)
     def expect_document_end(self):
         if isinstance(self.event, DocumentEndEvent):
             self.write_indent()
-            if not event.implicit:
+            if self.event.explicit:
                 self.write_indicator(u'...', True)
                 self.write_indent()
             self.state = self.expect_document_start
                     % self.event)
 
    def expect_document_root(self):
        # Emit the root node of the document; once it is done, the document
        # end event is expected next.
        self.states.append(self.expect_document_end)
        self.expect_node(root=True)
 
     # Node handlers.
         self.simple_key_context = simple_key
         if isinstance(self.event, AliasEvent):
             self.expect_alias()
-        elif isinstance(event, (ScalarEvent, CollectionEvent)):
-            self.process_anchor()
+        elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)):
+            self.process_anchor(u'&')
             self.process_tag()
             if isinstance(self.event, ScalarEvent):
                 self.expect_scalar()
-            elif isinstance(self.event, SequenceEvent):
+            elif isinstance(self.event, SequenceStartEvent):
                 if self.flow_level or self.canonical or self.event.flow_style   \
                         or self.check_empty_sequence():
                     self.expect_flow_sequence()
                 else:
                     self.expect_block_sequence()
-            elif isinstance(self.event, MappingEvent):
+            elif isinstance(self.event, MappingStartEvent):
                 if self.flow_level or self.canonical or self.event.flow_style   \
                         or self.check_empty_mapping():
                     self.expect_flow_mapping()
             raise EmitterError("expected NodeEvent, but got %s" % self.event)
 
    def expect_alias(self):
        # An alias is written as '*anchor'; the anchor name is mandatory.
        if self.event.anchor is None:
            raise EmitterError("anchor is not specified for alias")
        self.process_anchor(u'*')
        self.state = self.states.pop()
 
     def expect_scalar(self):
         return self.expect_block_mapping_key(first=True)
 
     def expect_block_mapping_key(self, first=False):
-        if not first and isinstance(self.event, SequenceEndEvent):
+        if not first and isinstance(self.event, MappingEndEvent):
             self.indent = self.indents.pop()
             self.state = self.states.pop()
         else:
         self.states.append(self.expect_block_mapping_key)
         self.expect_node(mapping=True)
 
    # Checkers.

    def check_empty_sequence(self):
        # True when the current sequence start is immediately followed by
        # its end event, i.e. the sequence is empty.
        return (isinstance(self.event, SequenceStartEvent) and self.events
                and isinstance(self.events[0], SequenceEndEvent))
+
    def check_empty_mapping(self):
        # True when the current mapping start is immediately followed by
        # its end event, i.e. the mapping is empty.
        return (isinstance(self.event, MappingStartEvent) and self.events
                and isinstance(self.events[0], MappingEndEvent))
+
    def check_empty_document(self):
        # A document is "empty" when its content is a single implicit empty
        # scalar with no anchor and no tag; expect_document_start uses this
        # to decide whether the '---' marker may be omitted.
        if not isinstance(self.event, DocumentStartEvent) or not self.events:
            return False
        event = self.events[0]
        return (isinstance(event, ScalarEvent) and event.anchor is None
                and event.tag is None and event.implicit and event.value == u'')
+
    def check_simple_key(self):
        # Estimate the rendered length of the current node's anchor, tag and
        # scalar text (caching the analyses for later reuse); the node may be
        # written as a simple (inline) mapping key only if it is short.
        length = 0
        if isinstance(self.event, NodeEvent) and self.event.anchor is not None:
            if self.anchor_text is None:
                self.anchor_text = self.analyze_anchor(self.event.anchor)
            length += len(self.anchor_text)
        if isinstance(self.event, (ScalarEvent, CollectionStartEvent))  \
                and self.event.tag is not None:
            if self.tag_text is None:
                self.tag_text = self.analyze_tag(self.event.tag)
            length += len(self.tag_text)
        if isinstance(self.event, ScalarEvent):
            if self.scalar_analysis is None:
                self.scalar_analysis = self.analyze_scalar(self.event.value)
            length += len(self.scalar_analysis.scalar)
        # Aliases, single-line scalars, and empty collections qualify.
        return (length < 128 and (isinstance(self.event, AliasEvent)
            or (isinstance(self.event, ScalarEvent) and not self.scalar_analysis.multiline)
            or self.check_empty_sequence() or self.check_empty_mapping()))
+
+    # Anchor, Tag, and Scalar processors.
+
+    def process_anchor(self, indicator):
+        if self.event.anchor is None:
+            return
+        if self.anchor_text is None:
+            self.anchor_text = self.analyze_anchor(self.event.anchor)
+        if self.anchor_text:
+            self.write_indicator(indicator+self.anchor_text, True)
+        self.anchor_text = None
+
+    def process_tag(self):
+        if self.event.tag is None:
+            return
+        if isinstance(self.event, ScalarEvent) and self.best_scalar_style() == '':
+            return
+        if self.tag_text is None:
+            self.tag_text = self.analyze_tag(self.event.tag)
+        if self.tag_text:
+            self.write_indicator(self.tag_text, True)
+        self.tag_text = None
+
+    def best_scalar_style(self):
+        if self.scalar_analysis is None:
+            self.scalar_analysis = self.analyze_scalar(self.event.value)
+        if self.canonical:
+            return '"'
+        if (self.event.implicit and not self.event.style
+                and ((self.flow_level and self.scalar_analysis.allow_flow_plain)
+                    or (not self.flow_level and self.scalar_analysis.allow_block_plain))
+                and (len(self.scalar_analysis.scalar) > 0
+                    or (not self.flow_level and not self.simple_key_context))):
+            return ''
+        elif self.event.style == '\'' and self.scalar_analysis.allow_single_quoted:
+            return '\''
+        elif self.event.style in ['|', '>'] and not self.flow_level and self.scalar_analysis.allow_block:
+            return self.event.style
+        else:
+            return '"'
+        return style
+
+    def process_scalar(self):
+        if self.scalar_analysis is None:
+            self.scalar_analysis = self.analyze_scalar(self.event.value)
+        style = self.best_scalar_style()
+        if self.scalar_analysis.multiline and not self.simple_key_context   \
+                and style not in ['|', '>']:
+            self.write_indent()
+        if style == '"':
+            self.write_double_quoted(self.scalar_analysis.scalar,
+                    split=(not self.simple_key_context))
+        elif style == '\'':
+            self.write_single_quoted(self.scalar_analysis.scalar,
+                    split=(not self.simple_key_context))
+        elif style == '>':
+            self.write_folded(self.scalar_analysis.scalar)
+        elif style == '|':
+            self.write_literal(self.scalar_analysis.scalar)
+        else:
+            self.write_plain(self.scalar_analysis.scalar,
+                    split=(not self.simple_key_context))
+        self.scalar_analysis = None
+
+    # Analyzers.
+
+    def analyze_version(self, version):
+        major, minor = version
+        if major != 1:
+            raise EmitterError("unsupported YAML version: %d.%d" % (major, minor))
+        return u'%d.%d' % (major, minor)
+
+    def analyze_tag_handle(self, handle):
+        if not handle:
+            raise EmitterError("tag handle must not be empty")
+        if handle[0] != u'!' or handle[-1] != u'!':
+            raise EmitterError("tag handle must start and end with '!': %r"
+                    % (handle.encode('utf-8')))
+        for ch in handle[1:-1]:
+            if not (u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z'    \
+                    or ch in u'-_'):
+                raise EmitterError("invalid character %r in the tag handle: %r"
+                        % (ch.encode('utf-8'), handle.encode('utf-8')))
+        return handle
+
    def analyze_tag_prefix(self, prefix):
        # Prepare a tag prefix for a %TAG directive: characters outside the
        # allowed URI character set are percent-escaped as UTF-8 octets.
        # A leading '!' is copied through verbatim.
        if not prefix:
            raise EmitterError("tag prefix must not be empty")
        chunks = []
        start = end = 0
        if prefix[0] == u'!':
            end = 1
        while end < len(prefix):
            ch = prefix[end]
            if u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z'  \
                    or ch in u'-;/?!:@&=+$,_.~*\'()[]':
                end += 1
            else:
                if start < end:
                    chunks.append(prefix[start:end])
                start = end = end+1
                # NOTE(review): iterating over the encoded string assumes
                # Python 2 semantics (bytes iterate as one-char strings);
                # confirm before porting to Python 3.
                data = ch.encode('utf-8')
                for ch in data:
                    chunks.append(u'%%%02X' % ord(ch))
        if start < end:
            chunks.append(prefix[start:end])
        return u''.join(chunks)
+
    def analyze_tag(self, tag):
        # Render a tag for output: when the tag matches a registered prefix,
        # it is written as 'handle' plus the percent-escaped suffix;
        # otherwise the verbatim '!<...>' form is used.
        if not tag:
            raise EmitterError("tag must not be empty")
        handle = None
        suffix = tag
        for prefix in self.tag_prefixes:
            # A prefix match rewrites handle/suffix; the '!' prefix may
            # consume the whole tag, other prefixes must leave a suffix.
            if tag.startswith(prefix)   \
                    and (prefix == u'!' or len(prefix) < len(tag)):
                handle = self.tag_prefixes[prefix]
                suffix = tag[len(prefix):]
        chunks = []
        start = end = 0
        while end < len(suffix):
            ch = suffix[end]
            if u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z'  \
                    or ch in u'-;/?:@&=+$,_.~*\'()[]'   \
                    or (ch == u'!' and handle != u'!'):
                end += 1
            else:
                if start < end:
                    chunks.append(suffix[start:end])
                start = end = end+1
                # NOTE(review): Python 2 byte-iteration, same caveat as in
                # analyze_tag_prefix.
                data = ch.encode('utf-8')
                for ch in data:
                    chunks.append(u'%%%02X' % ord(ch))
        if start < end:
            chunks.append(suffix[start:end])
        suffix_text = u''.join(chunks)
        if handle:
            return u'%s%s' % (handle, suffix_text)
        else:
            return u'!<%s>' % suffix_text
+
+    def analyze_anchor(self, anchor):
+        if not anchor:
+            raise EmitterError("anchor must not be empty")
+        for ch in anchor:
+            if not (u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z'    \
+                    or ch in u'-_'):
+                raise EmitterError("invalid character %r in the anchor: %r"
+                        % (ch.encode('utf-8'), text.encode('utf-8')))
+        return anchor
+
+    def analyze_scalar(self, scalar):   # It begs for refactoring.
+        if not scalar:
+            return ScalarAnalysis(scalar=scalar, empty=True, multiline=False,
+                    allow_flow_plain=False, allow_block_plain=True,
+                    allow_single_quoted=True, allow_double_quoted=True,
+                    allow_block=False)
+        contains_block_indicator = False
+        contains_flow_indicator = False
+        contains_line_breaks = False
+        contains_unicode_characters = False
+        contains_special_characters = False
+        contains_inline_spaces = False          # non-space space+ non-space
+        contains_inline_breaks = False          # non-space break+ non-space
+        contains_leading_spaces = False         # ^ space+ (non-space | $)
+        contains_leading_breaks = False         # ^ break+ (non-space | $)
+        contains_trailing_spaces = False        # non-space space+ $
+        contains_trailing_breaks = False        # non-space break+ $
+        contains_inline_breaks_spaces = False   # non-space break+ space+ non-space
+        contains_mixed_breaks_spaces = False    # anything else
+        if scalar.startswith(u'---') or scalar.startswith(u'...'):
+            contains_block_indicator = True
+            contains_flow_indicator = True
+        first = True
+        last = (len(scalar) == 1)
+        preceeded_by_space = False
+        followed_by_space = (len(scalar) > 1 and
+                scalar[1] in u'\0 \t\r\n\x85\u2028\u2029')
+        spaces = breaks = mixed = leading = False
+        index = 0
+        while index < len(scalar):
+            ch = scalar[index]
+            if first:
+                if ch in u'#,[]{}#&*!|>\'\"%@`': 
+                    contains_flow_indicator = True
+                    contains_block_indicator = True
+                if ch in u'?:':
+                    contains_flow_indicator = True
+                    if followed_by_space or last:
+                        contains_block_indicator = True
+                if ch == u'-' and followed_by_space or last:
+                    contains_flow_indicator = True
+                    contains_block_indicator = True
+            else:
+                if ch in u',?[]{}':
+                    contains_flow_indicator = True
+                if ch == u':':
+                    contains_flow_indicator = True
+                    if followed_by_space or last:
+                        contains_block_indicator = True
+                if ch == u'#' and preceeded_by_space:
+                    contains_flow_indicator = True
+                    contains_block_indicator = True
+            if ch in u'\n\x85\u2028\u2029':
+                contains_line_breaks = True
+            if not (ch == u'\n' or u'\x20' <= ch <= u'\x7E'):
+                if ch < u'\x80':
+                    contains_special_characters = True
+                else:
+                    contains_special_characters = True
+                    # TODO: We need an option to allow unescaped unicode
+                    # characters.
+                    contains_unicode_characters = True
+            if ch == u' ':
+                if not spaces and not breaks:
+                    leading = first
+                spaces = True
+            elif ch in u'\n\x85\u2028\u2029':
+                if not spaces and not breaks:
+                    leading = first
+                breaks = True
+                if spaces:
+                    mixed = True
+            if ch not in u' \n\x85\u2028\u2029':
+                if leading:
+                    if spaces and breaks:
+                        contains_mixed_breaks_spaces = True
+                    elif spaces:
+                        contains_leading_spaces = True
+                    elif breaks:
+                        contains_leading_breaks = True
+                else:
+                    if mixed:
+                        contains_mixed_break_spaces = True
+                    elif spaces and breaks:
+                        contains_inline_breaks_spaces = True
+                    elif spaces:
+                        contains_inline_spaces = True
+                    elif breaks:
+                        contains_inline_breaks = True
+                spaces = breaks = mixed = leading = False
+            elif last:
+                if spaces and breaks:
+                    contains_mixed_break_spaces = True
+                elif spaces:
+                    if leading:
+                        contains_leading_spaces = True
+                    else:
+                        contains_trailing_spaces = True
+                elif breaks:
+                    if leading:
+                        contains_leading_breaks = True
+                    else:
+                        contains_trailing_breaks = True
+            index += 1
+            first = False
+            last = (index+1 == len(scalar))
+            preceeded_by_space = (ch in u'\0 \t\r\n\x85\u2028\u2029')
+            followed_by_space = (index+1 < len(scalar) and
+                    scalar[index+1] in u'\0 \t\r\n\x85\u2028\u2029')
+        allow_flow_plain = not (contains_flow_indicator or contains_special_characters
+            or contains_leading_spaces or contains_leading_breaks
+            or contains_trailing_spaces or contains_trailing_breaks
+            or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
+        allow_block_plain = not (contains_block_indicator or contains_special_characters
+            or contains_leading_spaces or contains_leading_breaks
+            or contains_trailing_spaces or contains_trailing_breaks
+            or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
+        allow_single_quoted = not (contains_special_characters
+            or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
+        allow_double_quoted = True
+        allow_block = not (contains_special_characters
+            or contains_leading_spaces or contains_leading_breaks
+            or contains_trailing_spaces or contains_mixed_breaks_spaces)
+        return ScalarAnalysis(scalar=scalar, empty=False, multiline=contains_line_breaks,
+                allow_flow_plain=allow_flow_plain, allow_block_plain=allow_block_plain,
+                allow_single_quoted=allow_single_quoted, allow_double_quoted=allow_double_quoted,
+                allow_block=allow_block)
+
     # Writers.
 
     def write_stream_start(self):
 
     def write_indicator(self, indicator, need_whitespace,
             whitespace=False, indention=False):
-        if self.whitespace:
+        if self.whitespace or not need_whitespace:
             data = indicator
         else:
             data = u' '+indicator
-        self.writespace = whitespace
+        self.whitespace = whitespace
         self.indention = self.indention and indention
         self.column += len(data)
         if self.encoding:
 
     def write_indent(self):
         indent = self.indent or 0
-        if not self.indention or self.column > indent:
+        if not self.indention or self.column > indent   \
+                or (self.column == indent and not self.whitespace):
             self.write_line_break()
         if self.column < indent:
+            self.whitespace = True
             data = u' '*(indent-self.column)
             self.column = indent
             if self.encoding:
                 data = data.encode(self.encoding)
             self.writer.write(data)
 
-    def write_line_break(self):
-        data = self.best_line_break
+    def write_line_break(self, data=None):
+        if data is None:
+            data = self.best_line_break
         self.whitespace = True
         self.indention = True
         self.line += 1
             data = data.encode(self.encoding)
         self.writer.write(data)
 
+    def write_version_directive(self, version_text):
+        data = u'%%YAML %s' % version_text
+        if self.encoding:
+            data = data.encode(self.encoding)
+        self.writer.write(data)
+        self.write_line_break()
+
+    def write_tag_directive(self, handle_text, prefix_text):
+        data = u'%%TAG %s %s' % (handle_text, prefix_text)
+        if self.encoding:
+            data = data.encode(self.encoding)
+        self.writer.write(data)
+        self.write_line_break()
+
    # Scalar writers.

    def write_single_quoted(self, text, split=True):
        # Write `text` as a single-quoted flow scalar.  The text is scanned
        # as alternating runs of spaces, line breaks, and other characters,
        # so long lines can be folded at spaces and breaks preserved.
        self.write_indicator(u'\'', True)
        spaces = False
        breaks = False
        start = end = 0
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if spaces:
                if ch is None or ch != u' ':
                    # End of a run of spaces: fold the line here when it got
                    # too long; otherwise write the spaces verbatim.
                    if start+1 == end and self.column > self.best_width and split   \
                            and start != 0 and end != len(text):
                        self.write_indent()
                    else:
                        data = text[start:end]
                        self.column += len(data)
                        if self.encoding:
                            data = data.encode(self.encoding)
                        self.writer.write(data)
                    start = end
            elif breaks:
                if ch is None or ch not in u'\n\x85\u2028\u2029':
                    # End of a run of breaks.  A leading '\n' is doubled:
                    # a single break would otherwise be folded into a space.
                    if text[start] == u'\n':
                        self.write_line_break()
                    for br in text[start:end]:
                        if br == u'\n':
                            self.write_line_break()
                        else:
                            self.write_line_break(br)
                    self.write_indent()
                    start = end
            else:
                if ch is None or ch in u' \n\x85\u2028\u2029' or ch == u'\'':
                    # End of a run of regular characters: flush it.
                    if start < end:
                        data = text[start:end]
                        self.column += len(data)
                        if self.encoding:
                            data = data.encode(self.encoding)
                        self.writer.write(data)
                        start = end
                    if ch == u'\'':
                        # A single quote is escaped by doubling it.
                        data = u'\'\''
                        self.column += 2
                        if self.encoding:
                            data = data.encode(self.encoding)
                        self.writer.write(data)
                        start = end + 1
            if ch is not None:
                spaces = (ch == u' ')
                breaks = (ch in u'\n\x85\u2028\u2029')
            end += 1
        self.write_indicator(u'\'', False)
+
    # Map from characters that must be escaped in double-quoted scalars to
    # their short escape letters (e.g. u'\x0A' -> 'n', written as '\n').
    # Characters without an entry fall back to numeric \xXX/\uXXXX escapes
    # in write_double_quoted.
    ESCAPE_REPLACEMENTS = {
        u'\0':      u'0',
        u'\x07':    u'a',
        u'\x08':    u'b',
        u'\x09':    u't',
        u'\x0A':    u'n',
        u'\x0B':    u'v',
        u'\x0C':    u'f',
        u'\x0D':    u'r',
        u'\x1B':    u'e',
        u'\"':      u'\"',
        u'\\':      u'\\',
        u'\x85':    u'N',
        u'\xA0':    u'_',
        u'\u2028':  u'L',
        u'\u2029':  u'P',
    }
+
    def write_double_quoted(self, text, split=True):
        # Write `text` as a double-quoted flow scalar, escaping characters
        # outside printable ASCII, and folding long lines with a trailing
        # '\' when `split` is allowed.
        self.write_indicator(u'"', True)
        start = end = 0
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if ch is None or not (u'\x20' <= ch <= u'\x7E') or ch in u'"\\':
                # Flush the pending run of plain printable characters.
                if start < end:
                    data = text[start:end]
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
                    start = end
                if ch is not None:
                    # Escape the current character: named escape if known,
                    # otherwise \xXX, \uXXXX or \UXXXXXXXX by code point.
                    if ch in self.ESCAPE_REPLACEMENTS:
                        data = u'\\'+self.ESCAPE_REPLACEMENTS[ch]
                    elif ch <= u'\xFF':
                        data = u'\\x%02X' % ord(ch)
                    elif ch <= u'\uFFFF':
                        data = u'\\u%04X' % ord(ch)
                    else:
                        data = u'\\U%08X' % ord(ch)
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
                    start = end+1
            if 0 < end < len(text)-1 and (ch == u' ' or start >= end)   \
                    and self.column+(end-start) > self.best_width and split:
                # Fold the line: emit the pending text plus a trailing '\'
                # and continue on a fresh indented line.
                data = text[start:end]+u'\\'
                if start < end:
                    start = end
                self.column += len(data)
                if self.encoding:
                    data = data.encode(self.encoding)
                self.writer.write(data)
                self.write_indent()
                self.whitespace = False
                self.indention = False
                if ch == u' ':
                    # NOTE(review): presumably this '\' protects the space
                    # that follows the fold from being absorbed — confirm.
                    data = u'\\'
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
            end += 1
        self.write_indicator(u'"', False)
+
+    def determine_chomp(self, text):
+        tail = text[-2:]
+        while len(tail) < 2:
+            tail = u' '+tail
+        if tail[-1] in u'\n\x85\u2028\u2029':
+            if tail[-2] in u'\n\x85\u2028\u2029':
+                return u'+'
+            else:
+                return u''
+        else:
+            return u'-'
+
    def write_folded(self, text):
        # Write `text` as a folded block scalar ('>'): line breaks are
        # reproduced, and long lines may be folded at space runs.
        chomp = self.determine_chomp(text)
        self.write_indicator(u'>'+chomp, True)
        self.write_indent()
        leading_space = False
        spaces = False
        breaks = False
        start = end = 0
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if breaks:
                if ch is None or ch not in u'\n\x85\u2028\u2029':
                    # End of a run of breaks.  A single '\n' would be folded
                    # into a space on reading, so it is doubled unless the
                    # adjacent line starts with a space.
                    if not leading_space and ch is not None and ch != u' '  \
                            and text[start] == u'\n':
                        self.write_line_break()
                    leading_space = (ch == u' ')
                    for br in text[start:end]:
                        if br == u'\n':
                            self.write_line_break()
                        else:
                            self.write_line_break(br)
                    if ch is not None:
                        self.write_indent()
                    start = end
            elif spaces:
                if ch != u' ':
                    # End of a run of spaces: fold here when the line is long.
                    if start+1 == end and self.column > self.best_width:
                        self.write_indent()
                    else:
                        data = text[start:end]
                        self.column += len(data)
                        if self.encoding:
                            data = data.encode(self.encoding)
                        self.writer.write(data)
                    start = end
            else:
                if ch is None or ch in u' \n\x85\u2028\u2029':
                    # Flush a run of regular characters.
                    # NOTE(review): self.column is not updated here, unlike
                    # write_single_quoted — folding decisions may use a stale
                    # column; confirm whether this is intentional.
                    data = text[start:end]
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
                    if ch is None:
                        self.write_line_break()
                    start = end
            if ch is not None:
                breaks = (ch in u'\n\x85\u2028\u2029')
                spaces = (ch == u' ')
            end += 1
+
    def write_literal(self, text):
        # Write `text` as a literal block scalar ('|'): the content is
        # copied verbatim, line by line, with no folding or escaping.
        chomp = self.determine_chomp(text)
        self.write_indicator(u'|'+chomp, True)
        self.write_indent()
        breaks = False
        start = end = 0
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if breaks:
                if ch is None or ch not in u'\n\x85\u2028\u2029':
                    # End of a run of breaks: write each one out ('\n' maps
                    # to the configured best line break).
                    for br in text[start:end]:
                        if br == u'\n':
                            self.write_line_break()
                        else:
                            self.write_line_break(br)
                    if ch is not None:
                        self.write_indent()
                    start = end
            else:
                if ch is None or ch in u'\n\x85\u2028\u2029':
                    # Flush the pending run of regular characters.
                    data = text[start:end]
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
                    if ch is None:
                        self.write_line_break()
                    start = end
            if ch is not None:
                breaks = (ch in u'\n\x85\u2028\u2029')
            end += 1
+
+    def write_plain(self, text, split=True):
+        if not text:
+            return
+        if not self.whitespace:
+            data = u' '
+            self.column += len(data)
+            if self.encoding:
+                data = data.encode(self.encoding)
+            self.writer.write(data)
+        self.writespace = False
+        self.indention = False
+        spaces = False
+        breaks = False
+        start = end = 0
+        while end <= len(text):
+            ch = None
+            if end < len(text):
+                ch = text[end]
+            if spaces:
+                if ch != u' ':
+                    if start+1 == end and self.column > self.best_width and split:
+                        self.write_indent()
+                        self.writespace = False
+                        self.indention = False
+                    else:
+                        data = text[start:end]
+                        self.column += len(data)
+                        if self.encoding:
+                            data = data.encode(self.encoding)
+                        self.writer.write(data)
+                    start = end
+            elif breaks:
+                if ch not in u'\n\x85\u2028\u2029':
+                    if text[start] == u'\n':
+                        self.write_line_break()
+                    for br in text[start:end]:
+                        if br == u'\n':
+                            self.write_line_break()
+                        else:
+                            self.write_line_break(br)
+                    self.write_indent()
+                    self.whitespace = False
+                    self.indention = False
+                    start = end
+            else:
+                if ch is None or ch in u' \n\x85\u2028\u2029':
+                    data = text[start:end]
+                    self.column += len(data)
+                    if self.encoding:
+                        data = data.encode(self.encoding)
+                    self.writer.write(data)
+                    start = end
+            if ch is not None:
+                spaces = (ch == u' ')
+                breaks = (ch in u'\n\x85\u2028\u2029')
+            end += 1
+

lib/yaml/events.py

 
 class DocumentStartEvent(Event):
     def __init__(self, start_mark=None, end_mark=None,
-            implicit=None, version=None, tags=None):
+            explicit=None, version=None, tags=None):
         self.start_mark = start_mark
         self.end_mark = end_mark
-        self.implicit = implicit
+        self.explicit = explicit
         self.version = version
         self.tags = tags
 
 class DocumentEndEvent(Event):
     def __init__(self, start_mark=None, end_mark=None,
-            implicit=None):
+            explicit=None):
         self.start_mark = start_mark
         self.end_mark = end_mark
-        self.implicit = implicit
+        self.explicit = explicit
 
 class AliasEvent(NodeEvent):
     pass

lib/yaml/nodes.py

 
 class ScalarNode(Node):
     id = 'scalar'
+    def __init__(self, tag, value, implicit, start_mark, end_mark):
+        self.tag = tag
+        self.value = value
+        self.implicit = implicit
+        self.start_mark = start_mark
+        self.end_mark = end_mark
 
 class CollectionNode(Node):
     pass

lib/yaml/parser.py

             token = self.scanner.peek()
             start_mark = end_mark = token.start_mark
             yield DocumentStartEvent(start_mark, end_mark,
-                    implicit=True)
+                    explicit=False)
             for event in self.parse_block_node():
                 yield event
             token = self.scanner.peek()
             start_mark = end_mark = token.start_mark
-            implicit = True
+            explicit = False
             while self.scanner.check(DocumentEndToken):
                 token = self.scanner.get()
                 end_mark = token.end_mark
-                implicit = True
+                explicit = True
             yield DocumentEndEvent(start_mark, end_mark,
-                    implicit=implicit)
+                    explicit=explicit)
 
         # Parse explicit documents.
         while not self.scanner.check(StreamEndToken):
             token = self.scanner.get()
             end_mark = token.end_mark
             yield DocumentStartEvent(start_mark, end_mark,
-                    implicit=False, version=version, tags=tags)
+                    explicit=True, version=version, tags=tags)
             if self.scanner.check(DirectiveToken,
                     DocumentStartToken, DocumentEndToken, StreamEndToken):
                 yield self.process_empty_scalar(token.end_mark)
                     yield event
             token = self.scanner.peek()
             start_mark = end_mark = token.start_mark
-            implicit=True
+            explicit = False
             while self.scanner.check(DocumentEndToken):
                 token = self.scanner.get()
                 end_mark = token.end_mark
-                implicit=False
+                explicit=True
             yield DocumentEndEvent(start_mark, end_mark,
-                    implicit=implicit)
+                    explicit=explicit)
 
         # Parse end of stream.
         token = self.scanner.get()
                     tag = self.tag_handles[handle]+suffix
                 else:
                     tag = suffix
-            if tag is None:
-                if not (self.scanner.check(ScalarToken) and
-                        self.scanner.peek().implicit):
-                    tag = u'!'
+            #if tag is None:
+            #    if not (self.scanner.check(ScalarToken) and
+            #            self.scanner.peek().implicit):
+            #        tag = u'!'
             if start_mark is None:
                 start_mark = end_mark = self.scanner.peek().start_mark
             event = None
                 if self.scanner.check(ScalarToken):
                     token = self.scanner.get()
                     end_mark = token.end_mark
+                    implicit = (tag is None and token.implicit)
                     event = ScalarEvent(anchor, tag, token.value,
                             start_mark, end_mark,
-                            implicit=token.implicit, style=token.style)
+                            implicit=implicit, style=token.style)
                 elif self.scanner.check(FlowSequenceStartToken):
                     end_mark = self.scanner.peek().end_mark
                     event = SequenceStartEvent(anchor, tag, start_mark, end_mark,
         while not self.scanner.check(FlowSequenceEndToken):
             if self.scanner.check(KeyToken):
                 token = self.scanner.get()
-                yield MappingStartEvent(None, u'!',
+                yield MappingStartEvent(None, None, # u'!',
                         token.start_mark, token.end_mark,
                         flow_style=True)
                 if not self.scanner.check(ValueToken,

lib/yaml/resolver.py

                 self.resolve_node(path+[node, key], node.value[key])
 
     def resolve_scalar(self, path, node):
-        if node.tag is None:
+        if node.tag is None and node.implicit:
             node.tag = self.detect_scalar(node.value)
         if node.tag is None or node.tag == u'!':
             node.tag = self.DEFAULT_SCALAR_TAG

lib/yaml/scanner.py

         ch = self.reader.peek()
         return ch not in u'\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'\"%@`'  \
                 or (self.reader.peek(1) not in u'\0 \t\r\n\x85\u2028\u2029'
-                        and (ch == '-' or (not self.flow_level and ch in u'?:')))
+                        and (ch == u'-' or (not self.flow_level and ch in u'?:')))
 
     # Scanners.
 

tests/data/documents.events

 - !StreamStart
-- !DocumentStart
-- !Scalar { implicit: true }
+- !DocumentStart { explicit: false }
+- !Scalar { implicit: true, value: 'data' }
 - !DocumentEnd
 - !DocumentStart
 - !Scalar { implicit: true }
 - !DocumentEnd
-- !DocumentStart
+- !DocumentStart { version: [1,1], tags: { '!': '!foo', '!yaml!': 'tag:yaml.org,2002:', '!ugly!': '!!!!!!!' } }
 - !Scalar { implicit: true }
 - !DocumentEnd
 - !StreamEnd

tests/data/mappings.events

 - !MappingEnd
 - !MappingEnd
 - !Scalar { implicit: true, value: 'flow mapping' }
-- !MappingStart { flow: true }
+- !MappingStart { flow_style: true }
 - !Scalar { implicit: true, value: 'key' }
 - !Scalar { implicit: true, value: 'value' }
 - !MappingStart

tests/data/sequences.events

 
 - !DocumentStart
 - !SequenceStart
-- !SequenceStart { flow: true }
+- !SequenceStart { flow_style: true }
 - !SequenceStart
 - !SequenceEnd
 - !Scalar

tests/data/tags.events

+- !StreamStart
+- !DocumentStart
+- !SequenceStart
+- !Scalar { value: 'data' }
+- !Scalar { tag: '!', value: 'data' }
+- !Scalar { tag: 'tag:yaml.org,2002:str', value: 'data' }
+- !Scalar { tag: '!myfunnytag', value: 'data' }
+- !Scalar { tag: '!my!ugly!tag', value: 'data' }
+- !Scalar { tag: 'tag:my.domain.org,2002:data!? #', value: 'data' }
+- !SequenceEnd
+- !DocumentEnd
+- !StreamEnd

tests/test_appliance.py

             anchor = None
             if self.test_token(AnchorToken):
                 anchor = self.get_value()
-            tag = u'!'
+            tag = None
             if self.test_token(TagToken):
                 tag = self.get_value()
             if self.test_token(ScalarToken):

tests/test_emitter.py

 from yaml import *
 import yaml
 
-class TestEmitterOnCanonical(test_appliance.TestAppliance):
+class TestEmitter(test_appliance.TestAppliance):
 
-    def _testEmitterOnCanonical(self, test_name, canonical_filename):
-        events = list(iter(Parser(Scanner(Reader(file(canonical_filename, 'rb'))))))
-        #writer = sys.stdout
+    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
+        self._testEmitter(test_name, data_filename)
+
+    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
+        self._testEmitter(test_name, canonical_filename, False)
+
+    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
+        self._testEmitter(test_name, canonical_filename, True)
+
+    def _testEmitter(self, test_name, filename, canonical=None):
+        events = list(iter(Parser(Scanner(Reader(file(filename, 'rb'))))))
+        if canonical is not None:
+            events[0].canonical = canonical
+        #self._dump(filename, events)
         writer = StringIO.StringIO()
         emitter = Emitter(writer)
-        #print "-"*30
-        #print "ORIGINAL DATA:"
-        #print file(canonical_filename, 'rb').read()
         for event in events:
             emitter.emit(event)
         data = writer.getvalue()
         new_events = list(parse(data))
-        self.failUnlessEqual(len(events), len(new_events))
         for event, new_event in zip(events, new_events):
             self.failUnlessEqual(event.__class__, new_event.__class__)
+            if isinstance(event, NodeEvent):
+                self.failUnlessEqual(event.anchor, new_event.anchor)
+            if isinstance(event, CollectionStartEvent):
+                self.failUnlessEqual(event.tag, new_event.tag)
+            if isinstance(event, ScalarEvent):
+                #self.failUnlessEqual(event.implicit, new_event.implicit)
+                if not event.implicit and not new_event.implicit:
+                    self.failUnlessEqual(event.tag, new_event.tag)
+                self.failUnlessEqual(event.value, new_event.value)
 
-TestEmitterOnCanonical.add_tests('testEmitterOnCanonical', '.canonical')
+    def _dump(self, filename, events):
+        writer = sys.stdout
+        emitter = Emitter(writer)
+        print "="*30
+        print "ORIGINAL DOCUMENT:"
+        print file(filename, 'rb').read()
+        print '-'*30
+        print "EMITTED DOCUMENT:"
+        for event in events:
+            emitter.emit(event)
+        
+TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
+#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
+#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
 
 class EventsConstructor(Constructor):
 
 
 EventsConstructor.add_constructor(None, EventsConstructor.construct_event)
 
-class TestEmitter(test_appliance.TestAppliance):
+class TestEmitterEvents(test_appliance.TestAppliance):
 
-    def _testEmitter(self, test_name, events_filename):
-        events = load_document(file(events_filename, 'rb'), Constructor=EventsConstructor)
-        self._dump(events_filename, events)
+    def _testEmitterEvents(self, test_name, events_filename):
+        events = list(load_document(file(events_filename, 'rb'), Constructor=EventsConstructor))
+        #self._dump(events_filename, events)
         writer = StringIO.StringIO()
         emitter = Emitter(writer)
         for event in events:
         self.failUnlessEqual(len(events), len(new_events))
         for event, new_event in zip(events, new_events):
             self.failUnlessEqual(event.__class__, new_event.__class__)
+            if isinstance(event, NodeEvent):
+                self.failUnlessEqual(event.anchor, new_event.anchor)
+            if isinstance(event, CollectionStartEvent):
+                self.failUnlessEqual(event.tag, new_event.tag)
+            if isinstance(event, ScalarEvent):
+                self.failUnless(event.implicit == new_event.implicit
+                        or event.tag == new_event.tag)
+                self.failUnlessEqual(event.value, new_event.value)
 
     def _dump(self, events_filename, events):
         writer = sys.stdout
         for event in events:
             emitter.emit(event)
         
-TestEmitter.add_tests('testEmitter', '.events')
+TestEmitterEvents.add_tests('testEmitterEvents', '.events')
 

tests/test_yaml.py

 from test_errors import *
 from test_detector import *
 from test_constructor import *
-#from test_emitter import *
+from test_emitter import *
 from test_syck import *
 
 def main(module='__main__'):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.