1. Matteo Bertini
  2. plow


plow / memo.py

#! /usr/bin/env python2.6
# -*- coding: utf-8 -*-
# Copyright 2011 by Matteo Bertini <matteo@naufraghi.net>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import logging
from itertools import chain
import subprocess
import gzip

import argparse

# Version is kept as a tuple for comparisons and as a dotted string for display.
__version_info__ = (0, 3, 0)
__version__ = ".".join(map(str, __version_info__))

# Configure the root handler once at import time; every record is rendered as
# "<date> <time> <LEVEL>[<logger name>]: <message>".
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)s[%(name)s]: %(message)s',
)
logger = logging.getLogger("memo")

class col:
    """ANSI escape sequences used to colour parts of a logged command line."""

    # Sequence that resets the terminal back to its default attributes.
    ENDC = '\033[0m'

    # Bright foreground colours, listed by ascending escape code (91..95).
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PINK = '\033[95m'

def try_rename(source, target):
    """Rename *source* to *target*, failing loudly when *source* is absent.

    Args:
        source: path of the existing file to move.
        target: destination path.

    Raises:
        OSError: if *source* does not exist, or if ``os.rename`` itself fails.
    """
    # Guard clause: the error belongs to the "missing source" case only.
    # Previously the raise was executed right after a successful rename.
    if not os.path.exists(source):
        raise OSError("Cannot rename {0!r} in {1!r}, source file is missing!".format(source, target))
    os.rename(source, target)

def run_if_needed(args):
    """Run the command in ``args.cmdargs`` unless its targets are up to date.

    The command is run when any target is missing, or when the oldest target
    is older than the newest dependency and ``args.check_exists`` is false.
    Unless ``args.inplace`` is set, every target name in the command is
    replaced by ``<target>.memo`` and the temporary files are renamed onto
    the real targets only after the command has succeeded.

    Args:
        args: namespace providing ``cmdargs`` (list of command words),
            ``targets`` and ``deps`` (collections of paths), and the
            ``check_exists`` / ``inplace`` flags.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status (targets are then left untouched).
        OSError: if an expected temporary output file was not produced.
    """
    def mtime_or_missing(path):
        # -1 marks a path that does not exist.
        return os.stat(path).st_mtime if os.path.exists(path) else -1

    # The extra 0 keeps max() defined when there are no dependencies at all.
    latest_timestamp = max([mtime_or_missing(dep) for dep in args.deps] + [0])
    # min() so that a single missing target (-1) marks the whole set missing.
    target_timestamp = min(mtime_or_missing(target) for target in args.targets)

    if target_timestamp == -1:
        status = "MISSING"
    elif target_timestamp < latest_timestamp:
        status = "OLDER"
    else:
        status = "NEWER"

    def colorize(part, target_color=col.RED, dep_color=col.YELLOW, default_color=col.BLUE):
        """Wrap one command word in the colour matching its role."""
        if part in args.targets:
            COL = target_color
        elif part in args.deps:
            COL = dep_color
        else:
            COL = default_color
        return "".join((COL, part, col.ENDC))

    if status == "MISSING" or (status == "OLDER" and not args.check_exists):
        cmdline_color_run = " ".join(colorize(part, col.RED, col.YELLOW, col.BLUE) for part in args.cmdargs)
        logger.info("Running: {0}".format(cmdline_color_run))
        call_args = ["/bin/bash", "-c", " ".join(args.cmdargs)]
        if not args.inplace:
            # Write to "<target>.memo" first so an interrupted or failing
            # command never leaves a half-written target behind.
            tmp_files = dict((target, target + ".memo") for target in args.targets)
            call_args = ["/bin/bash", "-c", " ".join(tmp_files.get(x, x) for x in args.cmdargs)]
        # NOTE(review): the original execution call was missing from this
        # block; check_call is used so a failing command aborts before the
        # rename step below -- confirm against the upstream file.
        subprocess.check_call(call_args)
        if not args.inplace:
            for target in args.targets:
                try_rename(tmp_files[target], target)
    else:
        cmdline_color_skip = " ".join(colorize(part, col.GREEN, col.YELLOW, col.BLUE) for part in args.cmdargs)
        logger.debug("Skipping: {0}".format(cmdline_color_skip))

def generate_pakeflow(args):
    """Write a make-style rule for the current command to ``args.output``.

    The rule has the form ``<targets>: <deps>`` followed by the command on a
    tab-indented line; targets, deps and command words are space-joined.
    """
    rule = "{0}: {1}\n\t{2}\n".format(
        " ".join(args.targets),
        " ".join(args.deps),
        " ".join(args.cmdargs),
    )
    args.output.write(rule)

def process_cmdline(args, cmdline):
    """Split *cmdline*, work out its targets and deps, then run or emit it.

    Sets ``args.cmdargs``, ``args.targets`` and ``args.deps`` on *args*.
    When no explicit targets are given, the last word of the command is taken
    as the single target.  Every other word that looks like a file path is
    treated as a dependency.

    Args:
        args: parsed options namespace; ``targets`` and ``pakeflow`` are read,
            ``cmdargs``, ``targets`` and ``deps`` are (re)assigned.
        cmdline: the command line as a single string.
    """
    args.cmdargs = cmdline.split()
    args.targets = set(args.targets if args.targets else args.cmdargs[-1:])

    def can_be_dep(c, dep):
        """Heuristically decide whether word *dep* at index *c* is a file dependency."""
        if dep in args.targets:
            return False
        # Anything that exists, or that lives in an existing directory, is
        # accepted before the syntactic filters below are consulted.
        if os.path.exists(dep):
            return True
        if os.path.exists(os.path.dirname(dep)):
            return True
        if dep.startswith("-"):
            return False
        if dep.endswith(","):
            return False
        # Words containing these characters are rejected.
        if any((x in dep) for x in "<>()$|#'\"[]*"):
            return False
        # Words made only of digits and dots (plain numbers) are rejected.
        if dep.strip("1234567890.") == "":
            return False
        if "." not in dep:
            return False
        if c == 0 and dep[0] not in "./": # executable:
            return False
        return True
    args.deps = set(d for c,d in enumerate(args.cmdargs) if can_be_dep(c,d))

    # NOTE(review): the body of this dispatch was missing from the file; the
    # two sibling functions are the only consumers of cmdargs/targets/deps --
    # confirm against the upstream file.
    if not args.pakeflow:
        run_if_needed(args)
    else:
        generate_pakeflow(args)

def parse_targets(cmdline):
    """Extract explicit targets from the trailing ``#`` comment of *cmdline*.

    The text after the first ``#`` is parsed for ``+TARGETS a b ...`` and
    ``+MKTARGETS a b ...`` annotations.  ``+TARGETS`` is stripped from the
    returned command line, while ``+MKTARGETS`` and any unrecognised comment
    words are appended back after `` # ``.

    Args:
        cmdline: one command line, optionally ending in a ``#`` comment.

    Returns:
        A ``(targets, cmdline)`` tuple: the list of annotated targets (empty
        when there is no comment or no annotation) and the rewritten command
        line.
    """
    if "#" in cmdline:
        parser = argparse.ArgumentParser(prefix_chars='-+')
        parser.add_argument("+TARGETS", nargs='+')
        parser.add_argument("+MKTARGETS", nargs='*')
        command, comment = cmdline.split("#", 1)
        args, other = parser.parse_known_args(comment.split())
        if args.MKTARGETS is not None:
            # Keep the +MKTARGETS annotation in the command's comment.
            other += ["+MKTARGETS"] + args.MKTARGETS
        cmdline = command
        if other:
            cmdline = " # ".join([command, " ".join(other)])
        targets = (args.TARGETS or []) + (args.MKTARGETS or [])
    else:
        # No comment at all: nothing is annotated, command is left untouched.
        targets = []
    return targets, cmdline

def stdopen(mode="r"):
    """Build an argparse ``type=`` callable that opens a file in *mode*.

    The returned callable maps ``'-'`` to ``sys.stdin`` (when *mode* contains
    ``"r"``) or ``sys.stdout`` (otherwise), opens ``*.gz`` names through
    ``gzip`` and everything else through the builtin ``open``.

    Args:
        mode: file mode passed to ``open`` / ``gzip.open``.

    Returns:
        A one-argument function taking a filename and returning a stream.
        It raises ``argparse.ArgumentTypeError`` when the file does not exist
        and *mode* contains no ``"w"``.
    """
    def _stdopen(filename):
        if filename == '-':
            stream = sys.stdin if "r" in mode else sys.stdout
        elif ("w" in mode) or os.path.exists(filename):
            if filename.endswith(".gz"):
                stream = gzip.open(filename, mode, compresslevel=5)
            else:
                stream = open(filename, mode)
        else:
            # Only reached for a missing file in a mode without "w".
            raise argparse.ArgumentTypeError("File {0!r} not found!".format(filename))
        return stream
    return _stdopen

def main():
    """Parse the command line and process one command or a list of commands.

    With ``-l/--command-list`` every non-blank line of the given file is
    handled as its own command, with targets taken from its ``#`` comment
    annotations.  Otherwise the positional words are joined into a single
    command.  Exits with a usage error when neither is supplied.
    """
    parser = argparse.ArgumentParser(description="Execute a script only if deps are updated")
    parser.add_argument("cmdline", nargs='*')
    parser.add_argument("-t", "--targets", nargs='+', help="Explicit targets if multiple or not the last argument")
    parser.add_argument("-e", "--check-exists", action="store_true", help="Check only if target exists")
    parser.add_argument("-i", "--inplace", action="store_true", help="Skip intermediate temp file creation")
    parser.add_argument("-p", "--pakeflow", action="store_true", help="Generate pakeflow task")
    parser.add_argument("-l", "--command-list", type=stdopen(), help="Read commands from file")

    parser.add_argument("-v", "--verbosity", action='count', default=0)
    parser.add_argument("-q", "--quiet", action='count', default=0)
    parser.add_argument("-o", "--output", type=stdopen('w+'), default=sys.stdout)

    args = parser.parse_args()
    # Each -v lowers the threshold by one level (10), each -q raises it.
    logger.setLevel(logging.INFO-10*(args.verbosity - args.quiet))

    if args.command_list:
        for command in args.command_list:
            # A blank line carries no command and no target to check.
            if not command.strip():
                continue
            args.targets, cmdline = parse_targets(command)
            process_cmdline(args, cmdline)
    elif args.cmdline:
        process_cmdline(args, " ".join(args.cmdline))
    else:
        # Without this guard an empty command fails later with an opaque
        # ValueError from min() over an empty target set.
        parser.error("no command given: pass a command line or --command-list")

if __name__ == "__main__":