Commits

david_walker committed 4342d54

in-development code to search and replace tokens

  • Parent commits 0125f30
  • Branches parse

Files changed (1)

File tokensearch.py

+#!/usr/bin/env python
+
+from collections import namedtuple
+import re
+import sys
+
+# An edit operation: opcode is 'c' (copy), 'd' (delete), 'i' (insert)
+# or 's' (substitute); token is the token the operation applies to.
+OpCode = namedtuple('OpCode', ['opcode', 'token'])
+
+
+class ScoreHist(object):
+    """One cell of the edit-distance matrix: a score plus the list of
+    OpCodes that produced it.
+    """
+    def __init__(self):
+        self.score = ' '  # a space so unfilled cells print as blank
+        self.hist = []
+
+
+class TokenSearch(object):
+    """Holds an expected attribute/value pair and an optional
+    comparison function.
+    """
+
+    def __init__(self, value, attr_name='str', compare_func=None):
+        """Store arguments.
+
+        Arguments:
+        value -- value required for a match
+
+        attr_name -- name of token attribute to check
+
+        compare_func -- if not None, called with the token, attr_name
+        and value; otherwise a string compare is performed
+        """
+        self._attr_name = attr_name
+        self._value = value
+        self._compare_func = compare_func
+
+    def match(self, token):
+        """Compare token against the criteria stored in this object.
+        """
+        if self._compare_func:
+            return self._compare_func(token, self._attr_name, self._value)
+        return getattr(token, self._attr_name) == self._value
+
+
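+# Hypothetical usage sketch (assumes tokens expose a `str` attribute,
+# as the default attr_name suggests; Token itself is not part of this
+# commit):
+#
+#   Token = namedtuple('Token', ['str'])
+#   TokenSearch('children').match(Token('children'))  # -> True
+#   TokenSearch('children').match(Token('school'))    # -> False
+
+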
+class TokenSearchByRegexp(object):
+    def __init__(self, regexp, attr_name='str'):
+        self._attr_name = attr_name
+        # accept either a compiled pattern or a pattern string
+        if isinstance(regexp, basestring):
+            regexp = re.compile(regexp)
+        self._regexp = regexp
+
+    def match(self, token):
+        return self._regexp.match(getattr(token, self._attr_name))
+
+
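+# Hypothetical usage sketch, again assuming a `str` token attribute;
+# match() returns a truthy re match object on success:
+#
+#   TokenSearchByRegexp(r'\d+').match(Token('42'))   # -> truthy
+#   TokenSearchByRegexp(r'\d+').match(Token('dog'))  # -> None
+
+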
+class TokenSearchReplace(object):
+    """Searches a token list for a run of tokens matching a list of
+    search expressions, with a view to replacing the run.
+    """
+    def __init__(self, search_exprs, replacement):
+        self._search_exprs = search_exprs
+        self._replacement = replacement
+        self._match_start_idx = None
+
+    def search(self, tokens):
+        """Scan tokens for a consecutive run matching every search
+        expression; remember where the run starts.
+        """
+        self._searched_tokens = tokens
+        num_exprs = len(self._search_exprs)
+        for start in range(len(tokens) - num_exprs + 1):
+            if all(expr.match(tokens[start + i])
+                   for i, expr in enumerate(self._search_exprs)):
+                self._match_start_idx = start
+                return True
+        return False
+
+    def replace(self):
+        # not yet implemented
+        pass
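+
+
+# Hypothetical search sketch, reusing the Token namedtuple from the
+# sketches above:
+#
+#   exprs = [TokenSearchByRegexp(r'\d+'), TokenSearch('children')]
+#   tsr = TokenSearchReplace(exprs, None)
+#   tsr.search([Token('with'), Token('3'), Token('children')])  # -> True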
+
+
+def deploy():
+    tks = map(TokenSearch,
+              u'with (.+) children and (.+) of them go to school'.split())
+    # convert the two plain TokenSearch objects built from the '(.+)'
+    # words into regular expression token search objects.
+    tks[1] = TokenSearchByRegexp(tks[1]._value)
+    tks[4] = TokenSearchByRegexp(tks[4]._value)
+
+    replacement = ur'and has \1 children, \2 of whom go to school'.split()
+    return TokenSearchReplace(tks, replacement)
+
+
+def wc(s):
+    """Write one table cell."""
+    sys.stdout.write(' ' + str(s) + ' |')
+
+
+def print_table(caption, source, target, d):
+    print caption
+    # print source string as table header
+    sys.stdout.write('   |   |')
+    for s in source:
+        wc(s)
+    sys.stdout.write('\n')
+    # print table contents prefixed with target string
+    for row in range(len(target) + 1):
+        for col in range(len(source) + 2):
+            if col == 0:
+                if row == 0:
+                    wc(' ')
+                else:
+                    wc(target[row - 1])
+            else:
+                wc(d[row][col - 1].score)
+        sys.stdout.write('\n')
+    sys.stdout.write('\n\n')
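+
+
+# For example, with source ['a', 'b'] and target ['x'] (a hypothetical
+# call after the distance table below has been filled in), print_table
+# renders the scores roughly as:
+#
+#      |   | a | b |
+#      | 0 | 1 | 2 |
+#    x | 1 | 1 | 2 |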
+
+
+def get_levenshtein_dist(source_tokens, target_tokens):
+    """Return the list of edit operations that turns source_tokens
+    into target_tokens, preserving as much of the source token data
+    as possible.
+    """
+    # Compute the Levenshtein distance between the source and target
+    # token lists. (See http://en.wikipedia.org/wiki/Levenshtein_distance)
+    #
+    # For all row and col, d[row][col].score will hold the Levenshtein
+    # distance between the first col elements of source_tokens and the
+    # first row elements of target_tokens.
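+    #
+    # For example (hypothetical two-token lists), with source_tokens
+    # ['a', 'b'] and target_tokens ['a', 'c'], d[2][2].score ends up 1
+    # and d[2][2].hist reads: copy 'a', substitute 'c' for 'b'.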
+
+    num_source_tokens = len(source_tokens)
+    num_target_tokens = len(target_tokens)
+    d = [[ScoreHist() for col in range(num_source_tokens + 1)]
+         for row in range(num_target_tokens + 1)]
+
+    for row in range(num_target_tokens + 1):
+        # init column 0 with the distance of each target prefix to an
+        # empty source string: insert every target token so far
+        d[row][0].score = row
+        if row > 0:
+            d[row][0].hist = d[row - 1][0].hist + [
+                OpCode('i', target_tokens[row - 1])]
+    for col in range(num_source_tokens + 1):
+        # init row 0 with the distance of each source prefix to an
+        # empty target string: delete every source token so far
+        d[0][col].score = col
+        if col > 0:
+            d[0][col].hist = d[0][col - 1].hist + [
+                OpCode('d', source_tokens[col - 1])]
+
+    for col in range(1, num_source_tokens + 1):
+        for row in range(1, num_target_tokens + 1):
+            if source_tokens[col - 1] == target_tokens[row - 1]:
+                d[row][col].score = d[row - 1][col - 1].score
+                d[row][col].hist = d[row - 1][col - 1].hist + [
+                    OpCode('c', target_tokens[row - 1])]
+            else:
+                del_cost = d[row][col - 1].score + 1
+                ins_cost = d[row - 1][col].score + 1
+                sub_cost = d[row - 1][col - 1].score + 1
+
+                d[row][col].score = min(del_cost, ins_cost, sub_cost)
+
+                # take any operation that achieves the minimum score,
+                # preferring substitution, which keeps the two token
+                # lists aligned
+                if d[row][col].score == sub_cost:
+                    d[row][col].hist = d[row - 1][col - 1].hist + [
+                        OpCode('s', target_tokens[row - 1])]
+                elif d[row][col].score == del_cost:
+                    d[row][col].hist = d[row][col - 1].hist + [
+                        OpCode('d', source_tokens[col - 1])]
+                else:
+                    d[row][col].hist = d[row - 1][col].hist + [
+                        OpCode('i', target_tokens[row - 1])]
+    print_table('', source_tokens, target_tokens, d)
+    return d[num_target_tokens][num_source_tokens].hist
+
+
+def modify_tokens(source_tokens, target_tokens):
+    operations = get_levenshtein_dist(source_tokens, target_tokens)
+
+    # operations now contains the list of edit operations to perform
+    # to make source_tokens look like target_tokens.
+    source_idx = 0
+    for op in operations:
+        if op.opcode == 'c':
+            print 'copy', op.token
+            source_idx += 1
+        elif op.opcode == 'd':
+            print 'delete', source_tokens[source_idx]
+            source_tokens.pop(source_idx)
+        elif op.opcode == 'i':
+            print 'insert', op.token, 'before index', source_idx
+            source_tokens.insert(source_idx, op.token)
+            source_idx += 1
+        elif op.opcode == 's':
+            print 'sub', op.token, 'for', source_tokens[source_idx]
+            source_tokens[source_idx] = op.token
+            source_idx += 1
+    print 'result:', source_tokens
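+
+
+if __name__ == '__main__':
+    # Minimal smoke-test sketch: plain strings stand in for the token
+    # objects the searching classes expect, since modify_tokens only
+    # compares, copies and prints its tokens.
+    modify_tokens('the cat sat'.split(), 'the dog sat down'.split())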