Commits

Johannes Köster committed f5bbe72

Allow wildcards in subworkflow targets.

Comments (0)

Files changed (6)

snakemake/__init__.py

                         nodeps=nodeps,
                         jobscript=jobscript,
                         timestamp=timestamp)
-                    for subworkflow in workflow.subworkflows:
-                        logger.warning("Executing subworkflow {}.".format(subworkflow.name))
-                        if not subsnakemake(subworkflow.snakefile, workdir=subworkflow.workdir, targets=subworkflow.targets):
-                            success = False
-                    if workflow.subworkflows:
-                        logger.warning("Executing main workflow.")
-                if success:
                     success = workflow.execute(
                         targets=targets, dryrun=dryrun, touch=touch,
                         cores=cores, forcetargets=forcetargets,
                         resources=resources,
                         notemp=notemp,
                         nodeps=nodeps,
-                        cleanup_metadata=cleanup_metadata
+                        cleanup_metadata=cleanup_metadata,
+                        subsnakemake=subsnakemake
                         )
 
     except (Exception, BaseException) as ex:

snakemake/dag.py

         # if user specified it twice
         file2jobs = self.file2jobs
         for file in set(job.input):
+            # omit the file if it comes from a subworkflow
+            if file in job.subworkflow_input:
+                continue
             try:
                 jobs = self.file2jobs(file)
                 dependencies[file].extend(jobs)
 

snakemake/io.py

 class AnnotatedString(str):
     def __init__(self, value):
-        self.flags = set()
+        self.flags = dict()
 
 
-def flag(value, flag):
+def flag(value, flag, flag_value=True):
     if isinstance(value, AnnotatedString):
-        value.flags.add(flag)
+        value.flags[flag] = flag_value
         return value
     if not_iterable(value):
         value = AnnotatedString(value)
-        value.flags.add(flag)
+        value.flags[flag] = flag_value
         return value
     return [flag(v, flag) for v in value]
 

snakemake/jobs.py

 
         self.dynamic_output, self.dynamic_input = set(), set()
         self.temp_output, self.protected_output = set(), set()
+        self.subworkflow_input = dict()
         for f in self.output:
             f_ = self.ruleio[f]
             if f_ in self.rule.dynamic_output:
             if f_ in self.rule.protected_output:
                 self.protected_output.add(f)
         for f in self.input:
-            if self.ruleio[f] in self.rule.dynamic_input:
+            f_ = self.ruleio[f]
+            if f_ in self.rule.dynamic_input:
                 self.dynamic_input.add(f)
+            if f_ in self.rule.subworkflow_input:
+                self.subworkflow_input[f] = self.rule.subworkflow_input[f_]
         self._hash = self.rule.__hash__()
         if not self.dynamic_output:
             for o in self.output:
     @property
     def missing_input(self):
         """ Return missing input files. """
-        return set(f for f in self.input if not f.exists)
+        # omit file if it comes from a subworkflow
+        return set(f for f in self.input if not f.exists and not f in self.subworkflow_input)
 
     @property
     def output_mintime(self):

snakemake/rules.py

             self.dynamic_input = set()
             self.temp_output = set()
             self.protected_output = set()
+            self.subworkflow_input = dict()
             self.resources = dict(_cores=1)
             self.priority = 1
             self.version = None
             self.dynamic_input = other.dynamic_input
             self.temp_output = other.temp_output
             self.protected_output = other.protected_output
+            self.subworkflow_input = other.subworkflow_input
             self.resources = other.resources
             self.priority = other.priority
             self.version = other.version
                     self.dynamic_output.add(_item)
                 else:
                     self.dynamic_input.add(_item)
+            if is_flagged(item, "subworkflow"):
+                if output:
+                    raise SyntaxError("Only input files may refer to a subworkflow")
+                else:
+                    # record the workflow this item comes from
+                    self.subworkflow_input[_item] = item.flags["subworkflow"]
             inoutput.append(_item)
             if name:
                 inoutput.add_name(name)

snakemake/workflow.py

 from snakemake.dag import DAG
 from snakemake.scheduler import JobScheduler
 from snakemake.parser import parse
-from snakemake.io import protected, temp, temporary, expand, dynamic, glob_wildcards, contains_wildcard, not_iterable
+from snakemake.io import protected, temp, temporary, expand, dynamic, glob_wildcards, flag, not_iterable
 from snakemake.persistence import Persistence
 
 
         list_input_changes=False, list_params_changes=False,
         summary=False, output_wait=3, nolock=False, unlock=False,
         resources=None, notemp=False, nodeps=False,
-        cleanup_metadata=None):
+        cleanup_metadata=None, subsnakemake=None):
 
         self.global_resources = dict() if cluster or resources is None else resources
         self.global_resources["_cores"] = cores
                 "the --unlock argument.".format(os.getcwd()))
             return False
 
+        # execute subworkflows
+        for subworkflow in self.subworkflows:
+            logger.warning("Executing subworkflow {}.".format(subworkflow.name))
+            if not subsnakemake(subworkflow.snakefile, workdir=subworkflow.workdir, targets=subworkflow.targets(dag)):
+                return False
+        if self.subworkflows:
+            logger.warning("Executing main workflow.")
+
         dag.check_incomplete()
         dag.postprocess()
 
         self.name = name
         self._snakefile = snakefile
         self._workdir = workdir
-        self.targets = set()
 
     @property
     def snakefile(self):
 
     def target(self, paths):
         if not_iterable(paths):
-            paths = [paths]
-        for path in paths:
-            if contains_wildcard(path):
-                raise SyntaxError("No wildcards allowed in subworkflow target. Please refer only to concrete files from the subworkflow.")
-            self.targets.add(path)
-        if len(paths) == 1:
-            return os.path.join(self.workdir, path)
-        return [os.path.join(self.workdir, path) for path in paths]
+            return flag(os.path.join(self.workdir, paths), "subworkflow", self)
+        return [self.target(path) for path in paths]
+
+    def targets(self, dag):
+        return [f for job in dag.jobs for f in job.subworkflow_input if job.subworkflow_input[f] is self]