Commits

Matteo Bertini committed edff99b

Compact progress reporting in a single log line

Comments (0)

Files changed (1)

     def run(self):
         while True:
             func, args, kargs = self.tasks.get()
-            func(self.gateway)(*args, **kargs)
+            func(self.gateway, self.tasks)(*args, **kargs)
             self.tasks.task_done()
-            logger.info("{0} tasks remaining in the current group ...".format(self.tasks.qsize()))
 
 class ThreadPool:
     """Pool of threads consuming tasks from a queue"""
                 missings += [str(MissingTargetError(self, dep))]
         if missings:
             raise IOError("\n".join(missings))
-    def run_if_needed(self, call_func=subprocess.check_call, inplace=False, force=False):
+    def run_if_needed(self, call_func=subprocess.check_call, inplace=False, force=False, progress=None):
         self.check_deps()
         thread = threading.current_thread()
+        p = progress or ""
         for cmdline in self.commands:
             inplace = inplace or self._inplace
             cmdline_color_skip = " ".join((col.GREEN if (a in self.targets) else (col.YELLOW if a in self.deps else col.BLUE))+a+col.ENDC for a in cmdline.split())
             cmdline_color_run = " ".join((col.RED if (a in self.targets) else (col.YELLOW if a in self.deps else col.BLUE))+a+col.ENDC for a in cmdline.split())
             if self.is_outdated() or force:
-                logger.info("Running<{thread.name}>{f}: {cmdline_color_run}".format(f="[F]" if force else "", **locals()))
+                logger.info("{p}Running<{thread.name}>{f}: {cmdline_color_run}".format(f="[F]" if force else "", **locals()))
 
                 movable_targets = set(self.targets) - self._inplace
                 tmp_files = {}
                     os.rename(tmp_files[target], target)
                 return output
             else:
-                logger.debug("Skipping<{thread.name}>: {cmdline_color_skip}".format(**locals()))
+                logger.debug("{p}Skipping<{thread.name}>: {cmdline_color_skip}".format(**locals()))
     def __hash__(self):
         return self._hash
     def __eq__(self, other):
             heads[dep] = False
     return sorted([t for t in heads if heads[t]], key=lambda t: targets[t].linenum)
 
-def execnet_if_needed(gateway):
+def execnet_if_needed(gateway, tasks):
     channel = gateway.remote_exec("""
     from subprocess import Popen, PIPE, CalledProcessError
     import execnet
     def call_func(*args, **kwargs):
         channel.send(*args, **kwargs)
         return channel.receive()
-    def _execnet_if_needed(task, inplace, task_changed):
+    def _execnet_if_needed(task, inplace, task_changed, progress):
         if task_changed:
             logger.debug("{task!r} changed, force rebuild".format(**locals()))
         try:
-            output = task.run_if_needed(call_func, inplace, task_changed)
+            progress += "[remaining {0}]".format(tasks.qsize())
+            output = task.run_if_needed(call_func, inplace, task_changed, progress)
             if output:
                 sys.stdout.write(output)
         except:
             dryrun(groups)
             continue
         for c, group in enumerate(groups):
-            logger.info("Executing group {0}/{1} ...".format(c+1, len(groups)))
             for t in group:
-                pool.add_task(execnet_if_needed, t, args.inplace, t not in prev_tasks)
+                #FIXME: added tasks should not result as modified
+                pool.add_task(execnet_if_needed, t, args.inplace, t not in prev_tasks,
+                              progress="[group {0}/{1}]".format(c+1, len(groups)))
             pool.join()
     pool.terminate()
+    logger.info("Done!")
 
 def xspec(value):
     if os.path.exists(value):