Clustering jobs with snakemake

Issue #28 resolved
Unknown user 264a1 created an issue

When jobs are scheduled using snakemake with the --cluster option, snakemake waits until the output files are created before continuing with the next job. The scheduler that we use at our cluster (submitting via sbatch) allows one to set dependencies on other jobs when launching them. The advantage of scheduling jobs this way is that your job gets run immediately after the job it depends on. With snakemake this kind of scheduling is not possible, because it waits for the output files and the next job is scheduled at the end of the current queue again. Is it possible to have some option to make snakemake with --cluster not wait on the output of in-between jobs but run the cluster script immediately? I can keep track myself of which files are generated by which jobid to perform the proper scheduling.
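
For reference, sbatch expresses such a dependency with its --dependency option; a job submitted like this (jobid hypothetical) only starts once job 12345 has finished successfully:

    sbatch --dependency=afterok:12345 next_step.sh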

Comments (30)

  1. Unknown user 22efb

    Hi Ino, so for your request I would simply need to write a "scheduler" that just passes all jobs directly to the cluster. Of course, you would then somehow need to redo the work of snakemake in order to determine the dependencies on your own. Is the higher position in the queue really worth it?

    Maybe we could also find a generic way for snakemake to submit the dependencies automatically.

    • let's assume snakemake retrieves the jobid from the stdout of the provided sbatch/qsub script
    • then the scheduler could immediately submit all jobs whose dependencies have already been submitted to (but not necessarily executed on) the cluster, thereby making the jobids of the dependencies accessible in the provided cluster script, e.g. via $ snakemake --immediate-submit --cluster "sbatch --dependency {dependencies}".
    • for SGE this would work, too: $ snakemake --immediate-submit --cluster "qsub -hold_jid {dependencies}" (a sketch of such a submit wrapper follows below)
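
    A minimal sketch of what such a submit wrapper could look like for SLURM (hypothetical script name sbatch-submit.py, not the actual implementation; it assumes that {dependencies} expands to whitespace-separated jobids of the already submitted parent jobs, that the generated jobscript is passed as the last argument, and that sbatch reports "Submitted batch job <id>"):

    #!/usr/bin/env python3
    # hypothetical sketch of a submit wrapper for the scheme described above:
    #   snakemake --immediate-submit --cluster "./sbatch-submit.py {dependencies}"
    import re
    import subprocess
    import sys

    # the jobscript generated by snakemake is the last argument;
    # everything before it are jobids of already submitted dependencies
    jobscript = sys.argv[-1]
    dependencies = [d for d in sys.argv[1:-1] if d.strip()]

    cmd = ["sbatch"]
    if dependencies:
        cmd.append("--dependency=afterok:" + ":".join(dependencies))
    cmd.append(jobscript)

    # sbatch reports "Submitted batch job <id>"; extract the id and print it
    # as the first line of stdout so that snakemake can pick it up
    out = subprocess.check_output(cmd).decode()
    match = re.search(r"(\d+)", out)
    print(match.group(1) if match else out.strip())
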
  2. Unknown user 264a1 reporter

    That would be an excellent solution. For me the higher position in the queue is really worth it, since I write pipelines whose steps vary a lot in resource usage. I don't want to hold all resources the entire time. One program in the pipeline might be heavily parallelized and run in a couple of hours on a thousand cores, whereas another program might only run on a single node with eight cores for a day. The heavy program might be at the end of the pipeline, and with queueing time being the main bottleneck at our cluster it could take a couple of days to acquire enough queueing time to be able to run that job. If I can already acquire that queueing time while running other jobs, it results in a great speed-up.

  3. Inti Pedroso

    Hi, perhaps it is possible to construct job names using the wildcards used by snakemake, which could be something like "__".join((rule_name, [all_wildcards])) (that should make it unique, I think). That way one can submit all jobs and establish the dependencies directly without needing to capture the job ids. Job names are passed to SGE (at least) in the same way as jobids. If snakemake needs to stop the workflow, it can use the job names to delete the jobs from the queue.
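
    A small sketch of how such a job name could be derived in a submit wrapper (hypothetical helper; it assumes the job properties baked into the jobscript expose the rule name and a wildcards mapping):

    # hypothetical sketch: derive a job name from the rule name and its wildcard values
    from snakemake.utils import read_job_properties

    def job_name(jobscript):
        props = read_job_properties(jobscript)
        rule_name = props.get("rule", "job")
        wildcards = props.get("wildcards") or {}
        # e.g. rule "aln" with wildcard part=3 -> "aln__3"
        return "__".join([rule_name] + [str(v) for v in wildcards.values()])

    The name could then be passed to qsub -N (or sbatch --job-name) at submission time.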

  4. Unknown user 264a1 reporter

    That works for sbatch as well. The dependencies still need to be specified as jobids, but the jobids can be retrieved in a submit script by supplying the job name to squeue. You would need to use the full path names of the files in the job name in order to allow running the same pipeline on different input files with the same name, or create some other form of namespace. One advantage of this approach is that you don't reschedule jobs that have already been scheduled in previous runs of snakemake.
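
    For example, a submit wrapper could resolve a job name back to its jobid(s) roughly like this (hypothetical helper using standard squeue options):

    # hypothetical sketch: look up the SLURM jobid(s) for a given job name
    import subprocess

    def jobids_for_name(name):
        out = subprocess.check_output(
            ["squeue", "--noheader", "--format=%i", "--name", name]
        ).decode()
        return [line.strip() for line in out.splitlines() if line.strip()]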

  5. Unknown user 264a1 reporter

    Hi, just for anybody who is interested: I have made something that does this for GNU make: https://github.com/inodb/gnu-make-job-scheduler. Each rule is scheduled separately with proper dependencies on other rules; it works for sbatch and qsub. It is not that robust, but it gets the job done. As I explained above, in my situation, where resource usage varies a lot between rules and the queueing time is the bottleneck, it can be quite useful. In most other cases I prefer snakemake over GNU make.

  6. Inti Pedroso

    Hi, is there any plan for this feature to be implemented in Snakemake? I just want to know whether it is something that could happen any time soon. BW, Inti

  7. Unknown user 22efb

    Hi, thanks for the reminder :-). I just implemented it this morning in commit 6490ef3. It works as I described in my first comment. As I do not have access to a cluster right now, would you two, Inti and Ino, like to test it with your setups? You have to make sure that your submit script outputs the jobid to the first line of stdout.

    Thanks, Johannes

  8. Unknown user 264a1 reporter

    Hi,

    Thanks for implementing this! I don't have much time at the moment either, as I'm on my way out for a holiday. Using a string for --cluster doesn't seem to work?

    snakemake --immediate-submit --cluster './../Snakefile-sbatch.py {dependencies}' all
    MissingRuleException: No rule to produce {dependencies}.

    snakemake --immediate-submit --cluster 'echo hi' all
    MissingRuleException: No rule to produce hi.


  9. Unknown user 22efb

    Hi, strange. For me, bash puts the command after --cluster into one argument if I put it in quotes. What shell (and version) do you use?

  10. Unknown user 264a1 reporter

    Kind of forgot about this over the holiday. The cluster command does work now; the problem was that I had a wrapper script called snakemake which called the actual snakemake script after loading the snakemake environment, and the quotes were removed by that wrapper script. With that problem solved, I took another look.

    I unfortunately don't have a minimal working example, this is what I am working on: https://github.com/inodb/snakemake-parallel-bwa

    The idea is to split up the given paired reads and align each part separately with bwa, then merge the resulting bam files back into one. All jobs, except the small ones, are submitted with sbatch - see Snakefile-sbatch.py.

    Somehow the first job - the splitting - goes okay: snakemake waits until the job submitted with sbatch finishes. This is a dynamic rule, so maybe that's the difference. For the second job, creating the Burrows-Wheeler transform of the reference file, snakemake quits immediately with:

    $ cd test
    $ snakemake -j 99 --immediate-submit --cluster './../Snakefile-sbatch.py {dependencies}' all
    Provided cluster nodes: 99
    Job counts:
            count   jobs
            2       sort
            2       aln
            1       index
            1       all
            1       bwt
            1       split1
            1       merge
            1       coverage
            1       split2
            1       removeduplicates
            1       sampe
            1       samtobam
            14
    rule split2:
            input: pair2.fastq
            output: split-out/pair2.fastq.split.{splitid} (dynamic)
    Subsequent jobs will be added dynamically depending on the output of this rule
    rule split1:
            input: pair1.fastq
            output: split-out/pair1.fastq.split.{splitid} (dynamic)
    Subsequent jobs will be added dynamically depending on the output of this rule
    Dynamically updating jobs
    1 of 13 steps (8%) done
    rule bwt:
            input: ref.fa
            output: ref.fa.bwt
            log: ref.fa.bwt.sm.log
    Dynamically updating jobs
    2 of 14 steps (14%) done
    Traceback (most recent call last):
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/__init__.py", line 124, in snakemake
        cleanup_metadata=cleanup_metadata
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/workflow.py", line 239, in execute
        success = scheduler.schedule()
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 135, in schedule
        run = self.job_selector(needrun)
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 230, in job_selector
        c = list(map(partial(self.rule_reward, jobs=jobs), rules))  # matrix of cumulative rewards over jobs
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 290, in rule_reward
        cumsum(map(operator.attrgetter("inputsize"), jobs)))
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 320, in cumsum
        return list(chain(zero, accumulate(iterable)))
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/jobs.py", line 78, in inputsize
        self._inputsize = sum(map(os.path.getsize, self.input))
      File "/bubo/home/h16/inod/lib/python3.2/genericpath.py", line 49, in getsize
        return os.stat(filename).st_size
    OSError: [Errno 2] No such file or directory: 'ref.fa.bwt'
    

    The bwt job is still running after that. If I wait until the job has ended successfully, the same thing happens for the next rule - aln. It would thus seem that snakemake is not waiting for the job to finish unless it is a dynamic rule.

  11. Unknown user 22efb

    Hi Ino, strangely I cannot reproduce this with your file. For me it works fine, and all jobs correctly wait for their input. Did you use the latest version? If so, could you provide the output with the --debug flag enabled?

    Best, Johannes

  12. Unknown user 264a1 reporter

    Hi,

    There you go:

    $ snakemake -j 99 --debug --immediate-submit --cluster './../Snakefile-sbatch.py {dependencies}' all
    Provided cluster nodes: 99
    Job counts:
            count   jobs
            2       sort
            2       aln
            1       index
            1       all
            1       bwt
            1       split1
            1       merge
            1       coverage
            1       split2
            1       removeduplicates
            1       sampe
            1       samtobam
            14
    Ready jobs:
            split2
            split1
    Selected jobs:
            split2
            split1
    rule split2:
            input: pair2.fastq
            output: split-out/pair2.fastq.split.{splitid} (dynamic)
    Subsequent jobs will be added dynamically depending on the output of this rule
    Submitted job with jobid 3946343.
    rule split1:
            input: pair1.fastq
            output: split-out/pair1.fastq.split.{splitid} (dynamic)
    Subsequent jobs will be added dynamically depending on the output of this rule
    Submitted job with jobid 3946344.
    Dynamically updating jobs
    Updating job merge.
    1 of 14 steps (7%) done
    Ready jobs:
            bwt
    Selected jobs:
            bwt
    rule bwt:
            input: ref.fa
            output: ref.fa.bwt
            log: ref.fa.bwt.sm.log
    Dynamically updating jobs
    2 of 14 steps (14%) done
    Submitted job with jobid 3946345.
    Ready jobs:
            aln
            aln
    Traceback (most recent call last):
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/__init__.py", line 124, in snakemake
        cleanup_metadata=cleanup_metadata
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/workflow.py", line 239, in execute
        success = scheduler.schedule()
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 135, in schedule
        run = self.job_selector(needrun)
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 230, in job_selector
        c = list(map(partial(self.rule_reward, jobs=jobs), rules))  # matrix of cumulative rewards over jobs
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 290, in rule_reward
        cumsum(map(operator.attrgetter("inputsize"), jobs)))
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 320, in cumsum
        return list(chain(zero, accumulate(iterable)))
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/jobs.py", line 78, in inputsize
        self._inputsize = sum(map(os.path.getsize, self.input))
      File "/bubo/home/h16/inod/lib/python3.2/genericpath.py", line 49, in getsize
        return os.stat(filename).st_size
    OSError: [Errno 2] No such file or directory: 'ref.fa.bwt'
    unlocking
    removing lock
    removing lock
    removed all locks
    

    I'm using the latest version from the repository:

    $ git log -1
    commit d4c30222f9f6e0357118b3fd81449e5299173d98
    Author: Johannes Köster <johannes.koester@tu-dortmund.de>
    Date:   Mon Aug 12 12:14:32 2013 +0200
    
        Enable the printing of tracebacks.
    
  13. Unknown user 22efb

    Hi Ino, I just noticed that you use --immediate-submit. It seems as if that is causing the bug. So to run your workflow for now, until I have fixed it, you can leave that option out and everything should work correctly. Unfortunately I have not really been able to test --immediate-submit so far, but thanks to Sean Davis and the NIH I will very soon be able to debug it on their cluster.

  14. Unknown user 264a1 reporter

    Hi Johannes, I have used it that way before, but the problem with that is that I have to queue for ages between rule executions, which is why the --immediate-submit option seemed like a great alternative (the enhancement requested in this issue). I'll wait then until you have tested it yourself. Thanks for looking into it!

  15. Unknown user 22efb

    Hi Ino, sorry for keeping you waiting so long. I looked at the problem again. I think I just fixed the reason for the error in commit c35a04d. With --immediate-submit, Snakemake does not care about dependencies (except for dynamic jobs), but the scheduler still wanted to calculate input file sizes. This is now fixed. Please try it and see if it fixes your issue.

  16. Unknown user 264a1 reporter

    Hi Johannes, I've tried it, but now my clustering script is throwing an error. There's no problem when I run the actual command, e.g.

    ./../Snakefile-sbatch.py  "/gulo/glob/inod/github/snakemake-parallel-bwa/test/.snakemake.tmp.8S1VHC/13.snakemake-job.sh"
    

    But apparently there is one when running it from within snakemake. Is it possible to change the error catching in snakemake when running the cluster script such that it also prints the error from the cluster script itself? This would make debugging of the cluster script easier. With the --debug flag it only gives me:

    Traceback (most recent call last):
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/__init__.py", line 133, in snakemake
        cleanup_metadata=cleanup_metadata
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/workflow.py", line 250, in execute
        success = scheduler.schedule()
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/scheduler.py", line 144, in schedule
        error_callback=self._error)
      File "/bubo/home/h16/inod/src/snakemake/bin/../snakemake/executors.py", line 256, in run
        shell=True).decode().split("\n")
      File "/bubo/home/h16/inod/lib/python3.2/subprocess.py", line 522, in check_output
        raise CalledProcessError(retcode, cmd, output=output)
    subprocess.CalledProcessError: Command './../Snakefile-sbatch.py  "/gulo/glob/inod/github/snakemake-parallel-bwa/test/.snakemake.tmp.8S1VHC/13.snakemake-job.sh"' returned non-zero exit status 1
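
    (As a workaround in the meantime - sketched here as an assumption, not an existing snakemake feature - the wrapper itself could catch its own exceptions and append them to a log file before exiting non-zero, so the actual failure can be inspected even though snakemake only reports the exit status:)

    # hypothetical debugging aid inside the submit wrapper (not part of snakemake)
    import sys
    import traceback

    def main():
        ...  # build and run the actual msub/sbatch submit command here

    if __name__ == "__main__":
        try:
            main()
        except Exception:
            with open("cluster-submit-errors.log", "a") as log:
                traceback.print_exc(file=log)   # keep the traceback somewhere inspectable
            traceback.print_exc(file=sys.stderr)
            sys.exit(1)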
    
  17. Unknown user 264a1 reporter

    Hi, I forgot to reply to this, but it actually works now, so good job and thanks a lot!

  18. Unknown user 80984

    Hi, our cluster runs Moab and uses msub, but when I use

    snakemake -k --rerun-incomplete --immediate-submit -j 1000 --forceall --timestamp --cluster-config cluster.json --cluster "msub -V -l nodes={cluster.nodes}:ppn={cluster.cpu} -l mem={cluster.mem} -l walltime={cluster.time} -l depend=afterok:{dependencies} -m {cluster.EmailNotice} -M {cluster.email} -e msub_log/ -o msub_log/" --latency-wait 120 
    

    It gave me the error: cannot submit job - cannot create requested dependency 'afterok:'

    Thanks! Ming

  19. Unknown user 80984

    And what if a job has multiple dependent jobs - how should I specify that in the command?

    -l dependent=afterok:<jobid1>:<jobid2>....
    

    Or do I have to write a custom job submission script?

    Thanks!

  20. Ming Tang

    Hi inodb,

    Thanks very much! This helped. I wrote one for Moab, but somehow it hangs on the first job, and qstat shows no job was submitted. Could you please look at my wrapper?

    msub_cluster.py:

    #!/usr/bin/env python3
    """
    Submit this cluster submission script for msub (Moab) to snakemake with:
        snakemake -j 99 --debug --immediate-submit --cluster-config cluster.json --cluster './msub_cluster.py {dependencies}'
    """
    ## In order to submit all the jobs to the Moab queueing system, one needs to write a wrapper.

    import sys
    import os
    from snakemake.utils import read_job_properties
    ## snakemake generates a jobscript containing all the (shell) commands from your Snakefile.
    ## It passes the jobscript as the last parameter.
    ## https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-job-properties

    jobscript = sys.argv[-1]
    job_properties = read_job_properties(jobscript)
    # access properties defined in the cluster configuration file (Snakemake >=3.6.0), cluster.json
    time = job_properties["cluster"]["time"]
    cpu = job_properties["cluster"]["cpu"]
    mem = job_properties["cluster"]["mem"]
    nodes = job_properties["cluster"]["nodes"]
    EmailNotice = job_properties["cluster"]["EmailNotice"]
    email = job_properties["cluster"]["email"]
    cmdline = 'msub -V -l nodes={nodes}:ppn={cpu} -l mem={mem} -l walltime={time} -m {EmailNotice} -M {email} -e msub_log/ -o msub_log/'.format(nodes=nodes, cpu=cpu, mem=mem, time=time, EmailNotice=EmailNotice, email=email)

    # figure out job dependencies; everything before the final jobscript argument is a jobid
    dependencies = set(sys.argv[1:-1])
    if dependencies:
        cmdline += " -l dependent=afterok:'{}'".format(":".join(dependencies))

    cmdline += " "
    # the actual job
    cmdline += jobscript

    # call the command
    os.system(cmdline)
    

    EDIT:

    I missed a space in the wrapper above; now the jobs get submitted, but the jobs still wait for the job they depend on to finish before being submitted to the cluster.

    snakemake --forceall -j 100  --immediate-submit --cluster-config cluster.json --cluster './msub_cluster.py {dependencies}'
    

    One more question: if a job has several dependent jobs, will snakemake figure out the dependencies? Do I just need to feed {dependencies} to the wrapper?

    Thanks for helping out! Your git repo gave me many hints.

    Tommy

  21. Unknown user 80984

    EDIT 2:

    It turns out I had to strip out the leading and trailing spaces from the submitted jobid, and now it is working!

    Many thanks!

    #!/usr/bin/env python3
    """
    Submit this cluster submission script for msub (Moab) to snakemake with:
        snakemake -j 99 --debug --immediate-submit --cluster-config cluster.json --cluster './msub_cluster.py {dependencies}'
    """
    ## In order to submit all the jobs to the Moab queueing system, one needs to write a wrapper.

    import sys
    import os
    from snakemake.utils import read_job_properties
    ## snakemake generates a jobscript containing all the (shell) commands from your Snakefile.
    ## It passes the jobscript as the last parameter.
    ## https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-job-properties

    jobscript = sys.argv[-1]
    job_properties = read_job_properties(jobscript)
    # access properties defined in the cluster configuration file (Snakemake >=3.6.0), cluster.json
    time = job_properties["cluster"]["time"]
    cpu = job_properties["cluster"]["cpu"]
    mem = job_properties["cluster"]["mem"]
    nodes = job_properties["cluster"]["nodes"]
    EmailNotice = job_properties["cluster"]["EmailNotice"]
    email = job_properties["cluster"]["email"]
    cmdline = 'msub -V -l nodes={nodes}:ppn={cpu} -l mem={mem} -l walltime={time} -m {EmailNotice} -M {email} -e msub_log/ -o msub_log/'.format(nodes=nodes, cpu=cpu, mem=mem, time=time, EmailNotice=EmailNotice, email=email)

    # figure out job dependencies; everything before the final jobscript argument is a jobid
    dependencies = set(sys.argv[1:-1])
    if dependencies:
        cmdline += " -l dependent=afterok:'{}'".format(":".join(dependencies))

    cmdline += " "
    # the actual job
    cmdline += jobscript

    # pipe the msub output through tail/sed to strip the leading and trailing
    # whitespace from the submitted jobid, so snakemake sees a clean jobid on stdout
    cmdline += r" | tail -1 | sed 's/^[ \t]*//;s/[ \t]*$//' "

    # call the command
    os.system(cmdline)
    
  22. Unknown user 80984

    One more question: even after the jobs are submitted, snakemake is still waiting on the head node for logging. Can I safely Ctrl+C it without affecting my jobs?

  23. Unknown user 264a1 reporter

    Yes, that should be fine. As long as you've chosen a high enough -j parameter, all the jobs should already have been submitted. The only exception I can think of is rules with dynamic input; I'm not sure whether those are only submitted after completion of the rule they depend on.
