Commits

Johannes Köster committed 132e995

Implemented issue #43. Beginning of an exception refactoring.

Comments (0)

Files changed (8)

             "$ snakemake --cluster 'sbatch --dependency {dependencies}.\n"
             "Assuming that your submit script (here sbatch) outputs the generated job id to the first stdout line, {dependencies} will be filled with space separated job ids this job depends on."))
     parser.add_argument(
+        "--jobscript", "--js",
+        help="Provide a custom job script for submission to the cluster. "
+            "The default script resides as 'jobscript.sh' in the "
+            "installation directory.")
+    parser.add_argument(
         "--reason", "-r", action = "store_true",
         help="Print the reason for each executed rule.")
     parser.add_argument(
             list_params_changes=args.list_params_changes,
             summary=args.summary,
             print_compilation=args.print_compilation,
-            debug=args.debug
+            debug=args.debug,
+            jobscript=args.jobscript
             )
     exit(0 if success else 1)
 

snakemake/__init__.py

     output_wait=3,
     print_compilation=False,
     debug=False,
-    notemp=False):
+    notemp=False,
+    jobscript=None):
     """
     Run snakemake on a given snakefile.
     Note: at the moment, this function is not thread-safe!
 
     if workdir:
         olddir = os.getcwd()
-    workflow = Workflow(snakefile=snakefile, snakemakepath=snakemakepath)
+    workflow = Workflow(
+        snakefile=snakefile, snakemakepath=snakemakepath,
+        jobscript=jobscript)
 
     if standalone:
         try:

snakemake/exceptions.py

                 logger.critical(format_error(
                     e, e.lineno, linemaps=linemaps, snakefile=e.filename,
                     show_traceback=print_traceback))
+    elif isinstance(ex, WorkflowError):
+        logger.critical(
+            format_error(
+                ex, ex.lineno, linemaps=linemaps, snakefile=e.filename,
+                show_traceback=print_traceback))
     elif isinstance(ex, KeyboardInterrupt):
         logger.warning("Cancelling snakemake on user request.")
     else:
         traceback.print_exception(type(ex), ex, ex.__traceback__)
 
 
-class WildcardError(Exception):
+class WorkflowError(Exception):
+
+    def __init__(self, *args, lineno=None, snakefile=None, rule=None):
+        super().__init__("\n".join(map(str, args)))
+        if rule is not None:
+            self.lineno = rule.lineno
+            self.snakefile = rule.snakefile
+        self.lineno = lineno
+        self.snakefile = snakefile
+
+
+class WildcardError(WorkflowError):
     pass
 
 

snakemake/executors.py

 
 class ClusterExecutor(RealExecutor):
 
-    jobscript = textwrap.dedent("""\
-        #!/bin/sh
-        #rule: {job}
-        #input: {job.input}
-        #output: {job.output}
-        {self.workflow.snakemakepath} --snakefile {self.workflow.snakefile} \
-        --force -j{self.cores} \
-        --directory {workdir} --nocolor --notemp --quiet --nolock {job.output} \
-        > /dev/null && touch "{jobfinished}" || touch "{jobfailed}"
-        exit 0
-        """)
-
     def __init__(
         self, workflow, dag, cores, submitcmd="qsub",
         printreason=False, quiet=False, printshellcmds=False, output_wait=3):
             raise ValueError(
             "Cluster executor needs to know the path "
             "to the snakemake binary.")
+
+        jobscript = workflow.jobscript
+        if jobscript is None:
+            jobscript = os.path.join(
+                os.path.dirname(__file__),
+                'jobscript.sh')
+        try:
+            with open(jobscript) as f:
+                self.jobscript = f.read()
+        except IOError as e:
+            raise WorkflowError(e)
+
         self.submitcmd = submitcmd
         self.threads = []
         self._tmpdir = None
 import stat
 from itertools import product
 from collections import Iterable
-from snakemake.exceptions import MissingOutputException, WildcardError
+from snakemake.exceptions import MissingOutputException, WorkflowError
 
 __author__ = "Johannes Köster"
 
                 if fill_missing:
                     return dynamic_fill
                 else:
-                    raise WildcardError(str(ex))
+                    raise WorkflowError(ex)
 
         return re.sub(_wildcard_regex, format_match, pattern)
 

snakemake/jobscript.sh

+#!/bin/sh
+#rule: {job}
+#input: {job.input}
+#output: {job.output}
+{self.workflow.snakemakepath} --snakefile {self.workflow.snakefile} \
+--force -j{self.cores} \
+--directory {workdir} --nocolor --notemp --quiet --nolock {job.output} \
+> /dev/null && touch "{jobfinished}" || touch "{jobfailed}"
+exit 0

snakemake/shell.py

     STDOUT = None
 
 
-class shell(sp.Popen):
+class shell:
     _process_args = {}
     _process_prefix = ""
 
     def __new__(
         cls, cmd, *args, async=False, iterable=False, read=False, **kwargs):
         cmd = format(cmd, *args, stepout=2, **kwargs)
-        stdout = sp.PIPE if iterable or async else STDOUT
+        stdout = sp.PIPE if iterable or async or read else STDOUT
         proc = sp.Popen(cls._process_prefix + cmd, shell=True, stdout=stdout,
             close_fds=True, **cls._process_args)
 

snakemake/workflow.py

 
 
 class Workflow:
-    def __init__(self, snakefile=None, snakemakepath=None):
+    def __init__(self, snakefile=None, snakemakepath=None, jobscript=None):
         """
         Create the controller.
         """
         self.rule_count = 0
         self.snakefile = snakefile
         self.snakemakepath = os.path.abspath(snakemakepath)
+        self.jobscript = jobscript
         self.persistence = None
         self.resources = None
         self.globals = globals()