Proposal: partial outputs

Issue #616
Simon Ye
created an issue

One use case I want to support is one job processing multiple input files serially. This is useful if the program has to load a large database and parallelizing the normal way (1 job per input) is inefficient, especially in a cluster setting where these jobs may get dispatched to different hosts, causing the database to be loaded multiple times on multiple hosts when it would be much faster to process all files serially in one job.

The normal one job per input file rule would be something like this:

rule process_single:
  input: 'input/{sample}.fastq'
  output: 'output/{sample}.results'
  shell:
    '''
    do_something {input} {output}
    '''

While processing them all in a single rule would be:

rule process_batch:
  input: expand('input/{sample}.fastq', sample=samples)
  output: expand('output/{sample}.results', sample=samples)
  shell:
    '''
    do_something --inputs {input} --outputs {output}  # Assume program is smart about this
    '''

However, the problem with this approach is that if process_batch fails in the middle, or if samples changes, the processing has to be rerun on all files, even the ones that succeeded. The reason is that all output files are deleted before the job starts if the job needs to run. This can be solved with dynamic outputs, but that's not the best fit because in this case we know all the inputs and outputs we expect. It can also be solved with extensive Python scripting to manually check mtimes and edit the samples variable, but both of these methods are clunky.
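
For reference, the manual mtime-checking workaround looks roughly like this (a minimal sketch; the stale() helper is made up for illustration and simply prunes samples whose outputs are already up to date):

import os

def stale(sample):
  # True if the output is missing or older than its input.
  inp = 'input/{}.fastq'.format(sample)
  out = 'output/{}.results'.format(sample)
  return not os.path.exists(out) or os.path.getmtime(out) < os.path.getmtime(inp)

# Edit the samples variable by hand so process_batch only sees stale samples.
samples = [s for s in samples if stale(s)]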

The main problem is that Snakemake doesn't inherently know how the inputs and outputs of a single rule are related, i.e. it doesn't know that output1.results depends on the mtime of input1.fastq but not on input3.fastq or some combination. I'm proposing a new rule directive for detailed input->output linking and partial job running: it accepts a function that returns a dict mapping each output filename to the input filename it depends on.

def dependencies(inputs, outputs):
  # Map each output to the input it is produced from (pairwise, in order).
  return {o: i for i, o in zip(inputs, outputs)}

rule process_batch_smart:
  input: expand('input/{sample}.fastq', sample=samples)
  output: expand('output/{sample}.results', sample=samples)
  partial_dependencies: dependencies
  shell:
    '''
    do_something --inputs {input} --outputs {output}  # Assume program is smart about this
    '''

In this case, we would add a processing step that removes any output filename whose mtime is newer than that of its partial-dependency input, and similarly removes any input filename that doesn't map to a remaining output. This would filter the input and output lists properly before running the job. The hardest part of this is probably deciding what to name the directive.
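
Roughly, the filtering step I have in mind would behave like this (a sketch of the proposed behaviour, not existing Snakemake code; filter_partial is a hypothetical name):

import os

def filter_partial(job_inputs, job_outputs, dependencies):
  # Ask the rule's partial_dependencies function for the output -> input mapping.
  mapping = dependencies(job_inputs, job_outputs)
  # Keep only outputs that are missing or older than the input they depend on.
  stale_outputs = [o for o, i in mapping.items()
                   if not os.path.exists(o)
                   or os.path.getmtime(o) < os.path.getmtime(i)]
  # Keep only inputs that still map to a remaining (stale) output.
  stale_inputs = [mapping[o] for o in stale_outputs]
  return stale_inputs, stale_outputs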

Comments (6)

  1. Johannes Köster

    Does this go away with job groups (an upcoming feature)? You keep your 1-job-per-file solution and put all these jobs in one job group. Snakemake submits this job group to one cluster node, so that you don't have the described problem. What remains is the seriality issue, which can be solved with e.g. resources: db=1.
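
    For concreteness, that suggestion might look roughly like this (a minimal sketch assuming a group directive as described; the group name 'db_jobs' and the db resource are arbitrary, and the resource only serializes jobs if the workflow is run with --resources db=1):

    rule process_single:
      input: 'input/{sample}.fastq'
      output: 'output/{sample}.results'
      group: 'db_jobs'
      resources: db=1
      shell:
        '''
        do_something {input} {output}
        '''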

  2. Simon Ye reporter

    I don't fully understand the job groups proposal. In each rule we give it a group number; I assume that two rules linked together with the same group number will be executed together as 1 job instead of 2 jobs. But what about jobs from the same rule? Does that mean that all instances of the rule are grouped together? And what does grouping entail: does it have to wait until all inputs are ready before starting, or does it start a job when it can and then block until it can finish the rest of the job?

  3. Johannes Köster

    It would wait for the entire input. Every subgraph in the DAG that has the same group id will be submitted together. That means that in your case you would have to write an aggregating rule with the same group id, so that a connected subgraph containing all the jobs appears.
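
    A sketch of such an aggregating rule, reusing the hypothetical 'db_jobs' group id from the sketch above (the flag file name is arbitrary):

    rule aggregate:
      input: expand('output/{sample}.results', sample=samples)
      output: touch('output/all_samples.done')
      group: 'db_jobs'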

  4. Simon Ye reporter

    It would be almost perfect. The only slight problem is that we cannot manipulate the shell directive for cases where the command can process multiple inputs/outputs in a single invocation. Each shell block still has to be idempotent even if they run serially.
