# Source
#
# DiffDB / dictpatch.py

from pprint import pprint


class DictDiff(object):
    """Compute a shallow patch that turns an original dict into a new one.

    The patch has the form ``{'+': updates, '-': removals}``. ``updates``
    maps every top-level key that was added, or whose value changed, to its
    new value. ``removals`` is a list of key paths (each a list of keys) for
    keys that exist in the original but not in the new dict.
    """

    def __init__(self, d):
        # Stored by reference, not copied: later mutations of ``d`` are
        # visible to subsequent diff() calls.
        self.orig = d

    def find_removals(self, old, new, paths=None):
        """Return the paths of keys present in ``old`` but absent from ``new``.

        Only the top level is compared. A nested dict that lost keys compares
        unequal to its original, so diff() already ships its complete new
        value under ``'+'``; applying that with ``dict.update`` replaces the
        nested dict wholesale, which makes nested removal paths unnecessary.

        Args:
            old: The original mapping.
            new: The mapping to compare against.
            paths: Optional list to append results to. When given, it is
                extended in place and returned.

        Returns:
            A list of single-element paths, e.g. ``[['y']]``.
        """
        paths = [] if paths is None else paths
        # No descent into values: looking up new[key] for a removed key, or
        # testing membership in a non-dict replacement value, would raise.
        paths.extend([key] for key in old if key not in new)
        return paths

    def diff(self, new):
        """Return the patch that transforms the original dict into ``new``.

        Args:
            new: The target mapping.

        Returns:
            ``{'+': updates, '-': removals}`` as described on the class.
        """
        orig = self.orig
        # A key is an update when it is new or its value differs.
        updates = {
            key: value
            for key, value in new.items()
            if key not in orig or value != orig[key]
        }
        return {'+': updates, '-': self.find_removals(orig, new)}

        
class TestDictDiff(object):
    """Tests for DictDiff.diff covering added, changed and removed keys."""

    def test_find_updates(self):
        a = {'x': 1, 'y': 2}
        b = a.copy()
        b['z'] = 3

        dd = DictDiff(a)
        diff = dd.diff(b)

        pprint(diff)
        assert diff == {
            '+': {'z': 3},
            '-': [],
        }

    def test_find_updates_complex(self):
        a = {'x': 1, 'y': {'z': 4}}
        # Built as a literal on purpose: a.copy() is shallow, so writing to
        # b['y'] would also change a['y'] and leave nothing to diff.
        b = {'x': 1, 'y': {'z': 4, 'a': 3}}

        dd = DictDiff(a)
        diff = dd.diff(b)

        pprint(diff)
        # A changed nested dict is shipped whole under '+'.
        assert diff == {
            '+': {'y': {'z': 4, 'a': 3}},
            '-': [],
        }

    def test_find_removals(self):
        a = {'x': 1, 'y': 2}
        b = {'x': 1}
        dd = DictDiff(a)
        diff = dd.diff(b)
        pprint(diff)
        assert diff == {
            '+': {},
            '-': [['y']],
        }

    def test_find_removals_complex(self):
        a = {'x': 1, 'y': {'z': 3, 'a': 'hello world'}}
        b = {'x': 1, 'y': {'z': 3}}
        dd = DictDiff(a)
        diff = dd.diff(b)
        pprint(diff)
        # The nested removal is expressed by replacing 'y' under '+',
        # not by a removal path.
        assert diff == {
            '+': {'y': {'z': 3}},
            '-': [],
        }

    
class DictPatch(object):
    """Apply a ``{'+': updates, '-': removals}`` patch to a dict.

    ``updates`` is a mapping merged into the target with ``dict.update``.
    ``removals`` is a list of key paths; each path is a sequence of keys
    leading from the top-level dict to the key to remove.
    """

    def __init__(self, json):
        # Both parts are optional; a missing part means "nothing to do".
        self.add = json.get('+', {})
        self.remove = json.get('-', [])

    def update(self, d):
        """Merge the patch's additions into ``d`` in place and return ``d``."""
        d.update(self.add)
        return d

    def filter(self, d):
        """Mark every removal path in ``d`` by setting its value to None.

        ``d`` is modified in place and returned. The removal paths
        themselves are left untouched, so the patch can be applied again.
        Empty paths and paths that do not exist in ``d`` are skipped.
        """
        for path in self.remove:
            if not path:
                continue
            # Walk to the dict that holds the final key. Only intermediate
            # components are descended into, so a target whose own value is
            # a dict is marked itself rather than something inside it.
            parent = d
            for key in path[:-1]:
                parent = parent.get(key)
                if not isinstance(parent, dict):
                    parent = None
                    break
            last = path[-1]
            if parent is not None and last in parent:
                parent[last] = None
        return d

    def compact(self, d):
        """Return a copy of ``d`` without None-valued keys.

        Nested dicts are compacted recursively; ``d`` is not modified.
        """
        compacted = {}
        for key, value in d.items():
            if value is None:
                continue
            if isinstance(value, dict):
                compacted[key] = self.compact(value)
            else:
                compacted[key] = value
        return compacted

    def apply(self, d):
        """Apply additions, then removals, and return the patched dict.

        Additions are merged into ``d`` in place. When the patch has
        removals, ``d`` additionally receives None markers from filter() and
        a new, compacted dict is returned; otherwise ``d`` itself is
        returned. Because None is the removal marker, every None-valued key
        in ``d`` is dropped by the compaction step, not only those named in
        the patch.
        """
        if self.add:
            self.update(d)
        if self.remove:
            d = self.compact(self.filter(d))
        return d

    
class TestDictPatch(object):
    """Tests for DictPatch: additions, removals and the two removal steps."""

    def test_update_new_info(self):
        d = {'x': 1}
        new = {'y': 2}
        dpatch = DictPatch({'+': new})
        patched = dpatch.apply(d)
        # Compare against a literal: without removals apply() returns d
        # itself, so comparing patched with d could never fail.
        assert patched == {'x': 1, 'y': 2}

    def test_update_remove_info_simple(self):
        d = {'x': 1, 'y': 2}
        p = DictPatch({'-': [['y']]})
        patched = p.apply(d)
        assert patched == {'x': 1}

    def test_update_remove_info_filter_step(self):
        # filter() only marks the removed key with None.
        d = {'x': 1, 'y': {'z': 'foo', 'a': 3}}
        p = DictPatch({'-': [['y', 'a']]})
        patched = p.filter(d)
        assert patched == {'x': 1, 'y': {'z': 'foo', 'a': None}}

    def test_update_remove_info_compact_step(self):
        # compact() drops the None-marked keys.
        d = {'x': 1, 'y': {'z': 'foo', 'a': None}}
        p = DictPatch({'-': [['y', 'a']]})
        patched = p.compact(d)
        assert patched == {'x': 1, 'y': {'z': 'foo'}}