Commits

Johannes Köster committed 1f0e29a

Report jobid in jobscript. Mention jobid and path to jobscript in
ClusterJobException. Fixed exit code of snakemake in case of error (this
should also fix recent hangups in a cluster environment).

  • Participants
  • Parent commits 68bce44

Comments (0)

Files changed (8)

File bin/snakemake

 if __name__ == "__main__":
     #import cProfile
     #cProfile.run('main()', "snakemake.profile")
-    main()
+    snakemakepath = os.path.realpath(__file__)
+    sys.exit(main(snakemakepath=snakemakepath))

File snakemake/__init__.py

     return resources
 
 
-def main():
+def main(snakemakepath=None):
     parser = argparse.ArgumentParser(
         description="Snakemake is a Python based language and execution "
             "environment for GNU Make-like workflows.")
 
     args = parser.parse_args()
 
-    snakemakepath = os.path.realpath(__file__)
 
     try:
         resources = parse_resources(args)

File snakemake/dag.py

         self.prioritytargetjobs = set()
         self._ready_jobs = set()
         self.notemp = notemp
+        self._jobid = dict()
 
         self.forcerules = set()
         self.forcefiles = set()
             logger.warning("Removing temporary output file {}".format(f))
             f.remove()
 
+    def jobid(self, job):
+        if job not in self._jobid:
+            self._jobid[job] = len(self._jobid)
+        return self._jobid[job]
+
     def update(self, jobs, file=None, visited=None, skip_until_dynamic=False):
         """ Update the DAG by adding given jobs and their dependencies. """
         if visited is None:
         if dag is None:
             dag = self.dependencies
         jobs = set(jobs)
-        jobid = dict((job, i) for i, job in enumerate(jobs))
 
         huefactor = 2 / (3 * (len(self.rules) - 1))
         rulecolor = dict(
             used_types.add(t)
 
             nodes.append('\t{}[label = "{}", color="{}", {}];'.format(
-                jobid[job], format_label(job), rulecolor[job.rule], format_node(t)))
+                self.jobid(job), format_label(job), rulecolor[job.rule], format_node(t)))
 
-            deps = set(map(jobid.__getitem__, dag[job]))
-            job = jobid[job]
+            deps = set(map(self.jobid, dag[job]))
+            job = self.jobid(job)
             for dep in deps:
                 edges.append("\t{} -> {};".format(dep, job))
 
                 for t in used_types:
                     legend.append('\tlegend{}[label="{}", {}];'.format(
                         t, types[t], styles[t]))
-                    for target in map(jobid.__getitem__, self.targetjobs):
+                    for target in map(self.jobid, self.targetjobs):
                         legend.append(
                             "\t{} -> legend{}[style=invis];".format(target, t))
 

File snakemake/exceptions.py

 
 
 class ClusterJobException(RuleException):
-    def __init__(self, job):
+    def __init__(self, job, jobid, jobscript):
         super().__init__(
-            "Error executing rule {} on cluster. "
-            "For detailed error see the cluster log.".format(job.rule.name),
+            "Error executing rule {} on cluster (jobid: {}, jobscript: {}). "
+            "For detailed error see the cluster log.".format(job.rule.name, jobid, jobscript),
             lineno=job.rule.lineno, snakefile=job.rule.snakefile)
 
 

File snakemake/executors.py

 from snakemake.utils import format, Unformattable
 from snakemake.exceptions import print_exception, get_exception_origin
 from snakemake.exceptions import format_error, RuleException
-from snakemake.exceptions import ClusterJobException, ProtectedOutputException
+from snakemake.exceptions import ClusterJobException, ProtectedOutputException, WorkflowError
 
 
 class AbstractExecutor:
         self.threads = []
         self._tmpdir = None
         self.cores = cores if cores else ""
-        self.jobid = dict()
+        self.external_jobid = dict()
 
     def shutdown(self):
         for thread in self.threads:
         self, job, callback=None, submit_callback=None, error_callback=None):
         super()._run(job)
         workdir = os.getcwd()
-        jobid = len(self.threads)
+        jobid = self.dag.jobid(job)
 
-        jobscript = os.path.join(self.tmpdir, "snakemake-job.{}.sh".format(jobid))
+        jobscript = self.get_jobscript(job)
         jobfinished = os.path.join(self.tmpdir, "{}.jobfinished".format(jobid))
         jobfailed = os.path.join(self.tmpdir, "{}.jobfailed".format(jobid))
         with open(jobscript, "w") as f:
-            print(format(self.jobscript), file=f)
+            print(format(self.jobscript, workflow=self.workflow, cores=self.cores), file=f)
         os.chmod(jobscript, os.stat(jobscript).st_mode | stat.S_IXUSR)
 
-        deps = " ".join(
-            (self.jobid[f] if f in self.jobid else "") for f in job.input)
+        deps = " ".join(self.external_jobid[f] for f in job.input if f in self.external_jobid)
         submitcmd = job.format_wildcards(self.submitcmd, dependencies=deps)
-        jobid = subprocess.check_output(
+        ext_jobid = subprocess.check_output(
             '{submitcmd} "{jobscript}"'.format(
                 submitcmd=submitcmd,
                 jobscript=jobscript),
             shell=True).decode().split("\n")
-        if jobid and jobid[0]:
-            jobid = jobid[0]
-            self.jobid.update((f, jobid) for f in job.output)
-            logger.debug("Submitted job with jobid {}.".format(jobid))
+        if ext_jobid and ext_jobid[0]:
+            ext_jobid = ext_jobid[0]
+            self.external_jobid.update((f, ext_jobid) for f in job.output)
+            logger.debug("Submitted job {} with external jobid {}.".format(jobid, ext_jobid))
 
         thread = threading.Thread(
             target=self._wait_for_job,
                 os.remove(jobfailed)
                 os.remove(jobscript)
                 print_exception(
-                    ClusterJobException(job), self.workflow.linemaps)
+                    ClusterJobException(job, self.dag.jobid(job), self.get_jobscript(job)), self.workflow.linemaps)
                 error_callback(job)
                 return
             time.sleep(1)
                     break
         return os.path.abspath(self._tmpdir)
 
+    def get_jobscript(self, job):
+        return os.path.join(self.tmpdir, "{}.snakemake-job.sh".format(self.dag.jobid(job)))
+
+
 
 def run_wrapper(run, input, output, params, wildcards, threads, resources, log, linemaps):
     """

File snakemake/jobscript.sh

 #rule: {job}
 #input: {job.input}
 #output: {job.output}
-{self.workflow.snakemakepath} --snakefile {self.workflow.snakefile} \
---force -j{self.cores} \
+#jobid: {jobid}
+{workflow.snakemakepath} --snakefile {workflow.snakefile} \
+--force -j{cores} \
 --directory {workdir} --nocolor --notemp --quiet --nolock {job.output} \
-> /dev/null && touch "{jobfinished}" || touch "{jobfailed}"
+&& touch "{jobfinished}" || touch "{jobfailed}"
 exit 0

File tests/test14/Snakefile.nonstandard

 rule compute2:
 	input: '{name}.{chromosome}.inter'
 	output: '{name}.{chromosome}.inter2'
-	shell: 'echo copy; cp {input[0]} {output[0]}'
+	shell: 'echo copy; cp {input[0]} {output[0]}; exit 1'
 
 rule gather:
 	input: ['{name}.%s.inter2'%c for c in chromosomes]

File tests/test14/qsub

 tail -n1 $1 >> qsub.log
 # simulate printing of job id by a random number
 echo $RANDOM
-$1
+sh $1