# hg_adapter_changes / hg_adapter.py

#!/usr/bin/python
#
#  hg_adapter.py
#  Changes
#
#  Created by Ian Baird on 3/22/09.
#  Modified by Matthew Schinckel on 1/4/09.
#
#  Copyright (c) 2009 Skorpiostech, Inc. All rights reserved.
#

import getopt, sys, objc, xml.dom.minidom, datetime, calendar, os
from subprocess import PIPE, Popen, call
from Foundation import NSPropertyListSerialization, NSPropertyListXMLFormat_v1_0, NSDate

###############################################################################

# Usage text that main() writes to stderr when --help is given.
# NOTE(review): the name shadows the builtin help(); it is kept because main()
# reads this module-level name.
# NOTE(review): main() also dispatches a url_for_export command that is not
# described in this text.
help = """
--command=list_revs <file path>
    list revisions for a file at <file path>
    
--command=test_managed <file path>
    test a file at <file path> for management under this scm flavor
    
--command=export_rev --rev=<revision> --exporturl=<export url> <file path> 
    exports a file at <revision> and <file path> and <export url>

--flavor
    returns the scm flavor

--help
    display this help message

"""

def hgPath():
    """ returns the path of the hg executable to run

        The CHNG_HG environment variable, when set, is returned verbatim.
        Otherwise the first existing path among a few common install locations
        is returned, falling back to the bare name "hg".
    """
    # 'in' replaces dict.has_key(), which is deprecated and gone in Python 3
    if 'CHNG_HG' in os.environ:
        return os.environ['CHNG_HG']

    # search the usual spots, in order of preference
    usualSpots = ('/usr/local/hg/bin/hg', '/opt/local/bin/hg', '/usr/local/bin/hg',
                  '/usr/bin/hg', '/bin/hg')
    for candidate in usualSpots:
        if os.path.exists(candidate):
            return candidate

    return "hg"

def repositoryPath(arg0):
    """ returns the working directory hg should be run in for arg0

        Always returns None. The functions in this file pass the result as the
        cwd argument of Popen, where None leaves the child process in the
        current working directory.
    """
    # The upward search for a directory containing '.hg' had been disabled by an
    # unconditional return placed in front of it; that unreachable code is gone.
    return None
    

def getText(nodelist):
    """ utility for the dom stuff: joins the data of every text node that
        appears directly in nodelist, skipping nodes of any other type
    """
    pieces = [child.data for child in nodelist if child.nodeType == child.TEXT_NODE]
    return "".join(pieces)

def _parse_hg_log(output):
    """ turns the text printed by 'hg log' into a list of dicts, one per changeset

        Each dict carries 'rev' (the part after the colon on the changeset
        line), 'author', 'date' and 'msg' (the summary text), as far as the
        corresponding lines were present in output.
    """
    logEntries = []
    logEntry = None
    for line in output.splitlines():
        if line.startswith("changeset: "):
            # A changeset line opens a new entry. Keying on it, rather than on
            # the blank separator lines, avoids a bogus empty trailing entry.
            logEntry = {'msg': ''}
            logEntry['rev'] = line[10:].strip().split(':')[1]
            logEntries.append(logEntry)
        elif logEntry is None:
            # nothing to attach this line to yet
            continue
        elif line.startswith("user: "):
            logEntry['author'] = line[6:].strip()
        elif line.startswith("date: "):
            logEntry['date'] = line[6:].strip()
        elif line.startswith("summary:"):
            logEntry['msg'] = line[8:].strip()
        # any other field (tag, branch, parent, ...) is not part of the message

    return logEntries

def list_revs(arg0):
    """ prints out a plist with the revisions for arg0

        The plist is a dictionary with three keys: revList (the parsed entries
        of 'hg log' for arg0), returnCode (the exit status of hg) and stdErr
        (whatever hg wrote to its standard error).
    """
    p = Popen([hgPath(), "log", arg0],
        bufsize=1024, stdout=PIPE, close_fds=True, stderr=PIPE, cwd=repositoryPath(arg0))

    # communicate() drains stdout and stderr together, so hg can never block on
    # a full stderr pipe while we are still reading stdout; it also waits for
    # the process and closes both pipes.
    out, err = p.communicate()

    returnDict = dict(revList=_parse_hg_log(out), returnCode=p.returncode, stdErr=err)

    plistData, errorString = \
        NSPropertyListSerialization.dataFromPropertyList_format_errorDescription_(returnDict,
            NSPropertyListXMLFormat_v1_0, objc.NULL)
    sys.stdout.write(plistData.bytes())
    
def test_managed(arg0):
    """ tests to see if arg0 is under SCM, prints a plist containing TRUE if it is, FALSE
        if it is not

        The plist is a dictionary with the keys isManaged, returnCode and
        stdErr. arg0 counts as managed when 'hg status' wrote nothing to stderr
        and its output does not start with "?"; returnCode is forced to 0 in
        that case and is the exit status of hg otherwise.
    """
    p = Popen([hgPath(), "status", arg0], bufsize=1024, stdout=PIPE, stderr=PIPE, cwd=repositoryPath(arg0),
        close_fds=True)

    # Waiting for the process before reading its pipes can deadlock once the
    # output outgrows the pipe buffer; communicate() reads both pipes while it
    # waits and closes them afterwards.
    out, err = p.communicate()
    returnCode = p.returncode

    isManaged = not err and not out.startswith("?")

    if isManaged:
        returnCode = 0

    returnDict = dict(isManaged=isManaged, returnCode=returnCode, stdErr=err)

    plistData, errorString = \
        NSPropertyListSerialization.dataFromPropertyList_format_errorDescription_(returnDict,
            NSPropertyListXMLFormat_v1_0, objc.NULL)
    sys.stdout.write(plistData.bytes())

def export_rev(arg0, exportURL, rev):
    """ exports a revision to a temp file, then prints a plist containing the temp file path

        arg0      -- path of the file to export
        exportURL -- accepted so the signature stays the same; not used here
        rev       -- revision handed to 'hg cat -r'

        The plist is a dictionary with the keys tmpPath, returnCode (the exit
        status of hg) and stdErr. The temp file is written even when hg fails
        and is not removed by this function.
    """
    import tempfile

    p = Popen([hgPath(), "cat", "-r", "%s" % (rev), arg0], bufsize=4096, stdout=PIPE,
        stderr=PIPE, close_fds=True, cwd=repositoryPath(arg0))

    # communicate() reads stdout and stderr together, so hg cannot stall on a
    # full stderr pipe; it also waits for the process and closes both pipes.
    out, err = p.communicate()

    # mkstemp() creates and opens the file in one step, which removes the race
    # that os.tmpnam() warned about, and the global warning filters are no
    # longer touched.
    fd, tmpPath = tempfile.mkstemp()
    tmpFile = os.fdopen(fd, "wb")
    try:
        tmpFile.write(out)
    finally:
        tmpFile.close()

    returnDict = dict(tmpPath=tmpPath, returnCode=p.returncode, stdErr=err)

    plistData, errorString = \
            NSPropertyListSerialization.dataFromPropertyList_format_errorDescription_(returnDict,
                NSPropertyListXMLFormat_v1_0, objc.NULL)
    sys.stdout.write(plistData.bytes())
        
def url_for_export(arg0):
    """ prints a plist that echoes arg0 back as the export URL

        svn is supposed to return a real URL here; hg does not need one, so the
        plist holds exportURL set to arg0, returnCode 0 and an empty stdErr.
    """
    # The computation of a repository-relative path that used to live here was
    # switched off to save time until it is really needed.
    reply = {'exportURL': arg0, 'returnCode': 0, 'stdErr': ''}

    serialized, unusedError = \
        NSPropertyListSerialization.dataFromPropertyList_format_errorDescription_(
            reply, NSPropertyListXMLFormat_v1_0, objc.NULL)
    sys.stdout.write(serialized.bytes())

def flavor():
    """ prints a plist holding the name of this scm flavor, "Mercurial"
    """
    serialized, unusedError = \
        NSPropertyListSerialization.dataFromPropertyList_format_errorDescription_(
            "Mercurial", NSPropertyListXMLFormat_v1_0, objc.NULL)
    sys.stdout.write(serialized.bytes())
    
def main():
    """ Parse and act on the options that are passed in

        --flavor and --help are acted on as soon as they are seen. A --command
        is run after all options are read, with the first positional argument
        as its file path; export_rev additionally needs --rev and --exporturl.

        Usage problems (an option getopt rejects, an unknown command name, a
        missing file path, export_rev without --rev and --exporturl) are
        reported on stderr and end the process with exit status 2.
    """
    try:
        optlist, args = getopt.getopt(sys.argv[1:], "",
            ["command=", "rev=", "help", "flavor", "exporturl="])
    except getopt.GetoptError:
        # sys.exc_info() keeps this working on interpreters without 'except ... as'
        sys.stderr.write("%s\n" % sys.exc_info()[1])
        sys.stderr.write(help)
        sys.exit(2)

    dispatch = dict(list_revs=list_revs, test_managed=test_managed, export_rev=export_rev,
        url_for_export=url_for_export)

    command = None
    rev = None
    exportURL = None

    for o, a in optlist:
        if o == "--command":
            if a not in dispatch:
                # name the offending command instead of blaming the switch
                sys.stderr.write("Unrecognized command: %s\n" % a)
                sys.exit(2)
            command = dispatch[a]
        elif o == "--rev":
            rev = a
        elif o == "--exporturl":
            exportURL = a
        elif o == "--flavor":
            flavor()
        elif o == "--help":
            sys.stderr.write(help)

    if command is None:
        return

    if not args:
        sys.stderr.write("Missing <file path> argument\n")
        sys.exit(2)

    # Choose the call shape from the command itself, so stray --rev/--exporturl
    # options cannot push three arguments into a one-argument command.
    if command is export_rev:
        if not rev or not exportURL:
            sys.stderr.write("export_rev requires --rev and --exporturl\n")
            sys.exit(2)
        command(args[0], exportURL, rev)
    else:
        command(args[0])
    


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.