# Kiva Editor's Assistant / tokensearch.py

#!/usr/bin/env python
import re
from collections import namedtuple
import logging

from keatoken import Token

# A single edit operation emitted by get_levenshtein_dist: 'opcode' is one of
# 'c' (copy), 'd' (delete), 'i' (insert) or 's' (substitute); 'str' is the
# associated target string (or source token for deletes); 'row'/'col' record
# the distance-matrix cell that produced the operation (debug aid only).
OpCode = namedtuple('OpCode', ['opcode', 'str', 'row', 'col'])


class ScoreHist(object):
    """One cell of the Levenshtein dynamic-programming matrix.

    score -- edit distance for this cell
    hist -- list of OpCode operations forming the edit path to this cell
    """

    def __init__(self):
        # Fix: the original sentinel was the string ' ', which is
        # type-unsafe (string/number comparison breaks under Python 3).
        # Every cell's score is assigned by get_levenshtein_dist before
        # it is read, so 0 is a safe numeric placeholder.
        self.score = 0
        self.hist = []


class TokenSearch(object):
    """Contains a list of expected attribute/value pairs and optional
    comparison functions.
    """

    def __init__(self, value, attr_name='str', compare_func=None):
        """Store arguments.

        Arguments:
        attr_name -- name of token attribute to check

        value -- value required for a match

        compare_func -- if not None, called with the token, attr_name
        and value, otherwise a string compare is performed
        """
        self._attr_name = attr_name
        self._value = value
        self._compare_func = compare_func

    def search(self, token):
        """Compare token against the criteria stored in this object.

        Returns the result of compare_func when one was supplied,
        otherwise True iff the named attribute equals the stored value.
        """
        if self._compare_func:
            return self._compare_func(token, self._attr_name, self._value)
        # Bug fix: getattr is a builtin, not a token method; the original
        # token.getattr(...) raised AttributeError on any ordinary token.
        return getattr(token, self._attr_name) == self._value


class TokenSearchByRegexp(object):
    def __init__(self, regexp, replace, attr_name='str'):
        if isinstance(regexp, basestring):
            regex_list = map(lambda x: x + '$', regexp.split())
            self._regexp = map(re.compile, regex_list)
        else:
            self._regexp = map(re.compile, regexp)
        if isinstance(replace, basestring):
            self._replace = replace.split()
        else:
            self._replace = replace
        self._attr_name = attr_name

    def _instantiate_replacements(self, match_groups):
        inst_rep = []
        for rep in self._replace:
            # first, find out if there's a replacement escape sequence
            # in the replacement string (\1, \2, etc.)
            while True:
                mo = re.search(r'\\([0-9])', rep)
                if not mo:
                    inst_rep.append(rep)
                    break
                # there is; find out which group is supposed to replace
                # it
                index = int(mo.group(1)) - 1
                # substitute the appropriate match group for all
                # occurrences of the escape sequence
                rep = rep.replace(mo.group(0), match_groups[index])
                # loop around to replace any further escapes
        return inst_rep

    def _modify_tokens(self, caller, source_tokens, source_pos, source_len,
                       target_strings):
        source_slice = source_tokens[source_pos:(source_pos + source_len)]
        operations = get_levenshtein_dist(source_slice, target_strings)

        # Now d[num_source_tokens][num_target_strings].hist contains a list of
        # operations to perform to make source_tokens look like
        # target_strings.
        source_idx = source_pos
        for op in operations:
            # opcode 'c' (copy) is a no-op
            if op.opcode == 'd':
                deleted = source_tokens.pop(source_idx)
                logging.debug(u'{} deleted {}'.format(caller, deleted.str))
            elif op.opcode == 'i':
                token_to_insert = Token(op.str)
                source_tokens.insert(source_idx, token_to_insert)
                logging.debug(u'{} inserted {}'.format(caller, str(token_to_insert)))
            elif op.opcode == 's':
                old_value = repr(source_tokens[source_idx])
                source_tokens[source_idx].str = op.str
                logging.debug(u'{} changed {} to {}'.format(caller,
                        old_value, source_tokens[source_idx]))

            if op.opcode != 'd':
                source_idx += 1
        return source_tokens

    def apply(self, caller, tokens):
        changed = False
        search_expr_idx = 0
        match_groups = []
        for i, token in enumerate(tokens):
            mo = self._regexp[search_expr_idx].match(token.str)
            if not mo:
                search_expr_idx = 0
                match_groups = []
                continue
            # Save the Match groups, if non-empty; flatten the tuples
            # into a single list so they can be referenced by the
            # replacement escapes \1, \2, etc.
            if mo.groups():
                match_groups += list(mo.groups())
            # Move on to the next element of self._regexp; if we've
            # successfully matched the entire length of the self._regexp
            # list, then apply the replacement to the source tokens.
            search_expr_idx += 1
            if search_expr_idx == len(self._regexp):
                self._modify_tokens(
                    caller, tokens, i - (len(self._regexp) - 1),
                    len(self._regexp),
                    self._instantiate_replacements(match_groups))
                changed = True
                break
        return changed


def get_levenshtein_dist(source_tokens, target_strings):
    """Return a minimal list of operations required to transform the
    source tokens into the target strings.

    Arguments:
    source_tokens -- sequence of token objects; only .str is read

    target_strings -- sequence of plain strings

    Returns a list of OpCode namedtuples whose opcode field is 'c'
    (copy), 'd' (delete), 'i' (insert) or 's' (substitute).
    """
    # Compute the Levenshtein Distance between source and target token
    # lists. (See http://en.wikipedia.org/wiki/Levenshtein_distance for
    # an explanation and http://www.merriampark.com/ld.htm for public-
    # domain implementations.)
    #
    # This results in a set of instructions for transforming the source
    # tokens into the target tokens using the minimum number of copy,
    # delete, insert, and change operations.
    #
    num_source_tokens = len(source_tokens)
    num_target_strings = len(target_strings)
    d = [[ScoreHist() for col in range(num_source_tokens + 1)]
         for row in range(num_target_strings + 1)]

    # Origin cell: an empty source already equals an empty target.
    d[0][0].score = 0

    # Column 0: turning an empty source into the first 'row' target
    # strings takes 'row' inserts.  Bug fix: the history must be
    # cumulative -- the original stored only the most recent insert,
    # corrupting any edit path routed through these boundary cells.
    # (It also read loop variables leaked from earlier comprehensions
    # for the row/col debug fields; a NameError under Python 3.)
    for row in range(1, num_target_strings + 1):
        d[row][0].score = row
        d[row][0].hist = d[row - 1][0].hist + [
            OpCode('i', target_strings[row - 1], row, 0)]

    # Row 0: turning the first 'col' source tokens into an empty target
    # takes 'col' deletes, likewise accumulated.
    for col in range(1, num_source_tokens + 1):
        d[0][col].score = col
        d[0][col].hist = d[0][col - 1].hist + [
            OpCode('d', source_tokens[col - 1], 0, col)]

    for col in range(1, num_source_tokens + 1):
        for row in range(1, num_target_strings + 1):
            if source_tokens[col - 1].str == target_strings[row - 1]:
                # Tokens agree: carry the diagonal forward at no cost.
                d[row][col].score = d[row - 1][col - 1].score
                d[row][col].hist = d[row - 1][col - 1].hist + [
                    OpCode('c', target_strings[row - 1], row, col)]
            else:
                del_cost = d[row][col - 1].score + 1
                ins_cost = d[row - 1][col].score + 1
                sub_cost = d[row - 1][col - 1].score + 1

                d[row][col].score = min(del_cost, ins_cost, sub_cost)

                # Tie-breaking, preserved from the original: below the
                # diagonal prefer substitute over insert; above it,
                # substitute over delete; on it, substitute, then
                # insert, then delete.  NOTE(review): off-diagonal
                # branches never pick the third operation even when it
                # is the cheapest -- score and history can disagree;
                # confirm whether that path is reachable in practice.
                if row > col:
                    if sub_cost < ins_cost:
                        d[row][col].hist = d[row - 1][col - 1].hist + [
                            OpCode('s', target_strings[row - 1], row, col)]
                    else:
                        d[row][col].hist = d[row - 1][col].hist + [
                            OpCode('i', target_strings[row - 1], row, col)]
                elif row < col:
                    if sub_cost < del_cost:
                        d[row][col].hist = d[row - 1][col - 1].hist + [
                            OpCode('s', target_strings[row - 1], row, col)]
                    else:
                        d[row][col].hist = d[row][col - 1].hist + [
                            OpCode('d', source_tokens[col - 1], row, col)]
                else:  # on the diagonal
                    if d[row][col].score == sub_cost:
                        d[row][col].hist = d[row - 1][col - 1].hist + [
                            OpCode('s', target_strings[row - 1], row, col)]
                    elif d[row][col].score == ins_cost:
                        d[row][col].hist = d[row - 1][col].hist + [
                            OpCode('i', target_strings[row - 1], row, col)]
                    else:
                        d[row][col].hist = d[row][col - 1].hist + [
                            OpCode('d', source_tokens[col - 1], row, col)]
    return d[num_target_strings][num_source_tokens].hist


if __name__ == '__main__':
    from keatoken import Token
    ts = TokenSearchByRegexp(u'with (.+) children and (.+)',
                             ur'and has \1 children, \2 of whom go to school')
    tokens = map(Token, 'with 4 children and 2'.split())
    ts.apply(tokens)
    print tokens