# fsl/fsl/Interpreter.py
"""
The FSL interpreter, which scans the file system and
produces a set of files.

The interpreter is normally invoked with the functions
`interpret` and `interpret_string`; making an explicit
instance of the class `Interpreter` is usually not
necessary.

The interpreter has an extension mechanism called hook
handlers. A hook handler (see `HookHandler`) defines
callback functions that the interpreter calls at
appropriate times. Hook handlers receive real-time
information on interpretation and file-system scanning.
Hook handlers are passed as a parameter to the functions
mentioned above.

About exceptions: the lexer, parser and interpreter
raise various exceptions on errors. The superclass for
FSL exceptions is `fsl.globals.FSLError`.
`fsl.globals.FSLParseError` is raised on parse errors,
e.g. invalid tokens or incorrect source structure.
`fsl.globals.FSLSyntaxError` is raised on syntax errors
related to semantics, e.g. type errors.
Finally, `fsl.globals.FSLRuntimeError` is raised
on errors that occur during interpretation.
"""

import fnmatch
import glob
import os
import sys

import AST
import ASTChecker
import ASTLinearizer
import ASTPrinter
import ExpressionEval
import File
import fslparser
import globals
import HookHandler
import utils

_DEBUG = False

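# Hooks (arguments as passed by the Interpreter._notify_* methods):
# include file (fileobj, rule)
# exclude file (fileobj, rule)
# start of directory scan (fileobj of the directory, glob_pattern)
# end of directory scan (fileobj of the directory, files_count, bytes_count)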

class Interpreter:
    """Evaluate a linearized FSL rule list against the file system.

    The constructor runs `ASTChecker` over the syntax tree and flattens
    it into a rule list with `ASTLinearizer`. `run` then walks the
    inclusive rules in order, scans the directories named by their glob
    patterns and collects every file that is not matched by an exclusive
    rule located later in the rule list.

    Registered hook handlers (see `add_hook_handler`) are notified about
    included files and about the start and end of each directory scan.
    """

    def __init__(self, ast):
        """Check and linearize `ast`.

        :param ast: Root node of the parsed FSL program. It must offer a
          `visit` method accepting a visitor object.
        """
        self.ast = ast
        # Presumably raises on semantic errors; the checker's result is
        # not used here.
        ast.visit(ASTChecker.ASTChecker())

        self._rulelist = ast.visit(ASTLinearizer.ASTLinearizer())
        self._filedict = {}          # {filename: File}
        self._cur_rule_index = None  # index to self._rulelist

        self._hook_handlers = []

    def run(self):
        """Interpret all rules and return the resulting files.

        Any result of a previous run is discarded first.

        :return: List of the `File.File` objects that were included.
        """
        self._filedict = {}
        self._cur_rule_index = 0

        if self._rulelist and not self._rulelist[0].inclusive:
            sys.stderr.write(
                'Warning: exclusive rule at beginning - has no effect\n')

        self._check_dir_permissions(self._rulelist)
        self._convert_directory_globs(self._rulelist)

        for i, rule in enumerate(self._rulelist):
            self._cur_rule_index = i
            if not rule.inclusive:
                # Exclusive rules are only consulted through
                # _has_exclusion_rule while scanning inclusive rules.
                continue

            if isinstance(rule, AST.GloblistRule):
                self._interpret_globlist(rule)
            else:
                self._interpret_foreach(rule)

        # list() keeps the return type a real list on every Python version.
        return list(self._filedict.values())

    def _convert_directory_globs(self, rulelist):
        """Turn glob patterns that name a directory into "<dir>/*".

        The `fullglob` attribute of the pattern nodes is modified in
        place. Both inclusive and exclusive rules are converted.

        :param rulelist: Rules whose `glob_patterns` are inspected.
        """
        for rule in rulelist:
            for glob_pattern in rule.glob_patterns:
                if not os.path.isdir(glob_pattern.fullglob):
                    continue
                if glob_pattern.fullglob.endswith('/'):
                    glob_pattern.fullglob += '*'
                elif not glob_pattern.fullglob.endswith('*'):
                    glob_pattern.fullglob += '/*'

    def _check_dir_permissions(self, rulelist):
        """Verify that the directories of inclusive rules are accessible.

        Each inner directory is tested for read access and by changing
        into it. Directories covered by a later exclusion rule are not
        tested. The working directory is restored after every probe, so
        relative directory names are always resolved against the
        directory that was current when this method was called.

        :param rulelist: Rules to check.
        :raises globals.FSLRuntimeError: If a directory cannot be read
          or entered.
        """
        prev_cwd = os.getcwd()

        for i, rule in enumerate(rulelist):
            if not os.path.exists(rule.rootdir) or not rule.inclusive:
                continue
            for glob_pattern in rule.glob_patterns:
                innerdir = glob_pattern.innerdir
                if not os.path.isdir(innerdir):
                    continue
                if self._has_exclusion_rule(innerdir, i):
                    # The innerdir has exclusion rule, so no need to
                    # check permissions.
                    continue

                if not os.access(innerdir, os.R_OK):
                    # This doesn't work on Windows properly.
                    raise globals.FSLRuntimeError(
                        'No read permissions for directory "%s"' % innerdir)

                try:
                    os.chdir(innerdir)
                except OSError:
                    raise globals.FSLRuntimeError(
                        'No chdir permissions for directory "%s"' % innerdir)
                finally:
                    # Always go back, whether or not the probe succeeded.
                    os.chdir(prev_cwd)

    def _interpret_globlist(self, globlist_rule):
        """Scan all glob patterns of an inclusive glob list rule.

        Nothing is scanned when the rule has an IF-clause that
        evaluates to false.
        """
        if globlist_rule.if_expr and not self._expr_true(globlist_rule.if_expr):
            # IF-clause is false.
            return

        for glob_pattern in globlist_rule.glob_patterns:
            self._scan_glob(glob_pattern, glob_pattern.innerdir, None, None)

    def _interpret_foreach(self, foreach_rule):
        """Scan all glob patterns of an inclusive for-each rule.

        The rule's IF-expression is evaluated separately for every
        candidate file, see `_file_is_includable`.
        """
        for glob_pattern in foreach_rule.glob_patterns:
            self._scan_glob(glob_pattern, glob_pattern.innerdir,
                            foreach_rule.if_expr, foreach_rule.variable_expr)

    def _scan_glob(self, glob_pattern_node, scan_directory, if_expr, variable_expr):
        """Scan `scan_directory` and include the files matching the glob.

        Subdirectories are descended into when the glob pattern node is
        recursive. If `scan_directory` is in fact a regular file, that
        file alone is considered for inclusion and no scan hooks fire.

        :param glob_pattern_node: Glob pattern node being interpreted.
        :param scan_directory: Directory (or single file) to scan.
        :param if_expr: Per-file IF-expression, or None.
        :param variable_expr: Variable expression bound to each filename
          while `if_expr` is evaluated; only used if `if_expr` is given.
        :return: Tuple `(files_count, bytes_count)` describing the files
          newly included by this call, nested directories included.
          `(0, 0)` is returned when nothing was scanned.
        :raises globals.FSLRuntimeError: If the directory listing cannot
          be read.
        """
        if not os.path.exists(scan_directory):
            if _DEBUG:
                sys.stdout.write(
                    'Skipping directory %s (not found)\n' % scan_directory)
            return 0, 0  # Early exit, directory not found

        if self._has_exclusion_rule(scan_directory, self._cur_rule_index):
            return 0, 0  # Early exit, directory excluded

        if os.path.isfile(scan_directory):
            # Early exit, directory was an actual file
            if self._file_is_includable(scan_directory, if_expr, variable_expr):
                fileobj = self._add_file(scan_directory, glob_pattern_node.parent)
                if fileobj is not None:
                    return 1, fileobj.get_size()
            return 0, 0

        self._notify_start_scan(scan_directory, glob_pattern_node.fullglob)
        files_count = 0
        bytes_count = 0

        try:
            files = os.listdir(scan_directory)
        except (IOError, OSError):
            # OSError also covers WindowsError, which only exists as a
            # name on Windows.
            raise globals.FSLRuntimeError(
                "Can't get directory listing of '%s'" % scan_directory)
        files.sort()

        for filename in files:
            fullpath = utils.pathjoin(scan_directory, filename)

            if os.path.isdir(fullpath) and glob_pattern_node.recursive:  # Directory
                # The recursive call performs the exclusion check itself
                # and reports (0, 0) for an excluded directory.
                rec_files, rec_bytes = self._scan_glob(
                    glob_pattern_node, fullpath, if_expr, variable_expr)
                files_count += rec_files
                bytes_count += rec_bytes

            elif os.path.isfile(fullpath):  # File
                if not self._glob_matches(fullpath, glob_pattern_node):
                    continue
                if not self._file_is_includable(fullpath, if_expr, variable_expr):
                    continue
                fileobj = self._add_file(fullpath, glob_pattern_node.parent)
                if fileobj is not None:
                    files_count += 1
                    bytes_count += fileobj.get_size()

        self._notify_end_scan(scan_directory, files_count, bytes_count)
        return files_count, bytes_count

    def _add_file(self, filename, rule):
        """Add `filename` to the result set unless it is already there.

        :param filename: Name of the file to include.
        :param rule: The rule that produced the file; handed on to the
          include hook.
        :return: The new `File.File` object, or None if the file had
          been included earlier.
        """
        assert isinstance(rule, AST.Rule)
        if filename in self._filedict:
            return None

        fil = File.File(filename)
        self._filedict[filename] = fil
        self._notify_include_file(fil, rule)
        return fil

    def _file_is_includable(self, filename, if_expr, variable_expr):
        """Return true if the IF-expression is true (if any) and
        there is no exclusion rule.

        :param filename: The candidate filename.
        :param if_expr: IF-expression to evaluate, or None.
        :param variable_expr: Variable expression that is set to
          `filename` during evaluation. Must be given whenever `if_expr`
          is given.
        """

        if if_expr is not None:
            variable_expr.variable_obj.set_value(filename)
            try:
                matched = self._expr_true(if_expr)
            finally:
                # Never leave a stale filename bound to the variable.
                variable_expr.variable_obj.set_value(None)
            if not matched:
                # If-expression didn't match.
                return False
        return not self._has_exclusion_rule(filename, self._cur_rule_index)

    def _has_exclusion_rule(self, filename, cur_rule_index):
        """Return whether the filename has a matching exclusion
        rule later in the rulelist.

        :param filename: The filename to be tested
        :param cur_rule_index: Index of the rule that produced
          `filename`. Checking starts from `cur_rule_index`+1.
        :raises globals.FSLRuntimeError: If an exclusive rule of an
          unknown type is found.
        """

        for rule in self._rulelist[cur_rule_index + 1:]:
            if rule.inclusive:
                continue

            if isinstance(rule, AST.GloblistRule):
                match = self._check_globlist_exclusion(rule, filename)
            elif isinstance(rule, AST.ForEachRule):
                match = self._check_eachrule_exclusion(rule, filename)
            else:
                raise globals.FSLRuntimeError(
                    'Internal error: unknown rule type %r' % (rule,))

            if match:
                return True  # Early exit

        return False

    def _check_globlist_exclusion(self, rule, filename):
        """Return whether any glob pattern of `rule` matches `filename`."""
        for glob_node in rule.glob_patterns:
            if self._glob_matches(filename, glob_node):
                return True
        return False

    def _check_eachrule_exclusion(self, rule, filename):
        """Return whether the exclusive for-each `rule` excludes `filename`.

        Directories are never excluded by a for-each rule. A file is
        excluded when one of the rule's glob patterns matches it and the
        rule's IF-expression is true with the variable set to `filename`.
        """
        if os.path.isdir(filename):
            return False

        variable_obj = rule.variable_expr.variable_obj
        for glob_node in rule.glob_patterns:
            if not self._glob_matches(filename, glob_node):
                continue
            variable_obj.set_value(filename)
            try:
                matched = self._expr_true(rule.if_expr)
            finally:
                # Never leave a stale filename bound to the variable.
                variable_obj.set_value(None)
            if matched:
                return True

        return False

    def _expr_true(self, if_expr):
        """Evaluate `if_expr` with a fresh `ExpressionEval` visitor and
        return the visitor's result."""
        return if_expr.visit(ExpressionEval.ExpressionEval())

    def _glob_matches(self, filename, globrule_node):
        """Return whether `filename` matches the glob pattern node.

        A pattern of the form "xyz/*" also matches the bare name "xyz"
        (compared case-insensitively). For non-recursive patterns a
        match additionally requires that the file is located directly in
        the pattern's inner directory, or is the inner directory itself.
        """
        if globrule_node.fullglob.endswith('/*'):
            # Match also "xyz/*" to "xyz"
            pattern2 = globrule_node.fullglob[:-2].lower()
            if pattern2 == filename.lower():
                return True

        match = fnmatch.fnmatch(filename, globrule_node.fullglob)
        if match and not globrule_node.recursive:
            # fnmatch's "*" also matches path separators, so the depth
            # has to be checked separately.
            dirname = os.path.dirname(filename).replace('\\', '/')
            return dirname == globrule_node.innerdir or filename == globrule_node.innerdir
        return match

    ### Hooks ##########################################################

    def add_hook_handler(self, handler):
        """Register a hook handler.

        :param handler: A `HookHandler.HookHandler` instance.
        :raises TypeError: If `handler` is of any other type.
        """
        if not isinstance(handler, HookHandler.HookHandler):
            raise TypeError('handler must be instance of HookHandler')
        self._hook_handlers.append(handler)

    def _notify_include_file(self, fileobj, rule):
        """Call `include_file(fileobj, rule)` on every handler."""
        for handler in self._hook_handlers:
            handler.include_file(fileobj, rule)

    def _notify_exclude_file(self, filename, rule):
        """Call `exclude_file(fileobj, rule)` on every handler, where
        `fileobj` is a `File.File` built from `filename`.

        Note: nothing inside this class calls this method.
        """
        fileobj = File.File(filename)
        for handler in self._hook_handlers:
            handler.exclude_file(fileobj, rule)

    def _notify_start_scan(self, dirname, glob_pattern):
        """Call `start_scan(fileobj, glob_pattern)` on every handler,
        where `fileobj` is a `File.File` built from `dirname`."""
        fileobj = File.File(dirname)
        for handler in self._hook_handlers:
            handler.start_scan(fileobj, glob_pattern)

    def _notify_end_scan(self, dirname, files_count, bytes_count):
        """Call `end_scan(fileobj, files_count, bytes_count)` on every
        handler, where `fileobj` is a `File.File` built from `dirname`."""
        fileobj = File.File(dirname)
        for handler in self._hook_handlers:
            handler.end_scan(fileobj, files_count, bytes_count)


def interpret(source_filenames, rootdir, hook_handlers=None):
    """Interpret source files, return resulting file-set as a
    list of `File.File` instances.
    If multiple files are given as parameter, they are combined in a
    cascading manner.

    :param source_filenames: List of source filenames, or a
      single filename.
    :param rootdir: The root directory for top-level relative
      filenames.
    :param hook_handlers: List of `HookHandler.HookHandler`
      instances, or a single instance. The handler callback
      functions are called by the interpreter. None (the default)
      means that no handlers are registered.
    """

    # None instead of a shared mutable [] default; both mean "no handlers".
    if hook_handlers is None:
        hook_handlers = []
    elif isinstance(hook_handlers, HookHandler.HookHandler):
        # Accept a bare handler as shorthand for a one-element list.
        hook_handlers = [hook_handlers]

    ast = fslparser.parse(source_filenames, rootdir)
    interpreter = Interpreter(ast)
    for handler in hook_handlers:
        interpreter.add_hook_handler(handler)
    return interpreter.run()

def interpret_string(source_string, rootdir, hook_handlers=None):
    """Interpret a source string, return resulting file-set as a
    list of `File.File` instances.

    :param source_string: FSL program as a string. The string
      may contain newlines.
    :param rootdir: The root directory for top-level relative
      filenames.
    :param hook_handlers: List of `HookHandler.HookHandler`
      instances, or a single instance. The handler callback
      functions are called by the interpreter. None (the default)
      means that no handlers are registered.
    """

    # None instead of a shared mutable [] default; both mean "no handlers".
    if hook_handlers is None:
        hook_handlers = []
    elif isinstance(hook_handlers, HookHandler.HookHandler):
        # Accept a bare handler as shorthand for a one-element list.
        hook_handlers = [hook_handlers]

    ast = fslparser.parse_string(source_string, rootdir)
    interpreter = Interpreter(ast)
    for handler in hook_handlers:
        interpreter.add_hook_handler(handler)
    return interpreter.run()