Commits

Johannes Köster committed d3b0634

Fixed the scheduler so that it no longer submits too many jobs when using a cluster.

Comments (0)

Files changed (4)

snakemake/__init__.py

 def snakemake(snakefile,
     listrules=False,
     cores=1,
+    nodes=1,
     resources=None,
     workdir=None,
     targets=None,
                     subsnakemake = partial(
                         snakemake,
                         cores=cores,
+                        nodes=nodes,
                         resources=resources,
                         dryrun=dryrun,
                         touch=touch,
                         timestamp=timestamp)
                     success = workflow.execute(
                         targets=targets, dryrun=dryrun, touch=touch,
-                        cores=cores, forcetargets=forcetargets,
+                        cores=cores, nodes=nodes, forcetargets=forcetargets,
                         forceall=forceall, forcerun=forcerun,
                         prioritytargets=prioritytargets, quiet=quiet,
                         keepgoing=keepgoing, printshellcmds=printshellcmds,
         parser.print_help()
         sys.exit(1)
 
+    cores, nodes = args.cores, sys.maxsize
+    if args.cluster:
+        cores, nodes = nodes, cores
+
     success = snakemake(
             args.snakefile,
             listrules=args.list,
-            cores=args.cores,
+            cores=cores,
+            nodes=nodes,
             resources=resources,
             workdir=args.directory,
             targets=args.target,

snakemake/rules.py

             self.temp_output = set()
             self.protected_output = set()
             self.subworkflow_input = dict()
-            self.resources = dict(_cores=1)
+            self.resources = dict(_cores=1, _nodes=1)
             self.priority = 1
             self.version = None
             self._log = None

snakemake/scheduler.py

                 workflow, dag, None, submitcmd=cluster,
                 printreason=printreason, quiet=quiet,
                 printshellcmds=printshellcmds, output_wait=output_wait)
-            self.rule_weight = partial(
-                self.rule_weight,
-                maxcores=1)
             if immediate_submit:
                 self.rule_reward = self.dryrun_rule_reward
                 self._submit_callback = partial(
                 self.resources[name] = b_i
             return solution
 
-    def rule_weight(self, rule, maxcores=None):
+    def rule_weight(self, rule):
         res = rule.resources
-        if maxcores is None:
-            maxcores = self.workflow.global_resources["_cores"]
-
-        def calc_res(item):
-            name, value = item
-            if name == "_cores":
-                return min(maxcores, res["_cores"])
+        def calc_res(name, value):
             return min(res.get(name, 0), value)
 
-        return list(map(calc_res, self.workflow.global_resources.items()))
+        return [calc_res(*item) for item in self.workflow.global_resources.items()]
 
     def rule_reward(self, rule, jobs=None):
         jobs = jobs[rule]

snakemake/workflow.py

         return rule.name in self._localrules
 
     def execute(
-        self, targets=None, dryrun=False,  touch=False, cores=1,
+        self, targets=None, dryrun=False,  touch=False, cores=1, nodes=1,
         forcetargets=False, forceall=False, forcerun=None,
         prioritytargets=None, quiet=False, keepgoing=False,
         printshellcmds=False, printreason=False, printdag=False,
 
         self.global_resources = dict() if cluster or resources is None else resources
         self.global_resources["_cores"] = cores
+        self.global_resources["_nodes"] = nodes
 
         def rules(items):
             return map(self._rules.__getitem__, filter(self.is_rule, items))