1. Olemis Lang
  2. trac-gviz


trac-gviz / trac-dev / gviz / tracgviz / util / parsing.py

Olemis Lang f4e31f7 

Olemis Lang 2f29abc 

Olemis Lang f4e31f7 

Olemis Lang 98c842c 

Olemis Lang f4e31f7 

#!/usr/bin/env python

# Copyright 2009-2011 Olemis Lang <olemis at gmail.com>
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#       http://www.apache.org/licenses/LICENSE-2.0
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

r"""Generic parsing algorithms.

Copyright 2009-2011 Olemis Lang <olemis at gmail.com>
Licensed under the Apache License, Version 2.0 
__author__ = 'Olemis Lang'

__all__ = 'OperatorPrecedenceParser', 'Any', 'EndMarker', 'gen_precedence', \
__metaclass__ = type

from itertools import ifilter
import logging
from pygments.token import *

#   Operator precedence grammars

class OperatorPrecedenceParser:
  Shift-reduce parser for operator precedence grammars based on precedence

  MorePrecedence = Token.Grammar.Relationship.MorePrecedence
  SamePrecedence = Token.Grammar.Relationship.SamePrecedence
  LessPrecedence = Token.Grammar.Relationship.LessPrecedence

  def __init__(self):
    r"""Initialize with empty precendence table and skeletal grammar.
    self.start_state = None
    self.precedence = {}
    self.productions_tree = {}

  def parse(self, stream, on_reduce, start_state=None):
    r"""Parse a token `stream` of tokens.

    :param on_reduce:   Optional Callable object invoked when a reduction is 
                        perfomed according to some grammar production.
                        It receives production ID as first parameter.
                        Subsequent positional arguments will be the
                        values matched for each symbol in target 
                        production. It may return a value. It will be
                        propagated in the form of a `NonTerminal` token 
                        and will be available in subsequent reductions 
                        involving that token.
    :param start_state: The name of a non-terminal symbol. This parameter 
                        allows for parsing a subset of the target grammar.
                        If missing parsing will start with topmost
                        grammar state.
    :return:            the last object returned by on_reduce callback
    :raise SyntaxError: if invalid syntax is detected
    :raise InvalidParserConfiguration: if no suitable start state is found
    if start_state is None:
      start_state = self.start_state
    if start_state is None:
      raise InvalidParserConfiguration(self, "Missing start state")
    if on_reduce is None:
      on_reduce = lambda *args: None
    # Auxiliary variables
    SHIFT_PREC = (self.SamePrecedence, self.LessPrecedence)
    REDUCE_PREC = (self.MorePrecedence,)
    # Initial state
    pushdown_list = [(EndMarker, start_state)]
    last_tkn, last_val = EndMarker, start_state
    tkn = None        # Force reading next char from stream
    while True:
      is_last_nt = last_tkn is NonTerminal
      if is_last_nt:
        last_tkn, last_val = pushdown_list[-2]
      if tkn is None:
          tkn, val = stream.next()
        except StopIteration:
          tkn, val = (EndMarker, start_state)
      if is_last_nt and last_tkn is EndMarker and tkn is EndMarker:
        # Accept token stream !!!
        return pushdown_list[-1][1]
      candidates = ( ((last_tkn, lv) , (tkn, v)) \
          for lv in (last_val, Any) for v in (val, Any) )
        precedence = ifilter(None, 
            (self.precedence.get(c) for c in candidates)).next()
      except StopIteration:
        if tkn is EndMarker:
          raise SyntaxError(self, "Unexpected EOL")
          raise SyntaxError(self, "Unexpected token " + val)
        logging.debug('Precedence %s,%s |--> %s,%s: %s', 
            last_tkn, last_val, tkn, val, precedence)
      if precedence in SHIFT_PREC:
        # Shift
        pushdown_list.append((tkn, val))
        last_tkn, last_val = tkn, val
        tkn = None        # Force reading next char from stream
      elif precedence in REDUCE_PREC:
          prod_id, args = self._match_production(pushdown_list)
        except LookupError:
          raise SyntaxError('Unexpected token ' + last_val)
        pushdown_list.append((NonTerminal, on_reduce(prod_id, *args)))
        last_tkn, last_val = pushdown_list[-1]
        raise InvalidParserConfiguration(self, "Invalid precedence " + 

  def _match_token(self, reftkn, tkn):
    r"""Match a token `tkn` against expected token `reftkn`.
    if reftkn is Any:
      return True
      _reftype, _refval = reftkn
      _type, _val = tkn
      return (_reftype is Any or _reftype is _type) and \
          (_refval is Any or _refval == _val)

  def _match_production(self, pushdown_list):
    r"""Match production on reduce
    idx = 0
    choices = self.productions_tree
    while EndMarker not in choices:
      idx -= 1
      last_tkn, last_val = pushdown_list[idx]
      logging.debug('Last token %s : %s', last_tkn, last_val)
      choices = choices.get((last_tkn, last_val)) or choices.get((last_tkn, Any))
      if choices is None:
        raise LookupError("Could not match grammar against pushdown list")
    if EndMarker in choices:
      args = pushdown_list[idx:]
      for _ in xrange(idx, 0):
      return choices[EndMarker], args
      raise InvalidParserConfiguration(self, "Expected production ID. Not found.")

#   Helper functions and objects

# Intermediate tokens representing non-terminal grammar symbols
NonTerminal = Token.Grammar.NonTerminal
# Wildchar token used in token matching.
Any         = Token.Grammar.Any
# Token used to delimit segments in a token stream
EndMarker   = Token.Grammar.EndMarker

def gen_precedence(pseudo_precedence):
  r"""Generates a precedence table from a more readable representation 
  of the form 

      SrcToken : {
          Token.Grammar.Relationship.MorePrecedence : { DstToken1, ... }
          Token.Grammar.Relationship.SamePrecedence : { DstToken2, ... }
          Token.Grammar.Relationship.LessPrecedence : { DstToken3, ... }
  return dict(
    [ (tkn1, tkn2), prec] \
        for tkn1, v1 in pseudo_precedence.iteritems() \
        for prec, v2 in v1.iteritems() \
        for tkn2 in v2

#   Exception classes

class ParserError(RuntimeError):
  r"""Error condition detected at parsing time.
  def __init__(self, parser, *args):
    r"""Initialize ecception object with parser and arguments
    self.parser = parser
    RuntimeError.__init__(self, *args)

  def __unicode__(self):
    return "Parser %s failed: %" % (self.parser.__class__.__name__,

class InvalidParserConfiguration(ParserError):
  r"""Wrong parser configuration detected.

#   Global Testing

from tracgviz.testing.test_parsing import __test__, SAMPLE_GRAMMAR_PRECEDENCE, \

def test_suite():
  from unittest import defaultTestLoader
  from string import whitespace
  import sys

  from tracgviz.api import GVizInvalidQuery
  from tracgviz.testing.dutest import MultiTestLoader, DocTestLoader

  # logging.basicConfig(level=logging.DEBUG)

  def parse(expr, *attrs, **kwds):
    # Test lexical analysis
    print "*****\n* Tokens\n*****"
    newline = False
    p = GVizQLParser()
    p.noisy = False
    for tkn, val in p.get_tokens(expr):
      if tkn is not Whitespace:
        if newline:
          newline = True
        print tkn, val,
    if not (tkn is Whitespace and val == '\n'):  # Check for EOL token.
      raise AssertionError('Expected : %s , %r \n\nGot : %s , %r' % \
                                  (Whitespace, u'\n', tkn, val))
    if attrs :
      # Test parsing and compilation
      print "*****\n* Parsing\n*****",
      from api import GVizUnsupportedQueryOp
        expr = p.parse(expr)
      except GVizUnsupportedQueryOp :
        print '\nNotSupported  :(',
      except GVizInvalidQuery, exc :
        print exc.message
        for attrnm in attrs :
          print getattr(expr, attrnm),
        # Test evaluation and transformations on result sets
        print "*****\n* Result\n*****",
          schema, data = expr.transform(TEST_DATA_GVIZSCHEMA, \
        except Exception, exc:
          print exc.__class__.__name__, ' : ', exc.message,
          print "= Columns =",
          if isinstance(schema, dict):
            def iter_schema():
              return ((k,) + v for k, v in schema.iteritems())
            iter_schema = lambda : schema
          for col in iter_schema() :
            print col[0], col[1],
            try :
              print col[2],
            except IndexError :
          for row in data :
            print "= Row ="
            for val, col in izip(row, iter_schema()):
              print '  ', col[0], '=', val

  l = MultiTestLoader([defaultTestLoader, \
                            optionflags=ELLIPSIS \
                                        | NORMALIZE_WHITESPACE \
                                        | REPORT_UDIFF \
  return l.loadTestsFromModule(sys.modules[__name__])