# pyyaml-legacy / yaml / load.py -- legacy YAML loader implementation
import re, string
from implicit import convertImplicit
from inline import InlineTokenizer
from yaml.klass import DefaultResolver
from yaml.stream import YamlLoaderException, FileStream, StringStream, NestedDocs

try:
    iter(list()) # is iter supported by this version of Python?
except:
    # XXX - Python 2.1 does not support iterators   
    # Compatibility shim: define StopIteration and an `iter` class with
    # the surface this module relies on (next/__len__/__getitem__).
    # This `iter` eagerly drains the parser up front and then replays
    # the collected documents.
    class StopIteration: pass
    class iter:
        def __init__(self,parser):
            # Pull every document now; the parser signals exhaustion by
            # raising (this module's) StopIteration.
            self._docs = []
            try:
                while 1:
                   self._docs.append(parser.next())
            except StopIteration: pass
            self._idx = 0
        def __len__(self): return len(self._docs)
        def __getitem__(self,idx): return self._docs[idx]
        def next(self):
            # Replay the pre-collected documents one at a time.
            if self._idx < len(self._docs):
                ret = self._docs[self._idx] 
                self._idx = self._idx + 1
                return ret
            raise StopIteration

def loadFile(filename, typeResolver=None):
    """Parse the YAML documents in the named file; returns an iterator."""
    stream = FileStream(filename)
    return loadStream(stream, typeResolver)
   
def load(str, typeResolver=None):
    """Parse the YAML documents in the given string; returns an iterator."""
    stream = StringStream(str)
    return loadStream(stream, typeResolver)

def l(str): return load(str).next()

def file(filename): return loadFile(filename).next()

def loadStream(stream, typeResolver):
    """Wrap a Parser around the stream and expose it as an iterator."""
    parser = Parser(stream, typeResolver)
    return iter(parser)

def tryProductions(productions, value):
    """Run each production over value; return (1, result) from the first
    one that reports success, or None when none of them claim it."""
    for production in productions:
        outcome = production(value)
        if not outcome:
            continue
        (ok, result) = outcome
        if ok:
            return (1, result)

def dumpDictionary(): return {}

class Parser:
    """Line-oriented parser turning a nested YAML stream into native
    Python structures (lists, dictionaries, scalars).

    Physical lines come from a NestedDocs wrapper (which tracks
    indentation nesting and document separators); typed values are
    handed to a resolver.  Documents are produced one at a time via
    the iterator protocol (next()/__iter__).
    """

    def __init__(self, stream, typeResolver=None):
        # Prefer the builtin `dict`; fall back to dumpDictionary() on
        # old Pythons where the name does not exist (NameError).
        try:
            self.dictionary = dict
        except:
            self.dictionary = dumpDictionary
        self.nestedDocs = NestedDocs(stream)
        self.aliases = {}  # anchor name -> previously parsed value
        if typeResolver:
            self.typeResolver = typeResolver
        else:
            self.typeResolver = DefaultResolver()

    def error(self, msg):
        # Delegate reporting so the message carries the current line.
        self.nestedDocs.error(msg, self.line)

    def nestPop(self):
        # Advance to the next line at the current nesting; returns 1 if
        # a line was available (left in self.line), else None.
        line = self.nestedDocs.pop()
        if line is not None:
            self.line = line
            return 1

    def value(self, indicator):
        # Text on self.line after the given indicator (e.g. "-" or ":"),
        # whitespace after the indicator stripped; None if the indicator
        # does not occur on the line.
        return getToken(indicator+"\s*(.*)", self.line)

    # Historical entry point, kept only to signal the rename to next().
    def getNextDocument(self): raise "getNextDocument() deprecated--use next()"

    def next(self):
        """Parse and return the next document, or raise StopIteration."""
        line = self.nestedDocs.popDocSep()
        indicator = getIndicator(line)
        if indicator:
            # "--- <value>": the document starts on the separator line.
            return self.parse_value(indicator)
        if line:
            self.nestedDocs.nestToNextLine()
            return self.parseLines()
        raise StopIteration

    def __iter__(self): return self

    def parseLines(self):
        # Parse the current nesting level as a sequence or a mapping:
        # a leading "-" means sequence, anything else is a mapping.
        peekLine = self.nestedDocs.peek()
        if peekLine:
            if re.match("\s*-", peekLine):
                return self.parse_collection([], self.parse_seq_line)
            else:
                return self.parse_collection(self.dictionary(), self.parse_map_line)
        raise StopIteration

    def parse_collection(self, items, lineParser):
        # Feed every non-blank line at this nesting into lineParser,
        # which mutates `items` in place.
        while self.nestPop():
            if self.line:
                lineParser(items)
        return items    

    def parse_seq_line(self, items):
        # One "- value" entry of a sequence.
        value = self.value("-")
        if value is not None:
            items.append(self.parse_seq_value(value))
        else:
            self.error("missing '-' for seq")

    def parse_map_line(self, items):
        # "?" alone on a line introduces a complex (non-scalar) key.
        if (self.line == '?'):
            self.parse_map_line_nested(items)
        else:
            self.parse_map_line_simple(items, self.line)

    def parse_map_line_nested(self, items):
        # Complex key: the nested block after "?" is the key, and the
        # following ": value" line supplies its value.  The key is made
        # hashable by converting it to a tuple.
        self.nestedDocs.nestToNextLine()
        key = self.parseLines()
        if self.nestPop():
            value = self.value(':')
            if value is not None:
                items[tuple(key)] = self.parse_value(value)
                return
        self.error("key has no value for nested map")

    def parse_map_line_simple(self, items, line):
        # Ordinary "key: value" entry; duplicate keys are an error.
        map_item = self.key_value(line)
        if map_item:
            (key, value) = map_item
            key = convertImplicit(key)
            if items.has_key(key):
                self.error("Duplicate key "+key)
            items[key] = self.parse_value(value)
        else:
            self.error("bad key for map")

    def is_map(self, value):
        # XXX - need real tokenizer
        # Heuristic: an unquoted token containing ":" followed by
        # whitespace (or end of line) is treated as starting a mapping.
        if len(value) == 0:
            return 0
        if value[0] == "'":
            return 0
        if re.search(':(\s|$)', value):       
            return 1

    def parse_seq_value(self, value):
        # A seq entry that itself looks like "key: value" begins a
        # compressed (same-line) mapping.
        if self.is_map(value):
            return self.parse_compressed_map(value)
        else:
            return self.parse_value(value)

    def parse_compressed_map(self, value):
        # Handle "- key: value" by nesting past the "- " prefix so the
        # following lines are read as members of the same mapping.
        items = self.dictionary()
        line = self.line
        token = getToken("(\s*-\s*)", line)
        self.nestedDocs.nestBySpecificAmount(len(token))
        self.parse_map_line_simple(items, value)
        return self.parse_collection(items, self.parse_map_line)

    def parse_value(self, value):
        """Parse one value, resolving alias references and anchors."""
        (alias, value) = self.testForRepeatOfAlias(value)
        if alias:
            # "*name": reuse the value recorded for that anchor.
            return value
        (alias, value) = self.testForAlias(value)            
        value = self.parse_unaliased_value(value)
        if alias:
            # "&name": record the value for later "*name" references.
            self.aliases[alias] = value
        return value          

    def parse_unaliased_value(self, value):
        # "!!type value" goes through the type resolver; a single-!
        # private tag is currently accepted but ignored.
        match = re.match(r"(!\S*)(.*)", value)
        if match:
            (url, value) = match.groups()
            value = self.parse_untyped_value(value)
            if url[:2] == '!!':
                return self.typeResolver.resolveType(value, url)
            else:
                # XXX - allows syntax, but ignores it
                return value
        return self.parse_untyped_value(value)

    def parseInlineArray(self, value):        
        # "[a, b, ...]" inline sequence; None when value doesn't start one.
        if re.match("\s*\[", value):
            return self.parseInline([], value, ']', 
                self.parseInlineArrayItem)

    def parseInlineHash(self, value):        
        # "{k: v, ...}" inline mapping; None when value doesn't start one.
        if re.match("\s*{", value):
            return self.parseInline(self.dictionary(), value, '}', 
                self.parseInlineHashItem)

    def parseInlineArrayItem(self, result, token):
        return result.append(convertImplicit(token))

    def parseInlineHashItem(self, result, token):
        (key, value) = self.key_value(token)
        result[key] = value

    def parseInline(self, result, value, end_marker, itemMethod):
        # Generic inline-collection loop: skip the opening token, then
        # collect items until the closing marker appears.
        tokenizer = InlineTokenizer(value)
        tokenizer.next()
        while 1:
            token = tokenizer.next()
            if token == end_marker:
                break
            itemMethod(result, token)
        return (1, result)

    def parseSpecial(self, value):
        # Productions that claim a value by its leading syntax.
        productions = [
            self.parseMultiLineScalar,
            self.parseInlineHash,
            self.parseInlineArray,
        ]
        return tryProductions(productions, value)

    def parse_untyped_value(self, value):
        """Parse a value with no type tag: special forms first, then a
        plain (possibly continued) scalar, else the nested block below."""
        parse = self.parseSpecial(value)
        if parse:
            (ok, data) = parse
            return data
        token = getToken("(\S.*)", value)
        if token:
            # Plain scalar, possibly continued on more-nested lines.
            lines = [token] + \
                pruneTrailingEmpties(self.nestedDocs.popNestedLines())
            return convertImplicit(joinLines(lines))
        else:
            # Nothing on this line: the value is the nested block below.
            self.nestedDocs.nestToNextLine()
            return self.parseLines()

    def parseNative(self, value):
        return (1, convertImplicit(value))

    def parseMultiLineScalar(self, value):
        # ">" folded scalar, "|" literal (trailing blanks pruned),
        # "|+" literal keeping trailing blank lines.
        if value == '>':
            return (1, self.parseFolded())
        elif value == '|':
            return (1, joinLiteral(self.parseBlock()))
        elif value == '|+':
            return (1, joinLiteral(self.unprunedBlock()))

    def parseFolded(self):
        # Folded scalar: consecutive lines joined by foldChar (space for
        # two flush-left lines, newline otherwise); ends with a newline.
        data = self.parseBlock()
        i = 0
        resultString = ''
        while i < len(data)-1:
            resultString = resultString + data[i]
            resultString = resultString + foldChar(data[i], data[i+1])
            i = i + 1
        return resultString + data[-1] + "\n"        

    def unprunedBlock(self):
        # All lines of the nested block, trailing blanks included.
        self.nestedDocs.nestToNextLine()
        data = []
        while self.nestPop():
            data.append(self.line)
        return data

    def parseBlock(self):
        # Nested block with trailing blank lines removed.
        return pruneTrailingEmpties(self.unprunedBlock())

    def testForAlias(self, value):
        # "&name rest" -> ("name", "rest"); otherwise (None, value).
        match = re.match("&(\S*)\s*(.*)", value)
        if match:
            return match.groups()
        return (None, value)

    def testForRepeatOfAlias(self, value):
        # "*name" -> ("name", recorded value); unknown anchors are errors.
        match = re.match("\*(\S+)", value)
        if match:
            alias = match.groups()[0]
            if self.aliases.has_key(alias):
                return (alias, self.aliases[alias])
            else:
                self.error("Unknown alias")
        return (None, value)

    def key_value(self, str):
        # Split a "key: value" line into (key, value), or return None.
        # In the unquoted branch a value that is only a "#" comment is
        # normalized to ''.
        #if str[-1] == ' ':
        #    self.error("Trailing spaces not allowed without quotes.")
        # XXX This allows mis-balanced " vs. ' stuff
        match = re.match("[\"'](.+)[\"']\s*:\s*(.*)", str)
        if match:
            (key, value) = match.groups()
            return (key, value)
        match = re.match("(.+?)\s*:\s*(.*)", str)
        if match:
            (key, value) = match.groups()
            if len(value) and value[0] == '#':
                value = ''
            return (key, value)

def getToken(regex, value):
    """Search value with regex; return its first capture group, else None."""
    found = re.search(regex, value)
    if found is None:
        return None
    return found.groups()[0]

def pruneTrailingEmpties(data):
    """Return data with any run of trailing empty strings removed.

    The original list object is returned unchanged when there is
    nothing to prune; otherwise a trimmed copy is returned.
    """
    end = len(data)
    while end > 0 and data[end - 1] == '':
        end = end - 1
    if end == len(data):
        return data
    return data[:end]

def foldChar(line1, line2):
    """YAML folding: two flush-left lines join with a space, else newline."""
    if re.match(r"^\S", line1) is None:
        return "\n"
    if re.match(r"^\S", line2) is None:
        return "\n"
    return " "

def getIndicator(line):
    """Return the text following a '--- ' document separator (with an
    optional '#YAML:x.y' version header), or None for other lines."""
    if not line:
        return None
    header = r"(#YAML:\d+\.\d+\s*){0,1}"
    found = re.match("--- " + header + r"(\S*.*)", line)
    if found is None:
        return None
    return found.groups()[-1]

def joinLines(lines):
    """Join continuation lines of a plain scalar into one string.

    A line ending in a backslash is glued directly to the next line
    (the backslash is dropped); otherwise a single space is inserted,
    per line folding.  `lines` must be non-empty.
    """
    result = ''
    for line in lines[:-1]:
        # Slice instead of line[-1] so an empty interior line cannot
        # raise IndexError; it simply folds with a space.
        if line[-1:] == '\\':
            result = result + line[:-1]
        else:
            result = result + line + " "
    return result + lines[-1]

def joinLiteral(data):
    """Join literal-block lines with newlines, adding a final newline."""
    # str.join replaces the long-deprecated string.join helper (removed
    # in Python 3) with byte-identical results on every Python >= 2.0.
    return "\n".join(data) + "\n"