#!/usr/bin/env python

# Copyright 2009-2011 Olemis Lang <olemis at gmail.com>
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

r"""Generic parsing algorithms.

Copyright 2009-2011 Olemis Lang <olemis at gmail.com>
Licensed under the Apache License, Version 2.0 
"""
__author__ = 'Olemis Lang'

__all__ = 'OperatorPrecedenceParser', 'Any', 'EndMarker', 'gen_precedence', \
    'NonTerminal'
__metaclass__ = type

from itertools import ifilter
import logging
from pygments.token import *

#------------------------------------------------------
#   Operator precedence grammars
#------------------------------------------------------

class OperatorPrecedenceParser:
  r"""
  Shift-reduce parser for operator precedence grammars based on precedence
  matrix.
  """

  MorePrecedence = Token.Grammar.Relationship.MorePrecedence
  SamePrecedence = Token.Grammar.Relationship.SamePrecedence
  LessPrecedence = Token.Grammar.Relationship.LessPrecedence

  def __init__(self):
    r"""Initialize with empty precendence table and skeletal grammar.
    """
    self.start_state = None
    self.precedence = {}
    self.productions_tree = {}

  def parse(self, stream, on_reduce=None, start_state=None):
    r"""Parse a `stream` of tokens.

    :param on_reduce:   Optional callable invoked every time a reduction is
                        performed according to some grammar production.
                        It receives the production ID as first parameter.
                        Subsequent positional arguments are the
                        (token, value) pairs matched for each symbol in
                        the target production. Its return value is
                        propagated in the form of a `NonTerminal` token
                        and will be available in subsequent reductions
                        involving that token (see the callback sketch
                        after this method).
    :param start_state: The name of a non-terminal symbol. This parameter
                        allows for parsing a subset of the target grammar.
                        If missing, parsing will start at the grammar's
                        topmost state.
    :return:            the last value returned by the `on_reduce` callback
    :raise SyntaxError: if invalid syntax is detected
    :raise InvalidParserConfiguration: if no suitable start state is found
    """
    if start_state is None:
      start_state = self.start_state
    if start_state is None:
      raise InvalidParserConfiguration(self, "Missing start state")
    if on_reduce is None:
      on_reduce = lambda *args: None
    # Auxiliary variables
    SHIFT_PREC = (self.SamePrecedence, self.LessPrecedence)
    REDUCE_PREC = (self.MorePrecedence,)
    # Initial state
    pushdown_list = [(EndMarker, start_state)]
    last_tkn, last_val = EndMarker, start_state
    tkn = None        # Force reading next token from stream
    while True:
      is_last_nt = last_tkn is NonTerminal
      if is_last_nt:
        last_tkn, last_val = pushdown_list[-2]
      if tkn is None:
        try:
          tkn, val = stream.next()
        except StopIteration:
          tkn, val = (EndMarker, start_state)
      if is_last_nt and last_tkn is EndMarker and tkn is EndMarker:
        # Accept token stream !!!
        return pushdown_list[-1][1]
      candidates = ( ((last_tkn, lv) , (tkn, v)) \
          for lv in (last_val, Any) for v in (val, Any) )
      try:
        precedence = ifilter(None, 
            (self.precedence.get(c) for c in candidates)).next()
      except StopIteration:
        if tkn is EndMarker:
          raise SyntaxError(self, "Unexpected EOL")
        else:
          raise SyntaxError(self, "Unexpected token " + val)
      else:
        logging.debug('Precedence %s,%s |--> %s,%s: %s', 
            last_tkn, last_val, tkn, val, precedence)
      if precedence in SHIFT_PREC:
        # Shift
        pushdown_list.append((tkn, val))
        last_tkn, last_val = tkn, val
        tkn = None        # Force reading next token from stream
      elif precedence in REDUCE_PREC:
        try:
          prod_id, args = self._match_production(pushdown_list)
        except LookupError:
          raise SyntaxError(self, 'Unexpected token ' + last_val)
        pushdown_list.append((NonTerminal, on_reduce(prod_id, *args)))
        last_tkn, last_val = pushdown_list[-1]
      else:
        raise InvalidParserConfiguration(self, "Invalid precedence " + 
            str(precedence))
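
  # A minimal `on_reduce` callback sketch (the production IDs 'number' and
  # 'add' are hypothetical): each positional argument after `prod_id` is the
  # (token, value) pair popped off the pushdown list for one symbol of the
  # reduced production.
  #
  #   def on_reduce(prod_id, *symbols):
  #     if prod_id == 'number':
  #       return int(symbols[0][1])
  #     else:                           # 'add' : E -> E '+' E
  #       return symbols[0][1] + symbols[2][1]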

  def _match_token(self, reftkn, tkn):
    r"""Match a token `tkn` against expected token `reftkn`.
    """
    if reftkn is Any:
      return True
    else:
      _reftype, _refval = reftkn
      _type, _val = tkn
      return (_reftype is Any or _reftype is _type) and \
          (_refval is Any or _refval == _val)

  def _match_production(self, pushdown_list):
    r"""Match a production at reduce time. The matched symbols are popped
    off the pushdown list.
    """
    idx = 0
    choices = self.productions_tree
    while EndMarker not in choices:
      idx -= 1
      last_tkn, last_val = pushdown_list[idx]
      logging.debug('Last token %s : %s', last_tkn, last_val)
      choices = choices.get((last_tkn, last_val)) or \
          choices.get((last_tkn, Any))
      if choices is None:
        raise LookupError("Could not match grammar against pushdown list")
    # The loop only exits once a node carrying a production ID is reached
    args = pushdown_list[idx:]
    for _ in xrange(idx, 0):
      pushdown_list.pop()
    return choices[EndMarker], args

#------------------------------------------------------
#   Helper functions and objects
#------------------------------------------------------

# Intermediate tokens representing non-terminal grammar symbols
NonTerminal = Token.Grammar.NonTerminal
# Wildcard token used in token matching.
Any         = Token.Grammar.Any
# Token used to delimit segments in a token stream
EndMarker   = Token.Grammar.EndMarker

def gen_precedence(pseudo_precedence):
  r"""Generate a precedence table out of a more readable representation
  of the form

  {
      SrcToken : {
          Token.Grammar.Relationship.MorePrecedence : { DstToken1, ... },
          Token.Grammar.Relationship.SamePrecedence : { DstToken2, ... },
          Token.Grammar.Relationship.LessPrecedence : { DstToken3, ... },
        },
      ...
  }
  """
  return dict(
    ((tkn1, tkn2), prec)
        for tkn1, v1 in pseudo_precedence.iteritems()
        for prec, v2 in v1.iteritems()
        for tkn2 in v2
  )
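
# A minimal end-to-end sketch (the toy grammar, the production IDs and
# `_demo_sum_parser` itself are illustrative, not part of the module API):
# parse left-associative sums of integer literals, e.g. tokens for "1 + 2".

def _demo_sum_parser():
  r"""Build and run an illustrative parser for sums of numbers.
  """
  REL = Token.Grammar.Relationship
  parser = OperatorPrecedenceParser()
  parser.start_state = u'expr'
  parser.precedence = gen_precedence({
      (EndMarker, Any) : {
          REL.LessPrecedence : [(Number, Any), (Operator, u'+')],
        },
      (Number, Any) : {
          REL.MorePrecedence : [(Operator, u'+'), (EndMarker, Any)],
        },
      (Operator, u'+') : {
          REL.LessPrecedence : [(Number, Any)],
          REL.MorePrecedence : [(Operator, u'+'), (EndMarker, Any)],
        },
    })
  # Productions are stored right-to-left (see `_match_production` above)
  parser.productions_tree = {
      (Number, Any) : {EndMarker : 'number'},
      (NonTerminal, Any) : {
          (Operator, u'+') : {(NonTerminal, Any) : {EndMarker : 'add'}},
        },
    }
  def on_reduce(prod_id, *symbols):
    # Each symbol is the (token, value) pair matched on the pushdown list
    if prod_id == 'number':
      return int(symbols[0][1])
    else:                             # 'add'
      return symbols[0][1] + symbols[2][1]
  stream = iter([(Number, u'1'), (Operator, u'+'), (Number, u'2')])
  return parser.parse(stream, on_reduce)    # Expected result : 3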

#------------------------------------------------------
#   Exception classes
#------------------------------------------------------

class ParserError(RuntimeError):
  r"""Error condition detected at parsing time.
  """
  def __init__(self, parser, *args):
    r"""Initialize ecception object with parser and arguments
    """
    self.parser = parser
    RuntimeError.__init__(self, *args)

  def __unicode__(self):
    return u"Parser %s failed: %s" % (self.parser.__class__.__name__,
        RuntimeError.__str__(self))

class InvalidParserConfiguration(ParserError):
  r"""Wrong parser configuration detected.
  """

#------------------------------------------------------
#   Global Testing
#------------------------------------------------------

from tracgviz.testing.test_parsing import __test__, SAMPLE_GRAMMAR_PRECEDENCE, \
    SAMPLE_GRAMMAR_PRODUCTIONS, SAMPLE_INPUT_STREAM

def test_suite():
  from doctest import ELLIPSIS, NORMALIZE_WHITESPACE, REPORT_UDIFF
  from itertools import izip
  from unittest import defaultTestLoader
  from string import whitespace
  import sys

  from tracgviz.api import GVizInvalidQuery
  from tracgviz.testing.dutest import MultiTestLoader, DocTestLoader

  # logging.basicConfig(level=logging.DEBUG)

  def parse(expr, *attrs, **kwds):
    # Test lexical analysis
    print "*****\n* Tokens\n*****"
    newline = False
    p = GVizQLParser()
    p.noisy = False
    for tkn, val in p.get_tokens(expr):
      if tkn is not Whitespace:
        if newline:
          print
        else:
          newline = True
        print tkn, val,
    if not (tkn is Whitespace and val == '\n'):  # Check for EOL token.
      raise AssertionError('Expected : %s , %r \n\nGot : %s , %r' % \
                                  (Whitespace, u'\n', tkn, val))
    if attrs :
      # Test parsing and compilation
      print
      print "*****\n* Parsing\n*****",
      from tracgviz.api import GVizUnsupportedQueryOp
      try:
        expr = p.parse(expr)
      except GVizUnsupportedQueryOp :
        print '\nNotSupported  :(',
      except GVizInvalidQuery, exc :
        print 
        print exc.message
      else:
        for attrnm in attrs :
          print
          print getattr(expr, attrnm),
        # Test evaluation and transformations on result sets
        print
        print "*****\n* Result\n*****",
        try:
          schema, data = expr.transform(TEST_DATA_GVIZSCHEMA, \
                                        TEST_DATA_GVIZDATA)
        except Exception, exc:
          print
          print exc.__class__.__name__, ' : ', exc.message,
        else:
          print
          print "= Columns =",
          if isinstance(schema, dict):
            def iter_schema():
              return ((k,) + v for k, v in schema.iteritems())
          else:
            iter_schema = lambda : schema
          for col in iter_schema() :
            print
            print col[0], col[1],
            try :
              print col[2],
            except IndexError :
              pass
          print 
          for row in data :
            print "= Row ="
            for val, col in izip(row, iter_schema()):
              print '  ', col[0], '=', val

  l = MultiTestLoader([defaultTestLoader, \
                        DocTestLoader(
                            extraglobs=dict(parse=parse),
                            optionflags=ELLIPSIS \
                                        | NORMALIZE_WHITESPACE \
                                        | REPORT_UDIFF \
                          )])
  return l.loadTestsFromModule(sys.modules[__name__])