Snakemake / snakemake / workflow.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
# -*- coding: utf-8 -*-

__author__ = "Johannes Köster"

import re
import os
import sys
import signal
from collections import OrderedDict
from itertools import filterfalse, chain
from functools import partial
from operator import attrgetter

from snakemake.logging import logger
from snakemake.rules import Rule, Ruleorder
from snakemake.exceptions import RuleException, CreateRuleException, \
    UnknownRuleException, NoRulesException, print_exception
from snakemake.shell import shell
from snakemake.dag import DAG
from snakemake.scheduler import JobScheduler
from snakemake.parser import parse
from snakemake.io import protected, temp, temporary, expand, dynamic, glob_wildcards, flag, not_iterable
from snakemake.persistence import Persistence


class Workflow:
    def __init__(self, snakefile=None, snakemakepath=None, jobscript=None):
        """
        Create the controller.

        Arguments
        snakefile     -- path to the main snakefile; despite the None default
                         a path is required, since it is resolved with
                         os.path.abspath (None raises a TypeError)
        snakemakepath -- stored unchanged as self.snakemakepath
        jobscript     -- stored unchanged as self.jobscript
        """
        self._rules = OrderedDict()
        self.first_rule = None
        self._workdir = None
        self._ruleorder = Ruleorder()
        self._localrules = set()
        self.linemaps = dict()
        self.rule_count = 0
        # Resolve the snakefile first and derive basedir from the absolute
        # path: execute() and include() call os.chdir(), after which a
        # relative basedir would be resolved against the wrong directory.
        self.snakefile = os.path.abspath(snakefile)
        self.basedir = os.path.dirname(self.snakefile)
        self.snakemakepath = snakemakepath
        self.jobscript = jobscript
        self.persistence = None
        self.global_resources = None
        # snakefiles are executed inside the namespace of this module
        self.globals = globals()
        self._subworkflows = dict()

    @property
    def subworkflows(self):
        return self._subworkflows.values()

    @property
    def rules(self):
        return self._rules.values()

    @property
    def concrete_files(self):
        return (file for rule in self.rules for file in chain(rule.input, rule.output) if not callable(file) and not file.contains_wildcard())

    def check(self):
        for clause in self._ruleorder:
            for rulename in clause:
                if not self.is_rule(rulename):
                    raise UnknownRuleException(
                        rulename, prefix = "Error in ruleorder definition.")

    def add_rule(self, name=None, lineno=None, snakefile=None):
        """
        Create a new rule and register it with the workflow.

        Arguments
        name      -- the name of the rule; if None, the number of already
                     registered rules plus one (as a string) is used
        lineno    -- passed on to the Rule constructor
        snakefile -- passed on to the Rule constructor

        Returns the name of the rule. Raises CreateRuleException if the
        name is already taken by another rule.
        """
        rulename = str(len(self._rules) + 1) if name is None else name
        if self.is_rule(rulename):
            raise CreateRuleException(
                "The name {} is already used by another rule".format(rulename))
        new_rule = Rule(rulename, self, lineno=lineno, snakefile=snakefile)
        self._rules[new_rule.name] = new_rule
        self.rule_count += 1
        # the first rule ever added is remembered as candidate default target
        if not self.first_rule:
            self.first_rule = new_rule.name
        return rulename

    def is_rule(self, name):
        """
        Return True if name is the name of a rule.

        Arguments
        name -- a name
        """
        return name in self._rules

    def get_rule(self, name):
        """
        Get rule by name.

        Arguments
        name -- the name of the rule
        """
        if not self._rules:
            raise NoRulesException()
        if not name in self._rules:
            raise UnknownRuleException(name)
        return self._rules[name]

    def list_rules(self, details=True, log=logger.info):
        log("Available rules:")
        for rule in self.rules:
            log(rule.name)
            if details:
                if rule.docstring:
                    for line in rule.docstring.split("\n"):
                        log("\t" + line)

    def is_local(self, rule):
        return rule.name in self._localrules

    def execute(
        self, targets=None, dryrun=False, touch=False, cores=1, nodes=1,
        forcetargets=False, forceall=False, forcerun=None,
        prioritytargets=None, quiet=False, keepgoing=False,
        printshellcmds=False, printreason=False, printdag=False,
        cluster=None, immediate_submit=False, ignore_ambiguity=False,
        workdir=None, printrulegraph=False,
        stats=None, force_incomplete=False, ignore_incomplete=False,
        list_version_changes=False, list_code_changes=False,
        list_input_changes=False, list_params_changes=False,
        summary=False, output_wait=3, nolock=False, unlock=False,
        resources=None, notemp=False, nodeps=False,
        cleanup_metadata=None, subsnakemake=None):
        """
        Build the DAG for the requested targets and execute the workflow.

        The method changes into the working directory, splits targets,
        prioritytargets and forcerun into rule names and file paths, builds
        the DAG and the persistence object and then, depending on the flags,
        cleans up metadata, unlocks the directory, prints information
        (DAG, rule graph, summary, change listings) or schedules the jobs.

        Arguments (selection)
        targets          -- rule names and/or files to create; if empty, the
                            first rule of the workflow is used
        prioritytargets  -- rule names and/or files to prioritize
        forcerun         -- rule names and/or files to force
        workdir          -- directory to change into; defaults to the
                            directory set via workdir() or the current one
        resources        -- dict of global resources; ignored if cluster is
                            set. The dict is copied, not modified.
        cleanup_metadata -- iterable of files whose metadata is cleaned up;
                            if given, nothing else is done
        subsnakemake     -- callable used to execute subworkflows; called
                            with the snakefile and the keyword arguments
                            workdir and targets

        Returns True on success (including all purely informational modes)
        and False if unlocking or locking the directory fails, a subworkflow
        fails, input files are missing while nodeps is set, or the scheduler
        reports a failure.
        """
        # Copy the resources so that adding "_cores" and "_nodes" below does
        # not modify the dict owned by the caller.
        if cluster or resources is None:
            self.global_resources = dict()
        else:
            self.global_resources = dict(resources)
        self.global_resources["_cores"] = cores
        self.global_resources["_nodes"] = nodes

        def rules(items):
            # rule objects for all items that are names of rules
            return map(self._rules.__getitem__, filter(self.is_rule, items))

        def files(items):
            # relative paths for all items that are not names of rules
            return map(os.path.relpath, filterfalse(self.is_rule, items))

        if workdir is None:
            workdir = os.getcwd() if self._workdir is None else self._workdir
        os.chdir(workdir)

        if not targets:
            targets = [self.first_rule] if self.first_rule is not None else list()
        if prioritytargets is None:
            prioritytargets = list()
        if forcerun is None:
            forcerun = list()

        priorityrules = set(rules(prioritytargets))
        priorityfiles = set(files(prioritytargets))
        forcerules = set(rules(forcerun))
        forcefiles = set(files(forcerun))
        # prioritized and forced rules become targets as well, unless they
        # have wildcards
        targetrules = set(chain(
            rules(targets), filterfalse(Rule.has_wildcards, priorityrules),
            filterfalse(Rule.has_wildcards, forcerules)))
        targetfiles = set(chain(files(targets), priorityfiles, forcefiles))
        if forcetargets:
            forcefiles.update(targetfiles)
            forcerules.update(targetrules)

        dag = DAG(
            self, dryrun=dryrun, targetfiles=targetfiles,
            targetrules=targetrules,
            forceall=forceall, forcefiles=forcefiles,
            forcerules=forcerules, priorityfiles=priorityfiles,
            priorityrules=priorityrules, ignore_ambiguity=ignore_ambiguity,
            force_incomplete=force_incomplete,
            ignore_incomplete=ignore_incomplete, notemp=notemp)

        self.persistence = Persistence(nolock=nolock, dag=dag)

        if cleanup_metadata:
            for f in cleanup_metadata:
                self.persistence.cleanup_metadata(f)
            return True

        dag.init()
        dag.check_dynamic()

        if unlock:
            try:
                self.persistence.cleanup_locks()
                logger.info("Unlocking working directory.")
                return True
            except IOError:
                # the placeholder was never filled in before; report the
                # working directory like the locking error below does
                logger.error("Error: Unlocking the directory {} failed. Maybe "
                "you don't have the permissions?".format(os.getcwd()))
                return False
        try:
            self.persistence.lock()
        except IOError:
            logger.error("Error: Directory cannot be locked. Please make "
                "sure that no other Snakemake process is trying to create "
                "the same files in the following directory:\n{}\n"
                "If you are sure that no other "
                "instances of snakemake are running on this directory, "
                "the remaining lock was likely caused by a kill signal or "
                "a power loss. It can be removed with "
                "the --unlock argument.".format(os.getcwd()))
            return False

        if self.subworkflows:
            # backup globals
            globals_backup = dict(self.globals)
            # execute subworkflows
            for subworkflow in self.subworkflows:
                subworkflow_targets = subworkflow.targets(dag)
                if subworkflow_targets:
                    logger.info("Executing subworkflow {}.".format(subworkflow.name))
                    if not subsnakemake(subworkflow.snakefile, workdir=subworkflow.workdir, targets=subworkflow_targets):
                        return False
                else:
                    logger.info("Subworkflow {}: Nothing to be done.".format(subworkflow.name))
            logger.info("Executing main workflow.")
            # rescue globals
            self.globals.update(globals_backup)

        dag.check_incomplete()
        dag.postprocess()

        if nodeps:
            missing_input = [
                f for job in dag.targetjobs if dag.needrun(job)
                for f in job.input if not os.path.exists(f)]
            if missing_input:
                logger.error("Dependency resolution disabled (--nodeps) "
                    "but missing input "
                    "files detected. If this happens on a cluster, please make sure "
                    "that you handle the dependencies yourself or turn off "
                    "--immediate-submit. Missing input files:\n{}".format(
                        "\n".join(missing_input)))
                return False

        if printdag:
            print(dag)
            return True
        if printrulegraph:
            print(dag.rule_dot())
            return True
        if summary:
            print("\n".join(dag.summary()))
            return True

        # At most one kind of change listing is printed; the precedence is
        # given by the order of this table.
        change_listings = (
            (list_version_changes, self.persistence.version_changed),
            (list_code_changes, self.persistence.code_changed),
            (list_input_changes, self.persistence.input_changed),
            (list_params_changes, self.persistence.params_changed),
        )
        for requested, changed in change_listings:
            if requested:
                items = list(chain(*map(changed, dag.jobs)))
                if items:
                    print(*items, sep="\n")
                return True

        scheduler = JobScheduler(
            self, dag, cores, dryrun=dryrun, touch=touch, cluster=cluster,
            immediate_submit=immediate_submit,
            quiet=quiet, keepgoing=keepgoing,
            printreason=printreason, printshellcmds=printshellcmds,
            output_wait=output_wait)

        if not dryrun and not quiet and len(dag):
            if cluster:
                logger.info("Provided cluster nodes: {}".format(nodes))
            else:
                logger.info("Provided cores: {}".format(cores))
            logger.info("\n".join(dag.stats()))

        success = scheduler.schedule()

        if not success:
            logger.error(
                "Exiting because a job execution failed. "
                "Look above for error message")
            return False

        if dryrun:
            if not quiet:
                logger.info("\n".join(dag.stats()))
        elif stats:
            scheduler.stats.to_csv(stats)
        return True

    def include(self, snakefile, workdir=None, overwrite_first_rule=False,
        print_compilation=False):
        """
        Include a snakefile.

        The snakefile is translated with parse() and the resulting code is
        executed inside self.globals. The line map returned by parse() is
        stored in self.linemaps under the snakefile path.

        Arguments
        snakefile            -- path of the snakefile to include
        workdir              -- if given, the process changes into this
                                directory before the snakefile is parsed
        overwrite_first_rule -- if false, self.first_rule is restored to the
                                value it had before the include
        print_compilation    -- if true, the generated code is printed
        """
        global workflow
        global rules
        # Start a fresh rules namespace only when this workflow is not yet
        # the active one. An include statement inside a snakefile re-enters
        # this method; resetting the namespace there would make all rules
        # defined before the include unreachable via rules.<name>.
        if globals().get("workflow") is not self:
            rules = Rules()
        workflow = self
        first_rule = self.first_rule
        if workdir:
            os.chdir(workdir)
        code, linemap = parse(snakefile)

        if print_compilation:
            print(code)

        self.linemaps[snakefile] = linemap
        exec(compile(code, snakefile, "exec"), self.globals)
        if not overwrite_first_rule:
            self.first_rule = first_rule

    def workdir(self, workdir):
        if self._workdir is None:
            if not os.path.exists(workdir):
                os.makedirs(workdir)
            self._workdir = workdir

    def ruleorder(self, *rulenames):
        self._ruleorder.add(*rulenames)


    def subworkflow(self, name, snakefile=None, workdir=None):
        """
        Register a subworkflow under the given name.

        The target() method of the new Subworkflow is additionally stored
        in self.globals under the same name.
        """
        self._subworkflows[name] = Subworkflow(self, name, snakefile, workdir)
        self.globals[name] = self._subworkflows[name].target

    def localrules(self, *rulenames):
        self._localrules.update(rulenames)

    def rule(self, name=None, lineno=None, snakefile=None):
        """
        Register a new rule and return a decorator that configures it.

        Arguments
        name      -- the name of the rule (see add_rule() for the default)
        lineno    -- passed on to add_rule()
        snakefile -- passed on to add_rule()

        The returned decorator expects the rule info object collected by
        the other decorator methods of this class. It transfers the settings
        to the rule, stores the run function in self.globals under the name
        "__<rulename>", makes the rule available as attribute of the global
        rules namespace and returns the run function.

        The decorator raises RuleException for invalid threads, resources or
        priority values.
        """
        name = self.add_rule(name, lineno, snakefile)
        rule = self.get_rule(name)

        def decorate(ruleinfo):
            if ruleinfo.input:
                rule.set_input(*ruleinfo.input[0], **ruleinfo.input[1])
            if ruleinfo.output:
                rule.set_output(*ruleinfo.output[0], **ruleinfo.output[1])
            if ruleinfo.params:
                rule.set_params(*ruleinfo.params[0], **ruleinfo.params[1])
            if ruleinfo.threads:
                if not isinstance(ruleinfo.threads, int):
                    raise RuleException("Threads value has to be an integer.",
                        rule=rule)
                # threads are handled as the special resource "_cores"
                rule.resources["_cores"] = ruleinfo.threads
            if ruleinfo.resources:
                args, resources = ruleinfo.resources
                if args:
                    # pass the rule like the other checks do, so that the
                    # error can be attributed to it
                    raise RuleException("Resources have to be named.",
                        rule=rule)
                if not all(isinstance(r, int) for r in resources.values()):
                    raise RuleException("Resources values have to be integers.", rule=rule)
                rule.resources.update(resources)
            if ruleinfo.priority:
                if not isinstance(ruleinfo.priority, (int, float)):
                    raise RuleException("Priority values have to be numeric.",
                        rule=rule)
                rule.priority = ruleinfo.priority
            if ruleinfo.version:
                rule.version = ruleinfo.version
            if ruleinfo.log:
                rule.log = ruleinfo.log
            if ruleinfo.message:
                rule.message = ruleinfo.message
            rule.docstring = ruleinfo.docstring
            rule.run_func = ruleinfo.func
            rule.shellcmd = ruleinfo.shellcmd
            ruleinfo.func.__name__ = "__{}".format(name)
            self.globals[ruleinfo.func.__name__] = ruleinfo.func
            setattr(rules, name, rule)
            return ruleinfo.func
        return decorate

    def docstring(self, string):
        def decorate(ruleinfo):
            ruleinfo.docstring = string
            return ruleinfo
        return decorate

    def input(self, *paths, **kwpaths):
        def decorate(ruleinfo):
            ruleinfo.input = (paths, kwpaths)
            return ruleinfo
        return decorate

    def output(self, *paths, **kwpaths):
        def decorate(ruleinfo):
            ruleinfo.output = (paths, kwpaths)
            return ruleinfo
        return decorate

    def params(self, *params, **kwparams):
        def decorate(ruleinfo):
            ruleinfo.params = (params, kwparams)
            return ruleinfo
        return decorate

    def message(self, message):
        def decorate(ruleinfo):
            ruleinfo.message = message
            return ruleinfo
        return decorate

    def threads(self, threads):
        def decorate(ruleinfo):
            ruleinfo.threads = threads
            return ruleinfo
        return decorate

    def resources(self, *args, **resources):
        def decorate(ruleinfo):
            ruleinfo.resources = (args, resources)
            return ruleinfo
        return decorate

    def priority(self, priority):
        def decorate(ruleinfo):
            ruleinfo.priority = priority
            return ruleinfo
        return decorate

    def version(self, version):
        def decorate(ruleinfo):
            ruleinfo.version = version
            return ruleinfo
        return decorate

    def log(self, log):
        def decorate(ruleinfo):
            ruleinfo.log = log
            return ruleinfo
        return decorate

    def shellcmd(self, cmd):
        def decorate(ruleinfo):
            ruleinfo.shellcmd = cmd
            return ruleinfo
        return decorate

    def run(self, func):
        """Wrap the given function in a new RuleInfo and return it."""
        ruleinfo = RuleInfo(func)
        return ruleinfo

    @staticmethod
    def _empty_decorator(f):
        return f


class RuleInfo:
    """
    Collects the settings of a rule while its decorators are applied.

    Apart from the run function, every setting starts out as None.
    """

    # settings that are initialized with None, in this order
    _UNSET_FIELDS = (
        "shellcmd", "input", "output", "params", "message", "threads",
        "resources", "priority", "version", "log", "docstring")

    def __init__(self, func):
        self.func = func
        for field in self._UNSET_FIELDS:
            setattr(self, field, None)


class Subworkflow:
    """
    A workflow that is referenced from another workflow.

    Snakefile and working directory are resolved relative to the base
    directory of the referencing workflow.
    """

    def __init__(self, workflow, name, snakefile, workdir):
        self.workflow, self.name = workflow, name
        self._snakefile, self._workdir = snakefile, workdir

    @property
    def snakefile(self):
        """
        Path of the snakefile; if none was given, the file "Snakefile"
        inside the working directory is used.
        """
        if self._snakefile is not None:
            return os.path.join(self.workflow.basedir, self._snakefile)
        return os.path.join(self.workdir, "Snakefile")

    @property
    def workdir(self):
        """Working directory; "." is used if none was given."""
        relative = self._workdir
        if relative is None:
            relative = "."
        return os.path.join(self.workflow.basedir, relative)

    def target(self, paths):
        """
        Prefix the given path with the working directory and flag it as
        belonging to this subworkflow. For an iterable of paths, a list
        with one such result per path is returned.
        """
        if not not_iterable(paths):
            return [self.target(single) for single in paths]
        return flag(os.path.join(self.workdir, paths), "subworkflow", self)

    def targets(self, dag):
        """
        Return the subworkflow input files of all jobs of the DAG that need
        to run, restricted to the files that are assigned to this
        subworkflow.
        """
        wanted = []
        for job in dag.needrun_jobs:
            wanted.extend(
                f for f in job.subworkflow_input
                if job.subworkflow_input[f] is self)
        return wanted


class Rules:
    """
    Plain namespace object: rules are attached as attributes so that they
    can be accessed via dot notation.
    """
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.