# Source
#
# hg-multiundo / multiundo.py
#
# Full commit
'''Support for multi-level undo and redo using file system snapshots'''

import os
import shutil

from mercurial import dispatch
from mercurial import extensions
from mercurial import scmutil
from mercurial import util

# Path prefixes, relative to the repository root, that Undoer.addfile never
# backs up (matched with str.startswith).
# NOTE(review): the prefix hard-codes '/' separators - confirm the match
# behaves as intended on platforms where os.sep differs.
ignoredpaths = ['.hg/cache/']

class Undoer(object):
    '''Maintains the undo/redo state for one repository.

    Everything is stored below the ``multiundo`` directory that
    ``repo.join`` resolves:

    * ``multiundo.lst`` - one line per stored state: the state index,
      right-justified to three columns, a space, then the recorded command.
    * ``multiundo.idx`` - index of the current state, -1 when there is none.
    * ``undo/<idx>`` - copies of files taken before state <idx> changed them.
    * ``redo/<idx>`` - copies of the same files taken just before state
      <idx> was undone.
    '''

    def __init__(self, repo):
        '''loads the stored undo list and index for repo

        A missing, unreadable or inconsistent list/index resets the
        in-memory state to "no undo states"; nothing on disk is touched.
        '''
        self.repo = repo
        # source paths already handled by addfile through this instance
        self.filecache = {}
        try:
            with open(self._listpath()) as fhandle:
                # drop blank lines: every remaining entry has to start with
                # its index for _firstindex/_lastindex to parse it
                self.undolst = [line
                                for line in fhandle.read().splitlines()
                                if line.strip()]

            indexpath = self._indexpath()
            with open(indexpath) as fhandle:
                self.index = int(fhandle.read())

            # valid positions run from "everything undone"
            # (first stored index - 1) up to the newest stored state
            if (self.index + 1 < self._firstindex()
                    or self._lastindex() < self.index):
                raise ValueError(
                    'invalid undo index found in %s' % (indexpath))

        except (IOError, ValueError):
            self.undolst = []
            self.index = -1

    def _firstindex(self):
        '''gets the index of the first stored undo, or -1'''
        return int(self.undolst[0].split()[0]) if self.undolst else -1

    def _lastindex(self):
        '''gets the index of the last stored undo, or -1'''
        return int(self.undolst[-1].split()[0]) if self.undolst else -1

    def _listpath(self):
        '''gets the path to the multiundo list file'''
        return self.repo.join(os.path.join('multiundo', 'multiundo.lst'))

    def _indexpath(self):
        '''gets the path to the multiundo index file'''
        return self.repo.join(os.path.join('multiundo', 'multiundo.idx'))

    def _undoroot(self, idx):
        '''gets the undo root inside .hg for the given undo index'''
        return self.repo.join(os.path.join('multiundo', 'undo', str(idx)))

    def _redoroot(self, idx):
        '''gets the redo root inside .hg for the given redo index'''
        return self.repo.join(os.path.join('multiundo', 'redo', str(idx)))

    def _writeindex(self):
        '''writes the current undo index to disk'''
        indexpath = self._indexpath()
        _safemakedirs(os.path.dirname(indexpath))
        with open(indexpath, 'w') as fhandle:
            fhandle.write(str(self.index) + '\n')

    def _writelist(self):
        '''writes the undo list to disk, or removes the file when empty'''
        path = self._listpath()
        if not self.undolst:
            _deletePath(path)
            return
        _safemakedirs(os.path.dirname(path))
        with open(path, 'w') as fhandle:
            # entries are kept without line terminators in memory
            fhandle.write('\n'.join(self.undolst) + '\n')

    def _deletestale(self):
        '''discards the states that lie after the current index

        Their undo snapshots and list entries are removed, together with
        the whole redo directory.
        '''
        for i in range(self.index + 1, self._lastindex() + 1):
            _deletePath(self._undoroot(i))
        _deletePath(self.repo.join(os.path.join('multiundo', 'redo')))
        if self.index != -1:
            self.undolst = self.undolst[:(self.index + 1 - self._firstindex())]
        else:
            self.undolst = []

    def _prune(self):
        '''drops the oldest states beyond the configured limit

        The limit is the ``undocount`` setting of the ``multiundo`` config
        section and defaults to 5.
        '''
        keepCount = int(self.repo.ui.config(
            'multiundo', 'undocount', default=5))
        # Keep every state after the current index, and at least one entry:
        # with a count of 0 the slice below (``lst[-0:]``) would keep the
        # whole list although the loop had removed every snapshot.
        keepCount = max(keepCount, self._lastindex() - self.index, 1)
        if len(self.undolst) > keepCount:
            newFirst = self._lastindex() - keepCount
            for i in range(self._firstindex(), newFirst + 1):
                _deletePath(self._redoroot(i))
                _deletePath(self._undoroot(i))
            self.undolst = self.undolst[-keepCount:]

    def _addstate(self):
        '''adds a new undo state to the list of undos'''
        self.index += 1
        # The list file holds one entry per line, so line breaks inside the
        # command (for example a multi-line commit message) are flattened.
        label = ' '.join(command.splitlines())
        self.undolst.append(str(self.index).rjust(3) + ' ' + label)

    def addfile(self, base, fname, hardlink=False):
        '''backs up base/fname into the current undo state

        Files that do not exist, files below one of ``ignoredpaths`` and
        files this instance has already handled are skipped. The first
        file that does get backed up also records the new undo state:
        stale states are discarded, the state is added, old states are
        pruned and the index and list files are rewritten.

        base     -- directory that fname is relative to
        fname    -- path of the file about to be modified
        hardlink -- link the backup with util.oslink instead of copying it
        '''
        global written

        source = os.path.join(base, fname)
        # path relative to the repository root; assumes source lies below it
        sourcebase = source[len(self.repo.root) + 1:]

        if not os.path.exists(source):
            return

        if any(sourcebase.startswith(x) for x in ignoredpaths):
            return

        if source in self.filecache:
            return

        self.filecache[source] = True

        if not written:
            written = True
            self._deletestale()
            self._addstate()
            self._prune()
            self._writeindex()
            self._writelist()

        undoRoot = self._undoroot(self.index)
        target = os.path.join(undoRoot, sourcebase)
        _safemakedirs(os.path.dirname(target))

        self.repo.ui.debug('  Backing up %s to %s\n' % (source, target))

        if hardlink:
            util.oslink(source, target)
        else:
            shutil.copyfile(source, target)

    def liststates(self, ui):
        '''writes the stored states to ui and marks the current position

        The "(current)" marker follows the entry of the current state. It
        comes first when there is no state or every stored state has been
        undone.
        '''
        ui.write('Available states:\n')
        # -1 means no state at all. An index below the first stored entry
        # means all stored states are undone; since old entries get pruned
        # that index is not necessarily -1.
        if self.index == -1 or self.index < self._firstindex():
            ui.write('  (current)\n')
        for entry in self.undolst:
            ui.write('  %s\n' % entry)
            if self.index == int(entry.split()[0]):
                ui.write('  (current)\n')
        if self.index > self._lastindex():
            ui.write('  (current)\n')

    def _undo(self, ui):
        '''restores the files backed up for the current state

        The present versions of those files are saved to the redo root
        first.
        '''
        undoroot = self._undoroot(self.index)
        redoRoot = self._redoroot(self.index)
        _deletePath(redoRoot)
        # repository root -> redo root, for every file name in the undo root
        _copytree(undoroot, self.repo.root, redoRoot)
        # undo root -> repository root
        _copytree(undoroot, undoroot, self.repo.root)

    def _redo(self, ui):
        '''copies the files saved in the current redo root back in place'''
        redoRoot = self._redoroot(self.index)
        _copytree(redoRoot, redoRoot, self.repo.root)

    def undo(self, ui):
        '''undoes the current state and moves the index one state back'''
        ui.write('Undoing state %d\n' % (self.index,))
        self._undo(ui)
        self.index -= 1
        self._writeindex()
        ui.write('Done\n')

    def redo(self, ui):
        '''moves the index one state forward and redoes that state'''
        self.index += 1
        ui.write('Redoing state %d\n' % (self.index,))
        self._redo(ui)
        self._writeindex()
        ui.write('Done\n')

    def indexes(self):
        '''gets the (first, last) stored state indexes, -1 where absent'''
        return self._firstindex(), self._lastindex()

def _list(ui, repo):
    '''writes the states known to the active undoer to ui

    repo is unused; the module-level undoer already belongs to the
    repository of the running command.
    '''
    tracker = undoer
    tracker.liststates(ui)

def _clear(ui, repo):
    '''deletes the whole multiundo storage of repo; ui is unused'''
    storage = repo.join('multiundo')
    _deletePath(storage)

def _deletePath(p):
    if os.path.exists(p):
        if os.path.isdir(p):
            shutil.rmtree(p, True, True)
        else:
            os.unlink(p)

def _safemakedirs(d):
    if not os.path.exists(d):
        os.makedirs(d)

def _copytree(namesRoot, sourceRoot, targetRoot):
    '''copies a set of files from sourceRoot to targetRoot

    The set is every file found below namesRoot. Each of them is read from
    the same relative path below sourceRoot and written to that relative
    path below targetRoot. namesRoot and sourceRoot may be the same
    directory.
    '''
    for dirpath, _dirnames, filenames in os.walk(namesRoot):
        # Location of this directory relative to namesRoot, computed once
        # per directory. relpath also copes with a trailing separator on
        # namesRoot, which slicing off a fixed-length prefix would not.
        reldir = os.path.relpath(dirpath, namesRoot)
        if reldir == os.curdir:
            reldir = ''
        for fname in filenames:
            _copyfile(os.path.join(sourceRoot, reldir, fname),
                      os.path.join(targetRoot, reldir, fname))

def _copyfile(source, target):
    if not os.path.exists(os.path.dirname(target)):
        os.makedirs(os.path.dirname(target))
    shutil.copyfile(source, target)

def undo(ui, repo, *args, **opts):
    '''undoes the last change operation that was initiated by Mercurial'''
    global undoer

    # --list and --clear are handled instead of an undo, in that order
    for flag, action in (('list', _list), ('clear', _clear)):
        if opts.get(flag):
            return action(ui, repo)

    firstindex, _unused = undoer.indexes()
    nostate = undoer.index == -1
    if nostate or undoer.index < firstindex:
        raise util.Abort('nothing to undo')

    # Detach the module-level undoer before restoring: while it is None the
    # opener wrappers in this module record nothing.
    active, undoer = undoer, None
    active.undo(ui)

def redo(ui, repo, *args, **opts):
    '''redoes the last undone Mercurial change operation'''
    global undoer

    # --list and --clear are handled instead of a redo, in that order
    for flag, action in (('list', _list), ('clear', _clear)):
        if opts.get(flag):
            return action(ui, repo)

    _unused, lastindex = undoer.indexes()
    if not undoer.index < lastindex:
        raise util.Abort('nothing to redo')

    # Detach the module-level undoer before restoring: while it is None the
    # opener wrappers in this module record nothing.
    active, undoer = undoer, None
    active.redo(ui)

# Text recorded for the command being dispatched (parent revisions followed
# by the command arguments); set and cleared by myruncommand, read by
# Undoer._addstate.
command = None
# Set to True by Undoer.addfile once it has recorded a new undo state, so
# that the state is recorded only once.
written = False
# Undoer of the repository the current command runs in; created by
# myruncommand and set to None by undo/redo before they restore files.
undoer = None

def _isread(mode):
    '''detects whether the file open mode is a read'''
    return mode == 'r' or mode == 'rb'

def mycall(orig, self, path, mode='r', text=False, atomictemp=False):
    '''wraps opening a file through the mercurial opener

    When an undoer is active and mode is not a plain read, the file is
    handed to the undo store (as a hard link) before the original opener
    runs.
    '''
    recorder = undoer
    if recorder is not None and not _isread(mode):
        recorder.addfile(self.base, path, hardlink=True)
    return orig(self, path, mode, text, atomictemp)

def myruncommand(orig, lui, repo, cmd, fullargs, *args, **kwargs):
    '''wrapper for mercurial's dispatcher to store the current command

    For a command that runs inside a repository this records the text used
    for its undo entry (the parent revisions followed by the full
    arguments) and creates a fresh Undoer before the command runs.
    Commands without a repository are passed straight through.
    '''

    global undoer
    global command
    global written

    if repo is None:
        return orig(lui, repo, cmd, fullargs, *args, **kwargs)

    command = ' '.join([str(p) for p in repo.parents()]) + ' '
    command += ' '.join(fullargs)
    undoer = Undoer(repo)
    # Every dispatched command gets its own undo state. Without this reset
    # a second command in the same process would find the flag still set
    # and back its files up into the previous command's state.
    written = False

    try:
        return orig(lui, repo, cmd, fullargs, *args, **kwargs)
    finally:
        # cleared even when the command raises
        command = None

def mywrite(orig, self, path, data):
    '''wraps whole-file writing to add the file to the undo store first'''
    # NOTE(review): the file is located relative to the repository root,
    # whereas mycall uses the opener's own base - confirm this is intended.
    recorder = undoer
    if recorder is not None:
        recorder.addfile(recorder.repo.root, path)
    return orig(self, path, data)

def myappend(orig, self, path, data):
    '''wraps whole-file appending to add the file to the undo store first'''
    # NOTE(review): the file is located relative to the repository root,
    # whereas mycall uses the opener's own base - confirm this is intended.
    recorder = undoer
    if recorder is not None:
        recorder.addfile(recorder.repo.root, path)
    return orig(self, path, data)

def extsetup(ui):
    '''extension setup - called from mercurial

    Installs this module's wrappers around the command dispatcher and the
    opener's open, write and append operations.
    '''
    hooks = (
        (dispatch, 'runcommand', myruncommand),
        (scmutil.opener, '__call__', mycall),
        (scmutil.abstractopener, 'write', mywrite),
        (scmutil.abstractopener, 'append', myappend),
    )
    for container, attrname, wrapper in hooks:
        extensions.wrapfunction(container, attrname, wrapper)

# Command table: command name -> (function, options, synopsis). Each option
# is (short name, long name, default value, help text).
cmdtable = {
        'undo': (undo, [
            # help text previously ended in a stray backtick
            ('l', 'list', False, 'list the undo states'),
            ('C', 'clear', False, 'clear the undo states'),
            ], '[-l|-C]'),
        'redo': (redo, [
            ('l', 'list', False, 'list the redo states'),
            ('C', 'clear', False, 'clear the redo states'),
            ], '[-l|-C]'),
        }