Commits

Johannes Köster committed 68bba7b

Track params.

  • Participants
  • Parent commits 8765dfe

Comments (0)

Files changed (5)

File bin/snakemake

         help="List all files for which the defined input files have changed "
         "in the Snakefile.")
     parser.add_argument(
+        "--list-params-changes", "--lp", action="store_true",
+        help="List all files for which the defined params have changed "
+        "in the Snakefile.")
+    parser.add_argument(
         "--output-wait", "-w", type=int, default=3,
         help="Wait T seconds if an output file of a job is not present after "
         "the job finished. This helps if your filesystem "
             list_version_changes=args.list_version_changes,
             list_code_changes=args.list_code_changes,
             list_input_changes=args.list_input_changes,
+            list_params_changes=args.list_params_changes,
             summary=args.summary,
             print_compilation=args.print_compilation,
             debug=args.debug

File snakemake/__init__.py

     list_version_changes=False,
     list_code_changes=False,
     list_input_changes=False,
+    list_params_changes=False,
     summary=False,
     output_wait=3,
     print_compilation=False,
             # ignore: if it does not work we can still work without it
             pass
 
-    if not (dryrun or touch or quiet or unlock or listrules):
-        if cluster:
-            logger.warning("Provided cluster nodes: {}".format(cores))
-        else:
-            logger.warning("Provided cores: {}".format(cores))
-
     success = False
     try:
         workflow.include(snakefile, workdir=workdir,
                     list_version_changes=list_version_changes,
                     list_code_changes=list_code_changes,
                     list_input_changes=list_input_changes,
+                    list_params_changes=list_params_changes,
                     summary=summary,
                     output_wait=output_wait,
                     nolock=not lock,

File snakemake/dag.py

                     status = "rule implementation changed"
                 elif self.workflow.persistence.input_changed(job, file=f):
                     status = "set of input files changed"
+                elif self.workflow.persistence.params_changed(job, file=f):
+                    status = "params changed"
                 yield "\t".join((f, date, rule, version, status, pending))
 
     def stats(self):

File snakemake/persistence.py

         self._code = os.path.join(self.path, "code_tracking")
         self._rule = os.path.join(self.path, "rule_tracking")
         self._input = os.path.join(self.path, "input_tracking")
+        self._params = os.path.join(self.path, "params_tracking")
 
-        for d in (self._incomplete, self._version, self._code, self._rule, self._input):
+        for d in (self._incomplete, self._version, self._code, self._rule, self._input, self._params):
             if not os.path.exists(d):
                 os.mkdir(d)
 
         self._delete_record(self._code, path)
         self._delete_record(self._rule, path)
         self._delete_record(self._input, path)
+        self._delete_record(self._params, path)
 
     def started(self, job):
         for f in job.output:
         version = job.rule.version
         code = self.code(job.rule)
         input = self.input(job)
+        params = self.params(job)
         for f in job.expanded_output:
             self._delete_record(self._incomplete, f)
             self._record(self._version, version, f)
             self._record(self._code, code, f, bin=True)
             self._record(self._rule, job.rule.name, f)
             self._record(self._input, input, f)
+            self._record(self._params, params, f)
 
     def cleanup(self, job):
         for f in job.expanded_output:
             self._delete_record(self._code, f)
             self._delete_record(self._rule, f)
             self._delete_record(self._input, f)
+            self._delete_record(self._params, f)
 
     def incomplete(self, job):
         marked_incomplete = partial(self._exists_record, self._incomplete)
             map(lambda f: f.exists and marked_incomplete(f), job.output))
 
     def version(self, path):
+        if not os.path.exists(path):
+            return None
         return self._read_record(self._version, path)
 
     def rule(self, path):
+        if not os.path.exists(path):
+            return None
         return self._read_record(self._rule, path)
 
     def version_changed(self, job, file=None):
         if file is None:
             return cr(*job.output)
         else:
-            return next(cr(file))
+            return bool(list(cr(file)))
 
     def code_changed(self, job, file=None):
         cr = partial(self._changed_records, self._code, self.code(job.rule), bin=True)
         if file is None:
             return cr(*job.output)
         else:
-            return next(cr(file))
+            return bool(list(cr(file)))
 
     def input_changed(self, job, file=None):
         cr = partial(self._changed_records, self._input, self.input(job))
         if file is None:
             return cr(*job.output)
         else:
-            return next(cr(file))
+            return bool(list(cr(file)))
+
+    def params_changed(self, job, file=None):
+        cr = partial(self._changed_records, self._params, self.params(job))
+        if file is None:
+            return cr(*job.output)
+        else:
+            return bool(list(cr(file)))
 
     def noop(self, *args):
         pass
         return "\n".join(sorted(job.input))
 
     @lru_cache()
+    def params(self, job):
+        return "\n".join(sorted(job.params))
+
+    @lru_cache()
     def output(self, job):
         return sorted(job.output)
 

File snakemake/workflow.py

         workdir=None,
         stats=None, force_incomplete=False, ignore_incomplete=False,
         list_version_changes=False, list_code_changes=False,
-        list_input_changes=False,
+        list_input_changes=False, list_params_changes=False,
         summary=False, output_wait=3, nolock=False, unlock=False):
 
         def rules(items):
             if items:
                 print(*items, sep="\n")
             return True
+        elif list_params_changes:
+            items = list(chain(
+                *map(self.persistence.params_changed, dag.jobs)))
+            if items:
+                print(*items, sep="\n")
+            return True
 
         scheduler = JobScheduler(
             self, dag, cores, dryrun=dryrun, touch=touch, cluster=cluster,
             quiet=quiet, keepgoing=keepgoing,
             printreason=printreason, printshellcmds=printshellcmds,
             output_wait=output_wait)
+
+        if not dryrun and not quiet:
+            if cluster:
+                logger.warning("Provided cluster nodes: {}".format(cores))
+            else:
+                logger.warning("Provided cores: {}".format(cores))
+
         success = scheduler.schedule()
 
         if success: