#! /usr/bin/env python2.6
# -*- coding: utf-8 -*-
# Copyright 2011 by Matteo Bertini <matteo@naufraghi.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

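"""plow: a simple make-like runner for plow files.

A plow file is a list of rules: a "targets: deps" line, optionally
carrying per-rule options in a trailing "# comment" (+INPLACE,
+DUPLICATE, +CHECKEXISTS, +MKTARGETS), followed by a single command on
the next line. A minimal (hypothetical) example:

    sorted.txt: input.txt  # a one-rule plow
        sort input.txt > sorted.txt

A command runs only when one of its targets is missing or older than a
dep, either locally or on remote execnet gateways (-j/--processes).
"""
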
import os
import sys
import stat
import time
import re
import logging
import shutil
from itertools import groupby
from collections import defaultdict
import subprocess
import threading
from threading import Thread
from Queue import Queue
import multiprocessing
import gzip

import argparse

import execnet

logging.basicConfig(format='%(asctime)s %(levelname)s[%(name)s]: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger("plow")

__version_info__ = (0, 3, 0)
__version__ = ".".join(str(i) for i in __version_info__)

class col:
    PINK = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    ENDC = '\033[0m'

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks, gateway):
        Thread.__init__(self)
        self.tasks = tasks
        self.gateway = gateway
        self.daemon = True
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            func(self.gateway, self.tasks)(*args, **kargs)
            self.tasks.task_done()

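# Typical ThreadPool use (hypothetical values): one Worker thread per
# execnet gateway; tasks are queued and join() blocks until done:
#   pool = ThreadPool(execnet.Group(["popen"] * 4))
#   pool.add_task(execnet_if_needed, task, False, True, progress="")
#   pool.join()
#   pool.terminate()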
class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, group):
        self.tasks = Queue()
        self.group = group
        self.workers = [Worker(self.tasks, gw) for gw in group]
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def join(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def terminate(self):
        """Terminate execnet Group"""
        self.group.terminate()

class CommandOptionParser(type):
    def __new__(meta, name, bases, attrs):
        parser = argparse.ArgumentParser(prefix_chars='-+')
        parser.add_argument("+INPLACE", nargs='*')
        parser.add_argument("+DUPLICATE", nargs='*')
        parser.add_argument("+CHECKEXISTS", nargs='*')
        parser.add_argument("+MKTARGETS", nargs='*')
        attrs['parser'] = parser
        return type.__new__(meta, name, bases, attrs)

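# Per-rule options come from the rule's trailing comment, e.g. the
# (hypothetical) rule:
#
#   big.db: dump.sql  # +INPLACE
#       load_db dump.sql big.db
#
# A bare +OPTION applies to all targets of the rule, "+OPTION file..."
# only to the listed files; +MKTARGETS implies +INPLACE and +CHECKEXISTS.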
class BaseTask(object):
    __metaclass__ = CommandOptionParser
    def __init__(self, targets, deps, comment, commands, linenum=-1):
        self.targets = targets
        self.deps = deps
        self.comment = comment
        self.commands = [c for c in commands if c.strip()]
        self.linenum = linenum
        self._parse_options()
        # TODO: ensure key immutability
        self._hash = hash((
                           tuple(sorted(self.targets)),
                           tuple(sorted(self.deps)),
                           tuple(sorted(self.commands)),
                          ))
    def _parse_options(self):
        args, others = self.parser.parse_known_args(self.comment.split())
        if args.MKTARGETS is not None:
            args.INPLACE = args.CHECKEXISTS = args.MKTARGETS
        # option absent -> None -> empty set; bare +OPTION -> [] -> all targets
        self._inplace = set(self.targets if (args.INPLACE == []) else (args.INPLACE or []))
        self._duplicate = set(self.targets if (args.DUPLICATE == []) else (args.DUPLICATE or []))
        self._check_exists = set(self.targets if (args.CHECKEXISTS == []) else (args.CHECKEXISTS or []))
        self._order_only = not self.commands
    def is_outdated(self):
        newest_dep = max([mtime(dep) for dep in self.deps]+[0])
        # if target in _check_exists: -> rebuild only if missing
        def target_timestamp(target):
            timestamp = mtime(target)
            if timestamp < 0:
                return timestamp
            else:
                return timestamp if target not in self._check_exists else newest_dep
        oldest_target = min([target_timestamp(target) for target in self.targets])
        return oldest_target < newest_dep
    def check_deps(self):
        missings = []
        for dep, task in self.deps.items():
            if not os.path.exists(dep) and not task._order_only:
                missings += [str(MissingTargetError(self, dep))]
        if missings:
            raise IOError("\n".join(missings))
    def run_if_needed(self, call_func=subprocess.check_call, inplace=False, force=False, progress=None):
        self.check_deps()
        thread = threading.current_thread()
        p = progress or ""
        # with --inplace (or per-rule +INPLACE) targets are written directly,
        # skipping the temp-file-then-rename step
        inplace_targets = set(self.targets) if inplace else self._inplace
        for cmdline in self.commands:
            def colorize(target_color):
                return " ".join((target_color if a in self.targets else (col.YELLOW if a in self.deps else col.BLUE)) + a + col.ENDC
                                for a in cmdline.split())
            cmdline_color_skip = colorize(col.GREEN)
            cmdline_color_run = colorize(col.RED)
            if self.is_outdated() or force:
                logger.info("{p}Running<{thread.name}>{f}: {cmdline_color_run}".format(f="[F]" if force else "", **locals()))

                movable_targets = set(self.targets) - inplace_targets
                tmp_files = {}
                for target in movable_targets:
                    tmp_files[target] = target+".plowtmp"

                call_args = ["/bin/bash", "-c", " ".join(tmp_files.get(x, x) for x in cmdline.split())]
                output = call_func(call_args)

                for target in movable_targets:
                    try:
                        os.rename(tmp_files[target], target)
                    except:
                        logger.error("Problem renaming temp file {0!r}".format(tmp_files[target]))
                        raise
                return output
            else:
                logger.debug("{p}Skipping<{thread.name}>: {cmdline_color_skip}".format(**locals()))
    def __hash__(self):
        return self._hash
    def __eq__(self, other):
        return hash(self) == hash(other)
    def __repr__(self):
        linenum = "[line:{self.linenum}]".format(**locals()) if self.linenum != -1 else ""
        targets = " ".join(self.targets)
        return "<BaseTask{linenum} {targets!r}>".format(**locals())
    def __str__(self):
        targets = " ".join(self.targets)
        deps = " ".join(self.deps)
        commands = "\n\t".join(self.commands)
        tasks = [str(self.deps[dep]) for dep in self.deps if self.deps[dep]]
        linenum = " line: {self.linenum}".format(**locals()) if self.linenum != -1 else ""
        comment = " #{comment}{linenum}".format(comment=" "+self.comment, linenum=linenum) if any([self.comment, linenum]) else ""
        tasks += ["{targets}: {deps}{comment}\n\t{commands}".format(**locals())]
        return "\n".join(tasks)

def mtime(path):
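    # directories are treated as timestamp 0, missing files as -1
    # (so a missing target is always outdated)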
    if os.path.isdir(path):
        return 0
    return os.stat(path).st_mtime if os.path.isfile(path) else -1

def parse_plow(lines):
    """
    Parse the provided plow and return a list of tasks
    """
    def _parse_plow(lines):
        lines = iter(lines)
        c = 0
        for line in lines:
            comment = []
            try:
                c += 1
                if "#" in line:
                    # full-line comments (column 0) are stripped too and
                    # leave an empty line, skipped just below
                    line, _comment = (l.strip() for l in line.split("#", 1))
                    if _comment:
                        comment += [_comment]
                if not line:
                    continue
                if ":" in line:
                    targets, deps = line.split(":", 1)
                    targets = dict((i.strip(), None) for i in targets.split())
                    deps = dict((i.strip(), None) for i in deps.split())
                    #TODO: add support for multiline command
                    commands = [lines.next().strip()]
                    comment = " ".join(comment)
                    yield BaseTask(targets, deps, comment, commands, c)
                    c += 1
            except:
                logger.exception("Problem parsing line {c}:\n\t{line}".format(**locals()))
                raise
    return list(_parse_plow(lines))

class DuplicateTargetError(Exception):
    def __init__(self, duplicates):
        self.duplicates = duplicates
    def __str__(self):
        msgs = []
        for target, l1, l2 in self.duplicates:
            msgs += ["Duplicate target: {target} line {l1} and {l2}".format(**locals())]
        return "\n".join(msgs)

class MissingTargetError(Exception):
    def __init__(self, task, dep):
        self.task = task
        self.dep = dep
    def __str__(self):
        return "Missing {self.dep!r} to satisfy {self.task!r} deps".format(**locals())

def create_tree(tasks, nodups=False, check_deps=True):
    """
    Given a list of tasks, connect deps to targets and create a DAG
    Returns a dictionary of all targets
    """
    all_targets = {}
    duplicates = []
    for task in tasks:
        for target in task.targets:
            if (target in all_targets) and (target not in task._duplicate):
                l1 = all_targets[target].linenum
                l2 = task.linenum
                duplicates += [(target, l1, l2)]
                logger.warn(DuplicateTargetError([(target, l1, l2)]))
            all_targets[target] = task
    if duplicates and nodups:
        raise DuplicateTargetError(duplicates)
    for task in tasks:
        for dep in task.deps:
            if check_deps and (dep not in all_targets) and (not os.path.exists(dep)):
                raise MissingTargetError(task, dep)
            task.deps[dep] = all_targets.get(dep, None)
    return all_targets

class TaskGraphError(Exception):
    def __init__(self, tasks, task, depth):
        self.tasks = tasks
        self.task = task
        self.depth = depth
    def __str__(self):
        msgs = ["Cyclic chain detected[depth={self.depth}]:".format(**locals())]
        for depth, task in sorted((v, k) for k, v in self.tasks.items()):
            msgs += ["\t[depth={depth}]{task!r}".format(**locals())]
        msgs += ["\t -> {self.task!r}".format(**locals())]
        return "\n".join(msgs)

def follow_tasks(main_task):
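    """
    Record each task's maximum depth in the dependency DAG and return
    the tasks grouped by depth, deepest group first, so every task's
    deps land in an earlier group.
    """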
    tasks = {}
    def _follow_tasks(task, depth=0):
        #logger.debug("_follow_tasks({task!r}, {depth})".format(**locals()))
        if depth > len(tasks)+1:
            raise TaskGraphError(tasks, task, depth)
        tasks[task] = max(depth, tasks.get(task, 0))
        for itask in task.deps.values():
            if isinstance(itask, BaseTask):
                _follow_tasks(itask, depth+1)
    _follow_tasks(main_task)
    keyfunc = lambda t: -tasks[t]
    return [list(g) for d,g in groupby(sorted(tasks, key=keyfunc), keyfunc)]

def get_heads(targets):
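    # heads are targets that no other rule depends on: the final outputs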
    heads = dict((t,True) for t in targets)
    for target in targets:
        for dep in targets[target].deps:
            heads[dep] = False
    return sorted([t for t in heads if heads[t]], key=lambda t: targets[t].linenum)

def execnet_if_needed(gateway, tasks):
    channel = gateway.remote_exec("""
    from subprocess import Popen, PIPE, CalledProcessError
    import execnet
    def check_output(*popenargs, **kwargs):
        if 'stdout' in kwargs:
            raise ValueError('stdout argument not allowed, it will be overridden.')
        process = Popen(stdout=PIPE, *popenargs, **kwargs)
        output, unused_err = process.communicate()
        retcode = process.poll()
        if retcode:
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise CalledProcessError(retcode, cmd)
        return output
    try:
        channel.send(check_output(channel.receive()))
    except EOFError:
        pass
    """)
    def call_func(*args, **kwargs):
        channel.send(*args, **kwargs)
        return channel.receive()
    def _execnet_if_needed(task, inplace, task_changed, progress):
        if task_changed:
            logger.debug("{task!r} changed, force rebuild".format(**locals()))
        try:
            progress += "[remaining {0}]".format(tasks.qsize())
            output = task.run_if_needed(call_func, inplace, task_changed, progress)
            if output:
                sys.stdout.write(output)
        except:
            logger.exception("Problem running task {task!r}".format(**locals()))
            raise
    return _execnet_if_needed

def execute_plow(args, targets, prev_tasks):
    def dryrun(groups):
        for c, group in enumerate(groups):
            print "Group {c}".format(**locals())
            for task in group:
                print "\t{task!r}".format(**locals())
    # execute (single or multiprocess)
    pool = ThreadPool(execnet.Group(args.processes))
    for target in args.targets:
        user_task = targets[target]
        #logger.debug("follow_tasks({user_task!r})".format(**locals()))
        groups = follow_tasks(targets[target])
        if args.dry_run:
            dryrun(groups)
            continue
        for c, group in enumerate(groups):
            for t in group:
                #FIXME: added tasks should not result as modified
                pool.add_task(execnet_if_needed, t, args.inplace, t not in prev_tasks,
                              progress="[group {0}/{1}]".format(c+1, len(groups)))
            pool.join()
    pool.terminate()
    logger.info("Done!")

def xspec(value):
    if os.path.exists(value):
        return [l.strip() for l in open(value).readlines() if not l.startswith('#')]
    try:
        value = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError("Invalid number of processes: {0!r}".format(value))
    if value > 0:
        return ["popen"]*value
    else:
        raise argparse.ArgumentTypeError("Invalid number of processes: {0} < 1".format(value))

def stdopen(mode="r"):
    def _stdopen(filename):
        if filename == '-':
            stream = sys.stdin if "r" in mode else sys.stdout
        elif ("w" in mode) or os.path.exists(filename):
            #FIXME: gzip.open is very slow, consider using <(zcat my.plow.gz)
            if filename.endswith(".gz"):
                stream = gzip.open(filename, mode, compresslevel=5)
            else:
                stream = open(filename, mode)
        else:
            raise argparse.ArgumentTypeError("File {0!r} not found!".format(filename))
        return stream
    return _stdopen

def is_regular_file(stream):
    return stat.S_ISREG(os.fstat(stream.fileno())[stat.ST_MODE])

def main():
    cpu_count = multiprocessing.cpu_count()
    parser = argparse.ArgumentParser(description="Simple plow runner")
    parser.add_argument("plow", type=stdopen())
    parser.add_argument("targets", nargs="*", help="Targets to run")
    parser.add_argument("-i", "--inplace", action="store_true", help="Skip intermediate temp file creation")
    parser.add_argument("-e", "--check-exists", action="store_true", help="Check only if target exists")
    parser.add_argument("-j", "--processes", type=xspec, default=["popen"]*cpu_count,
                        help="Number of concurrent processes (default: {cpu_count} local) "
                             "or xspec.list (see execnet docs for xspec syntax)".format(**locals()))
    parser.add_argument("-a", "--all", action="store_true", help="Run all targets")
    parser.add_argument("-g", "--grep", help="Grep targets")
    parser.add_argument("-t", "--echo-targets", action="store_true", help="Echo targets")
    parser.add_argument("-n", "--dry-run", action="store_true", help="Print only commands")
    #TODO: add --force to force rebuild or --clean
    #TODO: ignore redundant deps changes (a: b c, d: a) -> (a: b c, d: a b) [b is redundant]

    parser.add_argument("-v", "--verbosity", action='count', default=0)
    parser.add_argument("-q", "--quiet", action='count', default=0)
    parser.add_argument("-o", "--output", type=stdopen('w+'), default=sys.stdout)

    args = parser.parse_args()
    logger.setLevel(max(1, logging.INFO-10*(args.verbosity - args.quiet)))

    # parse the plow file
    args.plow_is_regular_file = is_regular_file(args.plow)
    tasks = parse_plow(args.plow)
    args.plow.close()
    targets = create_tree(tasks)
    prev_tasks = set(tasks)

    if not args.plow_is_regular_file:
        logger.warn("Reading plow from stdin/pipe: no previous plow to compare, relying on timestamps only")
    else: # try to load previous plow
        plowname = os.path.abspath(args.plow.name)
        prev_plowname = os.path.dirname(plowname)+"/."+os.path.basename(plowname)
        if os.path.exists(prev_plowname):
            logger.debug("Loading previous plow from {0!r}".format(prev_plowname))
            prev_plow = stdopen()(prev_plowname) #TODO: waiting for gzip contextlib-ation
            prev_tasks = set(parse_plow(prev_plow))
            prev_plow.close()

    # check command line targets
    if args.all:
        args.targets = get_heads(targets)
    if args.grep:
        egrep = re.compile(args.grep)
        args.targets += sorted([t for t in targets if (egrep.search(t) is not None)], key=lambda t: targets[t].linenum)
        if not args.echo_targets:
            logger.debug("Grep selected targets:\n\t{_targets}".format(_targets="\n\t".join(args.targets)))
        #TODO: parallelize targets if possible
    if not args.targets or not all(t in targets for t in args.targets):
        num = 10 if logger.level >= logging.INFO else None
        ellips = ["..."] if (num and len(targets) > num) else []
        msg = "\n".join(["Select a valid target in:"]+["\t{0}".format(target) for target in sorted(targets)[:num]+ellips])
        parser.error(msg)

    if args.echo_targets:
        for target in args.targets:
            args.output.write("{0}\n".format(target))
    else:
        execute_plow(args, targets, prev_tasks)

        # if completed successfully, save plow
        if args.plow_is_regular_file:
            logger.debug("Saving last completed plow in {0!r}".format(prev_plowname))
            shutil.copy2(args.plow.name, prev_plowname)
        #FIXME: manage interrupted runs

if __name__ == "__main__":
    main()