Commits

Johannes Köster committed 8765dfe

Track input file changes in persistence module.

  • Participants
  • Parent commits 4bddd98

Comments (0)

Files changed (5)

         "a different version (as determined by the version keyword).")
     parser.add_argument(
         "--list-code-changes", "--lc", action="store_true",
-        help="List all files for which the rule body (run or shell) changed "
+        help="List all files for which the rule body (run or shell) have changed "
+        "in the Snakefile.")
+    parser.add_argument(
+        "--list-input-changes", "--li", action="store_true",
+        help="List all files for which the defined input files have changed "
         "in the Snakefile.")
     parser.add_argument(
         "--output-wait", "-w", type=int, default=3,
             ignore_incomplete=args.ignore_incomplete,
             list_version_changes=args.list_version_changes,
             list_code_changes=args.list_code_changes,
+            list_input_changes=args.list_input_changes,
             summary=args.summary,
             print_compilation=args.print_compilation,
             debug=args.debug

snakemake/__init__.py

     ignore_incomplete=False,
     list_version_changes=False,
     list_code_changes=False,
+    list_input_changes=False,
     summary=False,
     output_wait=3,
     print_compilation=False,
                     ignore_incomplete=ignore_incomplete,
                     list_version_changes=list_version_changes,
                     list_code_changes=list_code_changes,
+                    list_input_changes=list_input_changes,
                     summary=summary,
                     output_wait=output_wait,
                     nolock=not lock,
                     status = "version changed to {}".format(job.rule.version)
                 elif self.workflow.persistence.code_changed(job, file=f):
                     status = "rule implementation changed"
+                elif self.workflow.persistence.input_changed(job, file=f):
+                    status = "set of input files changed"
                 yield "\t".join((f, date, rule, version, status, pending))
 
     def stats(self):

snakemake/persistence.py

         self._version = os.path.join(self.path, "version_tracking")
         self._code = os.path.join(self.path, "code_tracking")
         self._rule = os.path.join(self.path, "rule_tracking")
+        self._input = os.path.join(self.path, "input_tracking")
 
-        for d in (self._incomplete, self._version, self._code, self._rule):
+        for d in (self._incomplete, self._version, self._code, self._rule, self._input):
             if not os.path.exists(d):
                 os.mkdir(d)
 
         self._delete_record(self._version, path)
         self._delete_record(self._code, path)
         self._delete_record(self._rule, path)
+        self._delete_record(self._input, path)
 
     def started(self, job):
         for f in job.output:
     def finished(self, job):
         version = job.rule.version
         code = self.code(job.rule)
+        input = self.input(job)
         for f in job.expanded_output:
             self._delete_record(self._incomplete, f)
             self._record(self._version, version, f)
             self._record(self._code, code, f, bin=True)
             self._record(self._rule, job.rule.name, f)
+            self._record(self._input, input, f)
 
     def cleanup(self, job):
         for f in job.expanded_output:
             self._delete_record(self._version, f)
             self._delete_record(self._code, f)
             self._delete_record(self._rule, f)
+            self._delete_record(self._input, f)
 
     def incomplete(self, job):
         marked_incomplete = partial(self._exists_record, self._incomplete)
         return self._read_record(self._rule, path)
 
     def version_changed(self, job, file=None):
-        if file is not None:
-            return (self._exists_record(self._version, file)
-                and not self._equals_record(
-                    self._version, job.rule.version, file))
-        return filterfalse(
-            partial(self._equals_record, self._version, job.rule.version),
-            job.output)
+        cr = partial(self._changed_records, self._version, job.rule.version)
+        if file is None:
+            return cr(*job.output)
+        else:
+            return next(cr(file))
 
     def code_changed(self, job, file=None):
-        if file is not None:
-            return (self._exists_record(self._code, file)
-                and not self._equals_record(
-                    self._code, self.code(job.rule), file, bin=True))
-        return filterfalse(
-            partial(
-                self._equals_record, self._code,
-                self.code(job.rule), bin=True),
-            job.output)
+        cr = partial(self._changed_records, self._code, self.code(job.rule), bin=True)
+        if file is None:
+            return cr(*job.output)
+        else:
+            return next(cr(file))
+
+    def input_changed(self, job, file=None):
+        cr = partial(self._changed_records, self._input, self.input(job))
+        if file is None:
+            return cr(*job.output)
+        else:
+            return next(cr(file))
 
     def noop(self, *args):
         pass
     def code(self, rule):
         return marshal.dumps(rule.run_func.__code__)
 
+    @lru_cache()
+    def input(self, job):
+        return "\n".join(sorted(job.input))
+
+    @lru_cache()
+    def output(self, job):
+        return sorted(job.output)
+
     def _record(self, subject, value, id, bin=False):
         if value is not None:
             with open(
             "rb" if bin else "r") as f:
             return f.read()
 
+    def _changed_records(self, subject, value, *ids, bin=False):
+        equals = partial(self._equals_record, subject, value, bin=bin)
+        return filter(
+            lambda id: self._exists_record(subject, id) and not equals(id),
+            ids)
+
     def _equals_record(self, subject, value, id, bin=False):
         return self._read_record(subject, id, bin=bin) == value
 

snakemake/workflow.py

         workdir=None,
         stats=None, force_incomplete=False, ignore_incomplete=False,
         list_version_changes=False, list_code_changes=False,
+        list_input_changes=False,
         summary=False, output_wait=3, nolock=False, unlock=False):
 
         def rules(items):
             items = list(chain(
                 *map(self.persistence.version_changed, dag.jobs)))
             if items:
-                print(items, sep="\n")
+                print(*items, sep="\n")
             return True
         elif list_code_changes:
             items = list(chain(
                 *map(self.persistence.code_changed, dag.jobs)))
             if items:
-                print(items, sep="\n")
+                print(*items, sep="\n")
+            return True
+        elif list_input_changes:
+            items = list(chain(
+                *map(self.persistence.input_changed, dag.jobs)))
+            if items:
+                print(*items, sep="\n")
             return True
 
         scheduler = JobScheduler(