Sean Davis / Snakemake

Commits

Ryan Dale committed 082ad27 (merge)

Merged johanneskoester/snakemake into master

  • Parent commits 4a0c8ef, f82078f
  • Branches master


Files changed (28)

File debian/changelog

+python3-snakemake (2.4.7.1) precise; urgency=low
+
+  * New upstream release.
+
+ -- Johannes Koester <johannes.koester@tu-dortmund.de>  Sun, 13 Oct 2013 19:24:04 +0200
+
+
+
+python3-snakemake (2.4.6) precise; urgency=low
+
+  * New upstream release.
+
+ -- Johannes Koester <johannes.koester@tu-dortmund.de>  Wed, 25 Sep 2013 14:08:04 +0200
+
+
 python3-snakemake (2.4.5) precise; urgency=low
 
   * New upstream release.

File snakemake/__init__.py

 import re
 import sys
 import inspect
+from functools import partial
+
 
 from snakemake.workflow import Workflow
 from snakemake.exceptions import print_exception
 from snakemake.logging import logger, init_logger
 
 __author__ = "Johannes Köster"
-__version__ = "2.4.5"
+__version__ = "2.4.7.1"
 
 
 def snakemake(snakefile,
     printreason=False,
     printshellcmds=False,
     printdag=False,
-    printruledag=False,
+    printrulegraph=False,
     nocolor=False,
     quiet=False,
     keepgoing=False,
     print_compilation=False,
     debug=False,
     notemp=False,
+    nodeps=False,
     jobscript=None):
     """
     Run snakemake on a given snakefile.
             # ignore: if it does not work we can still work without it
             pass
 
-    success = False
+    success = True
     try:
         workflow.include(snakefile, workdir=workdir,
             overwrite_first_rule=True, print_compilation=print_compilation)
             if listrules:
                 workflow.list_rules()
             else:
-                success = workflow.execute(
-                    targets=targets, dryrun=dryrun, touch=touch,
-                    cores=cores, forcetargets=forcetargets,
-                    forceall=forceall, forcerun=forcerun,
-                    prioritytargets=prioritytargets, quiet=quiet,
-                    keepgoing=keepgoing, printshellcmds=printshellcmds,
-                    printreason=printreason, printruledag=printruledag,
-                    printdag=printdag, cluster=cluster,
-                    immediate_submit=immediate_submit,
-                    ignore_ambiguity=ignore_ambiguity,
-                    workdir=workdir, stats=stats,
-                    force_incomplete=force_incomplete,
-                    ignore_incomplete=ignore_incomplete,
-                    list_version_changes=list_version_changes,
-                    list_code_changes=list_code_changes,
-                    list_input_changes=list_input_changes,
-                    list_params_changes=list_params_changes,
-                    summary=summary,
-                    output_wait=output_wait,
-                    nolock=not lock,
-                    unlock=unlock,
-                    resources=resources,
-                    notemp=notemp,
-                    cleanup_metadata=cleanup_metadata
-                    )
+                if not printdag and not printrulegraph:
+                    # handle subworkflows
+                    subsnakemake = partial(
+                        snakemake,
+                        cores=cores,
+                        resources=resources,
+                        dryrun=dryrun,
+                        touch=touch,
+                        printreason=printreason,
+                        printshellcmds=printshellcmds,
+                        nocolor=nocolor,
+                        quiet=quiet,
+                        keepgoing=keepgoing,
+                        cluster=cluster,
+                        immediate_submit=immediate_submit,
+                        standalone=standalone,
+                        ignore_ambiguity=ignore_ambiguity,
+                        snakemakepath=snakemakepath,
+                        lock=lock,
+                        unlock=unlock,
+                        cleanup_metadata=cleanup_metadata,
+                        force_incomplete=force_incomplete,
+                        ignore_incomplete=ignore_incomplete,
+                        output_wait=output_wait,
+                        debug=debug,
+                        notemp=notemp,
+                        nodeps=nodeps,
+                        jobscript=jobscript)
+                    for subworkflow in workflow.subworkflows:
+                        logger.warning("Executing subworkflow {}.".format(subworkflow.name))
+                        if not subsnakemake(subworkflow.snakefile, workdir=subworkflow.workdir, targets=subworkflow.targets):
+                            success = False
+                    if workflow.subworkflows:
+                        logger.warning("Executing main workflow.")
+                if success:
+                    success = workflow.execute(
+                        targets=targets, dryrun=dryrun, touch=touch,
+                        cores=cores, forcetargets=forcetargets,
+                        forceall=forceall, forcerun=forcerun,
+                        prioritytargets=prioritytargets, quiet=quiet,
+                        keepgoing=keepgoing, printshellcmds=printshellcmds,
+                        printreason=printreason, printrulegraph=printrulegraph,
+                        printdag=printdag, cluster=cluster,
+                        immediate_submit=immediate_submit,
+                        ignore_ambiguity=ignore_ambiguity,
+                        workdir=workdir, stats=stats,
+                        force_incomplete=force_incomplete,
+                        ignore_incomplete=ignore_incomplete,
+                        list_version_changes=list_version_changes,
+                        list_code_changes=list_code_changes,
+                        list_input_changes=list_input_changes,
+                        list_params_changes=list_params_changes,
+                        summary=summary,
+                        output_wait=output_wait,
+                        nolock=not lock,
+                        unlock=unlock,
+                        resources=resources,
+                        notemp=notemp,
+                        nodeps=nodeps,
+                        cleanup_metadata=cleanup_metadata
+                        )
 
     except (Exception, BaseException) as ex:
         print_exception(ex, workflow.linemaps)
+        success = False
     if workdir:
         os.chdir(olddir)
     if workflow.persistence:
             "acyclic graph of jobs in the dot language. Recommended "
             "use on Unix systems: snakemake --dag | dot | display")
     parser.add_argument(
-        "--ruledag", action="store_true",
-        help="Do not execute anything and print the directed "
-            "acyclic graph of rules in the dot language. This will be less "
+        "--rulegraph", action="store_true",
+        help="Do not execute anything and print the dependency graph "
+            "of rules in the dot language. This will be less "
             "crowded than above DAG of jobs, but also show less information. "
+            "Note that each rule is displayed once, hence the displayed graph will be "
+            "cyclic if a rule appears in several steps of the workflow. "
             "Use this if above option leads to a DAG that is too large. "
             "Recommended use on Unix systems: snakemake --ruledag | dot | display")
     parser.add_argument(
         "any incomplete jobs.")
     parser.add_argument(
         "--list-version-changes", "--lv", action="store_true",
-        help="List all files that have been created with "
+        help="List all output files that have been created with "
         "a different version (as determined by the version keyword).")
     parser.add_argument(
         "--list-code-changes", "--lc", action="store_true",
-        help="List all files for which the rule body (run or shell) have changed "
+        help="List all output files for which the rule body (run or shell) have changed "
         "in the Snakefile.")
     parser.add_argument(
         "--list-input-changes", "--li", action="store_true",
-        help="List all files for which the defined input files have changed "
-        "in the Snakefile.")
+        help="List all output files for which the defined input files have changed "
+        "in the Snakefile (e.g. new input files were added in the rule definition or files were renamed). For listing input file modification in the filesystem, use --summary.")
     parser.add_argument(
         "--list-params-changes", "--lp", action="store_true",
-        help="List all files for which the defined params have changed "
+        help="List all output files for which the defined params have changed "
         "in the Snakefile.")
     parser.add_argument(
         "--output-wait", "-w", type=int, default=3, metavar="SECONDS",
             printshellcmds=args.printshellcmds,
             printreason=args.reason,
             printdag=args.dag,
-            printruledag=args.ruledag,
+            printrulegraph=args.rulegraph,
             touch=args.touch,
             forcetargets=args.force,
             forceall=args.forceall,
             print_compilation=args.print_compilation,
             debug=args.debug,
             jobscript=args.jobscript,
-            notemp=args.notemp
-            )
+            notemp=args.notemp)
     sys.exit(0 if success else 1)
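
The subworkflow support above leans on functools.partial: every option shared between the main run and each sub-run is bound once, so each nested call only supplies its own snakefile, workdir and targets, and a failing sub-run flips success before the main workflow even starts. A minimal sketch of the same pattern, with a stand-in runner function and hypothetical paths:

    from functools import partial

    def run_pipeline(snakefile, workdir=".", targets=None, cores=1, quiet=False):
        # stand-in for the real snakemake() entry point; returns True on success
        print("running", snakefile, "in", workdir, "targets:", targets)
        return True

    # bind everything the sub-invocations share once ...
    sub_run = partial(run_pipeline, cores=4, quiet=True)

    # ... so each subworkflow call only supplies what differs
    success = True
    for snakefile, workdir, targets in [("qc/Snakefile", "qc", ["report.txt"])]:
        if not sub_run(snakefile, workdir=workdir, targets=targets):
            success = False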

File snakemake/dag.py

                 self._finished.add(newjob)
 
                 self.postprocess()
+                self.handle_protected(newjob)
 
     def update_dynamic(self, job):
         dynamic_wildcards = job.dynamic_wildcards
             yield job
 
     def is_isomorph(self, job1, job2):
-        #import pdb; pdb.set_trace()
         if job1.rule != job2.rule:
             return False
         rule = lambda job: job.rule.name
         return jobs
 
     def rule_dot2(self):
-        dag = dict()
-        job2node = dict()
-        for job, level in self.level_bfs(self.dependencies, *self.targetjobs):
-            node = (job.rule, level)
-            job2node[job] = node
-            for dep in self.dependencies[job]:
-                dag[job2node[job]].extend(
-                    job2node[dep] for dep in self.dependencies[job])
-        labels = dict((node, node[0]) for node in dag)
-        
+        dag = defaultdict(list)
+        visited = set()
+        preselect = set()
 
-    def dot(self):
-        return self._dot(self.jobs)
+        def preselect_parents(job):
+            for parent in self.depending[job]:
+                if parent in preselect:
+                    continue
+                preselect.add(parent)
+                preselect_parents(parent)
 
-    def rule_dot(self):
-        rule = lambda job: job.rule.name
-        dag = dict()
-        noniso = defaultdict(set)
-        def build_ruledag(job):
-            if job in dag:
+        def build_ruledag(job, key=lambda job: job.rule.name):
+            if job in visited:
                 return
-            dag[job] = list()
-            deps = sorted(self.dependencies[job], key=rule)
-            if deps:
-                for dep in deps: 
-                    if not any(map(
-                        partial(self.is_isomorph, dep), noniso[dep.rule].intersection(deps))):
-                        noniso[dep.rule].add(dep)
-                        build_ruledag(dep)
-                dag[job].extend(dep for dep in deps if dep in noniso[dep.rule])
+            visited.add(job)
+            deps = sorted(self.dependencies[job], key=key)
+            deps = [(
+                group[0] if preselect.isdisjoint(group) else preselect.intersection(group).pop()
+            ) for group in (list(g) for _, g in groupby(deps, key))]
+#            deps = [next(g) for _, g in groupby(deps, key)]
+            dag[job].extend(deps)
+            preselect_parents(job)
+            for dep in deps:
+                build_ruledag(dep)
+
         for job in self.targetjobs:
             build_ruledag(job)
 
-        return self._dot(
-            dag.keys(), print_wildcards=False, print_types=False, dag=dag)
+        return self._dot(dag.keys(), print_wildcards=False, print_types=False, dag=dag)
+
+    def rule_dot(self):
+        graph = defaultdict(set)
+        for job in self.jobs:
+            graph[job.rule].update(dep.rule for dep in self.dependencies[job])
+        return self._dot(graph)
+
+    def dot(self):
+        def node2style(job):
+            if not self.needrun(job):
+                return "rounded,dashed"
+            if self.dynamic(job) or job.dynamic_input:
+                return "rounded,dotted"
+            return "rounded"
+
+        def format_wildcard(wildcard):
+            name, value = wildcard
+            if _IOFile.dynamic_fill in value:
+                value = "..."
+            return "{}: {}".format(name, value)
+
+        node2rule = lambda job: job.rule
+        node2label = lambda job: "\\n".join(chain([job.rule.name], map(format_wildcard, self.new_wildcards(job))))
+
+        dag = {job: self.dependencies[job] for job in self.jobs}
+
+        return self._dot(dag, node2rule=node2rule, node2style=node2style, node2label=node2label)
 
     def _dot(
-        self, jobs, errors=False, print_wildcards=True,
-        print_types=True, dag=None):
-        if dag is None:
-            dag = self.dependencies
-        jobs = set(jobs)
+        self, graph,
+        node2rule=lambda node: node,
+        node2style=lambda node: "rounded",
+        node2label=lambda node: node):
 
+        # color rules
         huefactor = 2 / (3 * (len(self.rules) - 1))
-        rulecolor = dict(
-            (rule, "{:.2f} 0.6 0.85".format(i * huefactor))
-            for i, rule in enumerate(self.rules))
-
-        nodes, edges = list(), list()
-        types = ["running job", "not running job", "dynamic job"]
-        styles = [
-            'style="rounded"', 'style="rounded,dashed"',
-            'style="rounded,dotted"']
-        used_types = set()
-
-        if print_wildcards:
-            def format_wildcard(wildcard):
-                name, value = wildcard
-                if _IOFile.dynamic_fill in value:
-                    value = "..."
-                return "{}: {}".format(name, value)
-
-            format_label = lambda job: "\\n".join([job.rule.name] + list(
-                    map(format_wildcard, self.new_wildcards(job))))
-        else:
-            format_label = lambda job: job.rule.name
-        if print_types:
-            format_node = lambda t: styles[t]
-        else:
-            format_node = lambda _: styles[0]
+        rulecolor = {rule: "{:.2f} 0.6 0.85".format(i * huefactor)
+            for i, rule in enumerate(self.rules)}
 
-        for job in jobs:
-            t = 0
-            if not self.needrun(job):
-                t = 1
-            if self.dynamic(job) or job.dynamic_input:
-                t = 2
-            used_types.add(t)
+        # markup
+        node_markup = '\t{}[label = "{}", color = "{}", style="{}"];'.format
+        edge_markup = "\t{} -> {}".format
 
-            nodes.append('\t{}[label = "{}", color="{}", {}];'.format(
-                self.jobid(job), format_label(job), rulecolor[job.rule], format_node(t)))
+        # node ids
+        ids = {node: i for i, node in enumerate(graph)}
 
-            deps = set(map(self.jobid, dag[job]))
-            job = self.jobid(job)
-            for dep in deps:
-                edges.append("\t{} -> {};".format(dep, job))
-
-        legend = list()
-        if print_types:
-            if len(used_types) > 1:
-                for t in used_types:
-                    legend.append('\tlegend{}[label="{}", {}];'.format(
-                        t, types[t], styles[t]))
-                    for target in map(self.jobid, self.targetjobs):
-                        legend.append(
-                            "\t{} -> legend{}[style=invis];".format(target, t))
+        # calculate nodes
+        nodes = [node_markup(ids[node], node2label(node), rulecolor[node2rule(node)], node2style(node)) for node in graph]
+        # calculate edges
+        edges = [edge_markup(ids[dep], ids[node]) for node, deps in graph.items() for dep in deps]
 
         return textwrap.dedent(
             """\
                 node[shape=box, style=rounded, fontname=sans, \
                 fontsize=10, penwidth=2];
                 edge[penwidth=2, color=grey];
-            {nodes}
-            {edges}
-            {legend}
+            {items}
             }}\
-            """).format(nodes="\n".join(nodes),
-                        edges="\n".join(edges),
-                        legend="\n".join(legend))
+            """).format(items="\n".join(nodes + edges))
 
     def summary(self):
         yield "file\tdate\trule\tversion\tstatus\tplan"

File snakemake/exceptions.py

     linemaps -- a dict of a dict that maps for each snakefile
         the compiled lines to source code lines in the snakefile.
     """
-    #traceback.print_exception(type(ex), ex, ex.__traceback__)
+    traceback.print_exception(type(ex), ex, ex.__traceback__)
     origin = get_exception_origin(ex, linemaps)
     if origin is not None:
         lineno, file = origin

File snakemake/executors.py

     def _run(self, job):
         self.printjob(job)
 
+    def rule_prefix(self, job):
+        return "local " if self.workflow.is_local(job.rule) else ""
+
     def printjob(self, job):
         # skip dynamic jobs that will be "executed" only in dryrun mode
         if self.dag.dynamic(job):
             if job.message:
                 desc.append(job.message)
             else:
-                desc.append("rule {}:".format(job.rule.name))
+                desc.append("{}rule {}:".format(self.rule_prefix(job), job.rule.name))
                 for name, value in (
                     ("input", ", ".join(format_files(
                         job, job.input, job.ruleio, job.dynamic_input))),
         super()._run(job)
         workdir = os.getcwd()
         jobid = self.dag.jobid(job)
+        properties = job.json()
 
         jobscript = self.get_jobscript(job)
         jobfinished = os.path.join(self.tmpdir, "{}.jobfinished".format(jobid))
 
         deps = " ".join(self.external_jobid[f] for f in job.input if f in self.external_jobid)
         submitcmd = job.format_wildcards(self.submitcmd, dependencies=deps)
-        ext_jobid = subprocess.check_output(
-            '{submitcmd} "{jobscript}"'.format(
-                submitcmd=submitcmd,
-                jobscript=jobscript),
-            shell=True).decode().split("\n")
+        try:
+            ext_jobid = subprocess.check_output(
+                '{submitcmd} "{jobscript}"'.format(
+                    submitcmd=submitcmd,
+                    jobscript=jobscript),
+                shell=True).decode().split("\n")
+        except subprocess.CalledProcessError as ex:
+            raise WorkflowError("Error executing jobscript (exit code {}):\n{}".format(ex.returncode, ex.output.decode()), rule=job.rule)
         if ext_jobid and ext_jobid[0]:
             ext_jobid = ext_jobid[0]
             self.external_jobid.update((f, ext_jobid) for f in job.output)
         return os.path.join(self.tmpdir, "{}.snakemake-job.sh".format(self.dag.jobid(job)))
 
 
-
 def run_wrapper(run, input, output, params, wildcards, threads, resources, log, linemaps):
     """
     Wrapper around the run method that handles directory creation and
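
Worth noting in the hunk above: the cluster submit call is now wrapped so that a non-zero exit of the submit command surfaces as a WorkflowError carrying the exit code and captured output, instead of an unhandled CalledProcessError. The same defensive pattern in isolation (a sketch; RuntimeError stands in for WorkflowError, and the submit command is hypothetical):

    import subprocess

    def submit(jobscript, submitcmd="./qsub-stub.sh"):
        try:
            out = subprocess.check_output(
                '{} "{}"'.format(submitcmd, jobscript),
                shell=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as ex:
            raise RuntimeError(
                "submit command failed (exit code {}):\n{}".format(
                    ex.returncode, ex.output.decode()))
        # by convention the first output line is the external job id
        return out.decode().split("\n")[0]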

File snakemake/io.py

 import os
 import re
 import stat
-from itertools import product
-from collections import Iterable
+from itertools import product, chain
+from collections import Iterable, namedtuple
 from snakemake.exceptions import MissingOutputException, WorkflowError, WildcardError
 
 __author__ = "Johannes Köster"
                 value = wildcards[name]
                 if fail_dynamic and value == dynamic_fill:
                     raise WildcardError(name)
-                return value
+                return str(value)  # convert anything into a str
             except KeyError as ex:
                 if fill_missing:
                     return dynamic_fill
         return re.sub(_wildcard_regex, format_match, pattern)
 
 
-class temp(str):
+def not_iterable(value):
+    return isinstance(value, str) or not isinstance(value, Iterable)
+
+
+class AnnotatedString(str):
+    def __init__(self, value):
+        self.flags = set()
+
+
+def flag(value, flag):
+    if isinstance(value, AnnotatedString):
+        value.flags.add(flag)
+        return value
+    if not_iterable(value):
+        value = AnnotatedString(value)
+        value.flags.add(flag)
+        return value
+    return [flag(v, flag) for v in value]
+
+
+def is_flagged(value, flag):
+    if isinstance(value, AnnotatedString):
+        return flag in value.flags
+    return False
+
+
+def temp(value):
     """
     A flag for an input or output file that shall be removed after usage.
     """
-    pass
+    if is_flagged(value, "protected"):
+        raise SyntaxError("Protected and temporary flags are mutually exclusive.")
+    return flag(value, "temp")
 
 
-class temporary(temp):
+def temporary(value):
     """ An alias for temp. """
-    pass
+    return temp(value)
 
 
-class protected(str):
+def protected(value):
     """ A flag for a file that shall be write protected after creation. """
-    pass
+    if is_flagged(value, "temp"):
+        raise SyntaxError("Protected and temporary flags are mutually exclusive.")
+    return flag(value, "protected")
 
 
-class dynamic(str):
+def dynamic(value):
     """
     A flag for a file that shall be dynamic, i.e. the multiplicity
     (and wildcard values) will be expanded after a certain
     rule has been run """
-    def __new__(cls, file):
+    annotated = flag(value, "dynamic")
+    tocheck = [annotated] if not_iterable(annotated) else annotated
+    for file in tocheck:
         matches = list(_wildcard_regex.finditer(file))
         #if len(matches) != 1:
         #    raise SyntaxError("Dynamic files need exactly one wildcard.")
             if match.group("constraint"):
                 raise SyntaxError(
                     "The wildcards in dynamic files cannot be constrained.")
-        obj = str.__new__(cls, file)
-        return obj
+    return annotated
 
 
 def expand(*args, **wildcards):
                 values = [values]
             yield [(wildcard, value) for value in values]
 
-    expanded = list()
-    for comb in combinator(*flatten(wildcards)):
-        comb = dict(comb)
-        for filepattern in filepatterns:
-            expanded.append(filepattern.format(**comb))
-    return expanded
+    try:
+        return [filepattern.format(**comb) for comb in map(dict, combinator(*flatten(wildcards))) for filepattern in filepatterns]
+    except KeyError as e:
+        raise WildcardError("No values given for wildcard {}.".format(e))
+
+
+def glob_wildcards(pattern):
+    """
+    Glob the values of the wildcards by matching the given pattern to the filesystem.
+    Returns a named tuple with a list of values for each wildcard.
+    """
+    first_wildcard = re.search("{[^{]", pattern)
+    dirname = os.path.dirname(pattern[:first_wildcard.start()]) if first_wildcard else os.path.dirname(pattern)
+    if not dirname:
+        dirname = "."
+    
+    names = [match.group('name')
+        for match in _wildcard_regex.finditer(pattern)]
+    Wildcards = namedtuple("Wildcards", names)
+    wildcards = Wildcards(*[list() for name in names])
+
+    pattern = re.compile(regex(pattern))
+    for dirpath, dirnames, filenames in os.walk(dirname):
+        for f in chain(filenames, dirnames):
+            if dirpath != ".":
+                f = os.path.join(dirpath, f)
+            match = re.match(pattern, f)
+            if match:
+                for name, value in match.groupdict().items():
+                    getattr(wildcards, name).append(value)
+    return wildcards
 
 
 # TODO rewrite Namedlist!
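
Two user-visible additions in this file: temp(), protected() and dynamic() are now functions that attach flags to an AnnotatedString (so they also accept lists of files, while temp and protected stay mutually exclusive), and glob_wildcards() infers wildcard values by matching a pattern against the filesystem, returning a namedtuple of parallel value lists. A usage sketch, assuming sample1.raw.txt and sample2.raw.txt exist on disk:

    from snakemake.io import glob_wildcards, expand

    samples, = glob_wildcards("{sample}.raw.txt")
    # samples == ["sample1", "sample2"] (order follows os.walk)

    # a typical pairing: turn the globbed values back into target files
    targets = expand("{sample}.processed.txt", sample=samples)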

File snakemake/jobs.py

 import os
 import sys
 import base64
+import json
 
 from collections import defaultdict
 from itertools import chain
             self.log, self.ruleio) = rule.expand_wildcards(
             self.wildcards_dict)
 
-        self.resources = dict()
-        for name, res in rule.resources.items():
-            self.resources[name] = min(self.rule.workflow.global_resources.get(name, 0), res)
+        self.resources = {
+            name: min(self.rule.workflow.global_resources.get(name, 0), res)
+            for name, res in rule.resources.items()}
         self.threads = self.resources["_cores"]
         self.resources = Resources(fromdict=self.resources)
         self._inputsize = None
         except NameError as ex:
             raise RuleException("NameError: " + str(ex), rule=self.rule)
 
+    def json(self):
+        resources = {name: res for name, res in self.resources.items() if name != "_cores"}
+        properties = {
+            "rule": self.rule.name,
+            "local": self.dag.workflow.is_local(self.rule),
+            "input": self.input,    
+            "output": self.output,
+            "threads": self.threads,
+            "resources": resources
+        }
+        return json.dumps(properties)
+
     def __repr__(self):
         return self.rule.name
 

File snakemake/jobscript.sh

 #!/bin/sh
-#rule: {job}
-#input: {job.input}
-#output: {job.output}
-#jobid: {jobid}
+# properties = {properties}
 {workflow.snakemakepath} --snakefile {workflow.snakefile} \
 --force -j{cores} \
 --directory {workdir} --nocolor --notemp --quiet --nolock {job.output} \
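
After formatting, a generated jobscript therefore starts with a single machine-readable header instead of the four ad-hoc comments; with hypothetical values it would look roughly like:

    #!/bin/sh
    # properties = {"rule": "map_reads", "local": false, "input": ["a.fastq"], "output": ["a.bam"], "threads": 4, "resources": {}}

read_job_properties() in snakemake.utils (added below) parses exactly this line back into a dict for custom submit wrappers.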

File snakemake/parser.py

     return token.type == tokenize.OP and token.string == ">"
 
 
+def is_comma(token):
+    return token.type == tokenize.COMMA
+
+
 def is_name(token):
     return token.type == tokenize.NAME
 
 
 class KeywordState(TokenAutomaton):
 
+    prefix = ""
+
     def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
         super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
         self.line = 0
 
     @property
     def keyword(self):
-        return self.__class__.__name__.lower()
+        return self.__class__.__name__.lower()[len(self.prefix):]
 
     def end(self):
         yield ")"
         yield "@workflow.{keyword}(".format(keyword=self.keyword)
 
 
+class SubworkflowKeywordState(KeywordState):
+    prefix = "Subworkflow"
+
+    def start(self):
+        yield ", {keyword}=".format(keyword=self.keyword)
+
+    def end(self):
+        # no end needed
+        return list()
+
+
 # Global keyword states
 
 
                 'e.g. rule1 > rule2 > rule3 ...', token)
 
 
+# subworkflows
+
+
+class SubworkflowSnakefile(SubworkflowKeywordState):
+    pass
+
+
+class SubworkflowWorkdir(SubworkflowKeywordState):
+    pass
+
+
+class Subworkflow(GlobalKeywordState):
+
+    subautomata = dict(
+        snakefile=SubworkflowSnakefile,
+        workdir=SubworkflowWorkdir)
+
+    def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
+        super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
+        self.state = self.name
+        self.has_snakefile = False
+        self.has_workdir = False
+        self.has_name = False
+        self.primary_token = None
+
+    def end(self):
+        if not (self.has_snakefile or self.has_workdir):
+            self.error("A subworkflow needs either a path to a Snakefile or to a workdir.", self.primary_token)
+        yield ")"
+
+    def name(self, token):
+        if is_name(token):
+            yield "workflow.subworkflow('{name}'".format(name=token.string), token
+            self.has_name = True
+        elif is_colon(token) and self.has_name:
+            self.primary_token = token
+            self.state = self.block
+        else:
+            self.error("Expected name after subworkflow keyword.", token)
+
+    def block_content(self, token):
+        if is_name(token):
+            try:
+                if token.string == "snakefile":
+                    self.has_snakefile = True
+                if token.string == "workdir":
+                    self.has_workdir = True
+                for t in self.subautomaton(
+                    token.string).consume():
+                    yield t
+            except KeyError:
+                self.error("Unexpected keyword {} in "
+                    "subworkflow definition".format(token.string), token)
+            except StopAutomaton as e:
+                self.indentation(e.token)
+                for t in self.block(e.token):
+                    yield t
+        elif is_comment(token):
+            yield "\n", token
+            yield token.string, token
+        elif is_string(token):
+            # ignore docstring
+            pass
+        else:
+            self.error("Expecting subworkflow keyword, comment or docstrings "
+                "inside a subworkflow definition.", token)
+
+
+class Localrules(GlobalKeywordState):
+
+    def block_content(self, token):
+        if is_comma(token):
+            yield ",", token
+        elif is_name(token):
+            yield '"{}"'.format(token.string), token
+        else:
+            self.error('Expected a comma separated list of rules that shall '
+            'not be executed by the cluster command.', token)
+
+
 # Rule keyword states
 
 
         include=Include,
         workdir=Workdir,
         ruleorder=Ruleorder,
-        rule=Rule)
+        rule=Rule,
+        subworkflow=Subworkflow,
+        localrules=Localrules)
 
     def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
         super().__init__(snakefile, base_indent=base_indent, dedent=dedent, root=root)
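
Together, the new automaton states make the parser accept two new top-level directives. A Snakefile using both might look like this (paths and rule names hypothetical):

    subworkflow mapping:
        workdir: "../mapping"
        snakefile: "../mapping/Snakefile"

    localrules: all

    rule all:
        input: mapping("sorted/sample1.bam")

The subworkflow name becomes a callable that marks files as targets of the sub-run (see workflow.py below), and localrules names rules that run on the head node rather than through the --cluster command.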

File snakemake/rules.py

 
 from snakemake.io import IOFile, _IOFile, protected, temp, dynamic, Namedlist
 from snakemake.io import expand, InputFiles, OutputFiles, Wildcards, Params
-from snakemake.io import apply_wildcards
+from snakemake.io import apply_wildcards, is_flagged, not_iterable
 from snakemake.exceptions import RuleException, IOFileException, WildcardError
 
 __author__ = "Johannes Köster"
         io_, dynamic_io_ = get_io(branch)
 
         # replace the dynamic files with the expanded files
-        for i, e in reversed(list(expansion.items())):
-            dynamic_io_.remove(io[i])
-            io_.insert_items(i, e)
+        replacements = [(i, io[i], e) for i, e in reversed(list(expansion.items()))]
+        for i, old, exp in replacements:
+            dynamic_io_.remove(old)
+            io_.insert_items(i, exp)
+
         if not input:
+            for i, old, exp in replacements:
+                if old in branch.temp_output:
+                    branch.temp_output.discard(old)
+                    branch.temp_output.update(exp)
+                if old in branch.protected_output:
+                    branch.protected_output.discard(old)
+                    branch.protected_output.update(exp)
+
             branch.wildcard_names.clear()
             non_dynamic_wildcards = dict(
                 (name, values[0])
         inoutput = self.output if output else self.input
         if isinstance(item, str):
             _item = IOFile(item, rule=self)
-            if isinstance(item, temp):
+            if is_flagged(item, "temp"):
                 if not output:
                     raise SyntaxError("Only output files may be temporary")
                 self.temp_output.add(_item)
-            if isinstance(item, protected):
+            if is_flagged(item, "protected"):
                 if not output:
                     raise SyntaxError("Only output files may be protected")
                 self.protected_output.add(_item)
-            if isinstance(item, dynamic):
+            if is_flagged(item, "dynamic"):
                 if output:
                     self.dynamic_output.add(_item)
                 else:
             for name, item in olditems.allitems():
                 start = len(newitems)
                 if callable(item):
-                    items = item(wildcards_obj)
-                    if isinstance(items, str):
-                        items = [items]
-                    for item_ in items:
+                    item = item(wildcards_obj)
+                    if not_iterable(item):
+                        item = [item]
+                    for item_ in item:
                         concrete = concretize(item_, wildcards)
                         newitems.append(concrete)
                         if ruleio is not None:
                             ruleio[concrete] = item_
                 else:
-                    if isinstance(item, str):
+                    if not_iterable(item):
                         item = [item]
                     for item_ in item:
                         concrete = concretize(item_, wildcards)
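
The switch from isinstance(item, str) to not_iterable() is what lets input functions (and plain entries) return either a single file or any iterable of files, while still treating strings, which are technically iterable, as one file. A standalone sketch of the helper and how it normalizes both cases:

    from collections.abc import Iterable

    def not_iterable(value):
        # a string is iterable, but for IO purposes it counts as a single file
        return isinstance(value, str) or not isinstance(value, Iterable)

    def normalize(item):
        return [item] if not_iterable(item) else list(item)

    assert normalize("a.txt") == ["a.txt"]
    assert normalize(["a.txt", "b.txt"]) == ["a.txt", "b.txt"]
    assert normalize(("a.txt",)) == ["a.txt"]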

File snakemake/scheduler.py

         keepgoing=False,
         output_wait=3):
         """ Create a new instance of KnapsackJobScheduler. """
+        self.cluster = cluster
         self.dag = dag
         self.workflow = workflow
         self.dryrun = dryrun
                 output_wait=output_wait)
         elif cluster:
             # TODO properly set cores
+            self._local_executor = CPUExecutor(
+                workflow, dag, cores, printreason=printreason,
+                quiet=quiet, printshellcmds=printshellcmds,
+                threads=use_threads,
+                output_wait=output_wait)
             self._executor = ClusterExecutor(
                 workflow, dag, None, submitcmd=cluster,
                 printreason=printreason, quiet=quiet,
                 self.rule_weight,
                 maxcores=1)
             if immediate_submit:
+                self.rule_reward = self.dryrun_rule_reward
                 self._submit_callback = partial(
                     self._proceed,
                     update_dynamic=False,
                     print_progress=False,
                     update_resources=False)
+            else:
+                self.run = self.run_cluster_or_local
         else:
             self._executor = CPUExecutor(
                 workflow, dag, cores, printreason=printreason,
             logger.debug("Selected jobs:\n\t" + "\n\t".join(map(str, run)))
             self.running.update(run)
             for job in run:
-                self._executor.run(
-                    job, callback=self._finish_callback,
-                    submit_callback=self._submit_callback,
-                    error_callback=self._error)
+                self.run(job)
+
+    def run(self, job):
+        self._executor.run(
+            job, callback=self._finish_callback,
+            submit_callback=self._submit_callback,
+            error_callback=self._error)
+
+    def run_cluster_or_local(self, job):
+        executor = self._local_executor if self.workflow.is_local(job.rule) else self._executor
+        executor.run(
+            job, callback=self._finish_callback,
+            submit_callback=self._submit_callback,
+            error_callback=self._error)
 
     def _noop(self, job):
         pass

File snakemake/utils.py

 __author__ = "Johannes Köster"
 
 import os
+import json
 import io
 import re
 import fnmatch
 def report(
     text, path,
     stylesheet=os.path.join(os.path.dirname(__file__), "report.css"),
-    defaultenc="utf8", template=None, **files):
+    defaultenc="utf8", template=None, metadata=None, **files):
     """
     Create an HTML report using python docutils.
     Attention: This function needs Python docutils to be installed for the
     defaultenc -- The encoding that is reported to the browser for embedded
         text files, defaults to utf8.
     template -- An optional path to a docutils HTML template.
+    metadata -- E.g. an optional author name or email address.
 
     All other keyword args are intepreted as paths to files that shall be
     embedded into the document. They keywords will be available as link
     .. container::
        :name: metadata
 
-       {metadata}
+       {metadata} {date}
 
-    """).format(metadata=datetime.date.today().isoformat())
+    """).format(metadata=metadata, date=datetime.date.today().isoformat())
 
     text = format(textwrap.dedent(text), stepout=2)
 
 
     def __str__(self):
         raise ValueError(self.errormsg)
+
+
+def read_job_properties(jobscript, prefix="# properties"):
+    with open(jobscript) as jobscript:
+        for l in jobscript:
+            if l.startswith(prefix):
+                return json.loads(l.split("=")[1])

File snakemake/workflow.py

 import signal
 from collections import OrderedDict
 from itertools import filterfalse, chain
+from functools import partial
 from operator import attrgetter
 
 from snakemake.logging import logger
 from snakemake.dag import DAG
 from snakemake.scheduler import JobScheduler
 from snakemake.parser import parse
-from snakemake.io import protected, temp, temporary, expand, dynamic
+from snakemake.io import protected, temp, temporary, expand, dynamic, glob_wildcards
 from snakemake.persistence import Persistence
 
 
         self.first_rule = None
         self._workdir = None
         self._ruleorder = Ruleorder()
+        self._localrules = set()
         self.linemaps = dict()
         self.rule_count = 0
-        self.snakefile = snakefile
+        self.basedir = os.path.dirname(snakefile)
+        self.snakefile = os.path.abspath(snakefile)
         self.snakemakepath = os.path.abspath(snakemakepath)
         self.jobscript = jobscript
         self.persistence = None
         self.global_resources = None
         self.globals = globals()
+        self._subworkflows = dict()
+
+    @property
+    def subworkflows(self):
+        return self._subworkflows.values()
 
     @property
     def rules(self):
                     for line in rule.docstring.split("\n"):
                         log("\t" + line)
 
+    def is_local(self, rule):
+        return rule.name in self._localrules
+
     def execute(
         self, targets=None, dryrun=False,  touch=False, cores=1,
         forcetargets=False, forceall=False, forcerun=None,
         prioritytargets=None, quiet=False, keepgoing=False,
         printshellcmds=False, printreason=False, printdag=False,
         cluster=None, immediate_submit=False, ignore_ambiguity=False,
-        workdir=None, printruledag=False,
+        workdir=None, printrulegraph=False,
         stats=None, force_incomplete=False, ignore_incomplete=False,
         list_version_changes=False, list_code_changes=False,
         list_input_changes=False, list_params_changes=False,
         summary=False, output_wait=3, nolock=False, unlock=False,
-        resources=None, notemp=False,
+        resources=None, notemp=False, nodeps=False,
         cleanup_metadata=None):
 
         self.global_resources = dict() if cluster or resources is None else resources
         os.chdir(workdir)
 
         if not targets:
-            targets = [self.first_rule]
+            targets = [self.first_rule] if self.first_rule is not None else list()
         if prioritytargets is None:
             prioritytargets = list()
         if forcerun is None:
         dag.check_incomplete()
         dag.postprocess()
 
+        if nodeps:
+            missing_input = [f for job in dag.targetjobs for f in job.input if dag.needrun(job) and not os.path.exists(f)]
+            logger.critical("Dependency resolution disabled (--nodeps) "
+                "but missing input " 
+                "files detected. If this happens on a cluster, please make sure "
+                "that you handle the dependencies yourself or turn of "
+                "--immediate-submit. Missing input files:\n{}".format(
+                    "\n".join(missing_input)))
+            
+            return False
+
         if printdag:
             print(dag)
             return True
-        elif printruledag:
+        elif printrulegraph:
             print(dag.rule_dot())
             return True
         elif summary:
     def ruleorder(self, *rulenames):
         self._ruleorder.add(*rulenames)
 
+
+    def subworkflow(self, name, snakefile=None, workdir=None):
+        sw = Subworkflow(self, name, snakefile, workdir)
+        self._subworkflows[name] = sw
+        self.globals[name] = sw.target
+
+    def localrules(self, *rulenames):
+        self._localrules.update(rulenames)
+
     def rule(self, name=None, lineno=None, snakefile=None):
         name = self.add_rule(name, lineno, snakefile)
         rule = self.get_rule(name)
 
 
 class RuleInfo:
+
     def __init__(self, func):
         self.func = func
         self.shellcmd = None
         self.version = None
         self.log = None
         self.docstring = None
+
+class Subworkflow:
+
+    def __init__(self, workflow, name, snakefile, workdir):
+        self.workflow = workflow
+        self.name = name
+        self._snakefile = snakefile
+        self._workdir = workdir
+        self.targets = set()
+
+    @property
+    def snakefile(self):
+        if self._snakefile is None:
+            return os.path.join(self.workdir, "Snakefile")
+        return os.path.join(self.workflow.basedir, self._snakefile)
+
+    @property
+    def workdir(self):
+        workdir = "." if self._workdir is None else self._workdir
+        return os.path.join(self.workflow.basedir, workdir)
+
+    def target(self, path):
+        self.targets.add(path)
+        return os.path.join(self.workdir, path)
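
Subworkflow.target() is the piece that ties the DSL to the execution logic in __init__.py: calling the subworkflow object on a path both records it (so the parent can pass it as a target of the sub-run) and rewrites it relative to the subworkflow's working directory. A condensed, standalone sketch with hypothetical paths:

    import os

    class Sub:
        def __init__(self, basedir, workdir):
            self.basedir, self._workdir = basedir, workdir
            self.targets = set()

        @property
        def workdir(self):
            return os.path.join(self.basedir, self._workdir or ".")

        def target(self, path):
            self.targets.add(path)                    # remembered for the sub-run
            return os.path.join(self.workdir, path)   # the path as the parent sees it

    sub = Sub("/project/main", "../test02")
    print(sub.target("test.out"))   # /project/main/../test02/test.out
    print(sub.targets)              # {'test.out'}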

File tests/test01/Snakefile

 rule rule2:
 	input: testin
 	output: 'test.inter'
-	message: 'Copying {input[0]} to {output[0]}'
+#	message: 'Copying {input[0]} to {output[0]}'
 	shell: 
 		'''
 		cp {input[0]} {output[0]}

File tests/test05/Snakefile

 
 rule compute1:
 	input: '{name}.in'
-	output: inter=['{name}.%s.inter'%c for c in chromosomes]
+	output: inter=expand('{{name}}.{chr}.inter', chr=chromosomes)
 	resources: gpu=1
 	run:
 		assert len(output.inter) > 0

File tests/test14/Snakefile.nonstandard

 chromosomes = [1,2,3]
 
+
+localrules: all
+
+
 rule all:
 	input: 'test.predictions', 'test.2.inter2'
 

File tests/test14/qsub.py

+#!/usr/bin/env python3
+import sys
+import os
+import random
+
+from snakemake.utils import read_job_properties
+
+jobscript = sys.argv[1]
+job_properties = read_job_properties(jobscript)
+with open("qsub.log", "w") as log:
+    print(job_properties, file=log)
+
+print(random.randint(1, 100))
+os.system("sh {}".format(jobscript))

File tests/test15/Snakefile

 	input: test = lambda wildcards: "test2.in" if os.path.exists("test2.in") else "test.in"
 	output: "test.out"
 	shell: "cp {input.test} {output}"
+
+

File tests/test_globwildcards/Snakefile

+
+IDS, = glob_wildcards("test.{id}.txt")
+
+
+rule all:
+	input: expand("test.{id}.out", id=IDS)
+
+rule:
+	input: "test.{id}.txt"
+	output: "test.{id}.out"
+	shell: "touch {output}"

File tests/test_globwildcards/expected-results/test.0.out

Empty file added.

File tests/test_globwildcards/expected-results/test.1.out

Empty file added.

File tests/test_globwildcards/expected-results/test.2.out

Empty file added.

File tests/test_globwildcards/test.0.txt

Empty file added.

File tests/test_globwildcards/test.1.txt

Empty file added.

File tests/test_globwildcards/test.2.txt

Empty file added.

File tests/test_subworkflows/Snakefile

+
+subworkflow test02:
+    workdir: "../test02"
+
+
+rule:
+    input: test02("test.out")
+    output: "test.out"
+    shell: "cp {input} {output}"

File tests/test_subworkflows/expected-results/test.out

+testz0r

File tests/tests.py

 
 def test_keyword_list():
 	run(dpath("test_keyword_list"))
+
+def test_subworkflows():
+    run(dpath("test_subworkflows"))
+
+def test_globwildcards():
+    run(dpath("test_globwildcards"))