Commits

Johannes Köster committed da397aa Merge

Merge branch 'logging_callback'

Files changed (8)

snakemake/__init__.py

 
 from snakemake.workflow import Workflow
 from snakemake.exceptions import print_exception
-from snakemake.logging import logger, init_logger
+from snakemake.logging import setup_logger
 
 __author__ = "Johannes Köster"
 __version__ = "2.4.9"
     notemp=False,
     nodeps=False,
     jobscript=None,
-    timestamp=False):
+    timestamp=False,
+    log_handler=None):
     """
     Run snakemake on a given snakefile.
     Note: at the moment, this function is not thread-safe!
     time_measurements -- measure the running times of all rules
     lock              -- lock the working directory
     """
-
-    init_logger(nocolor=nocolor, stdout=dryrun, debug=debug, timestamp=timestamp)
+    setup_logger(handler=log_handler, quiet=quiet, printshellcmds=printshellcmds, nocolor=nocolor, stdout=dryrun, debug=debug, timestamp=timestamp)
 
     if not os.path.exists(snakefile):
         logger.error("Error: Snakefile \"{}\" not present.".format(snakefile))
         """ Raise exception if output files of job are missing. """
         for f in job.expanded_output:
             if not f.exists:
-                logger.warning("Output file {} not present. Waiting {} "
+                logger.info("Output file {} not present. Waiting {} "
                 "seconds to ensure that this is not because of filesystem "
                 "latency.".format(f, wait))
                 while not f.exists and wait > 0:
         """ Write-protect output files that are marked with protected(). """
         for f in job.expanded_output:
             if f in job.protected_output:
-                logger.warning("Write-protecting output file {}".format(f))
+                logger.info("Write-protecting output file {}".format(f))
                 f.protect()
 
     def handle_temp(self, job):
                     yield f
 
         for f in unneeded_files():
-            logger.warning("Removing temporary output file {}".format(f))
+            logger.info("Removing temporary output file {}".format(f))
             f.remove()
 
     def jobid(self, job):
                 self._ready_jobs.add(job_)
 
         if update_dynamic and job.dynamic_output:
-            logger.warning("Dynamically updating jobs")
+            logger.info("Dynamically updating jobs")
             newjob = self.update_dynamic(job)
             if newjob:
                 # simulate that this job ran and was finished before

snakemake/exceptions.py

     origin = get_exception_origin(ex, linemaps)
     if origin is not None:
         lineno, file = origin
-        logger.critical(format_error(
+        logger.error(format_error(
             ex, lineno, linemaps=linemaps, snakefile=file,
             show_traceback=print_traceback))
         return
     if isinstance(ex, SyntaxError):
-        logger.critical(format_error(
+        logger.error(format_error(
             ex, ex.lineno, linemaps=linemaps, snakefile=ex.filename,
             show_traceback=print_traceback))
     elif isinstance(ex, TokenError):
-        logger.critical(format_error(
+        logger.error(format_error(
             ex, None,
             show_traceback=print_traceback))
     elif isinstance(ex, RuleException):
         for e in ex._include + [ex]:
             if not e.omit:
-                logger.critical(format_error(
+                logger.error(format_error(
                     e, e.lineno, linemaps=linemaps, snakefile=e.filename,
                     show_traceback=print_traceback))
     elif isinstance(ex, WorkflowError):
-        logger.critical(
+        logger.error(
             format_error(
                 ex, ex.lineno, linemaps=linemaps, snakefile=ex.snakefile,
                 show_traceback=print_traceback))
     elif isinstance(ex, KeyboardInterrupt):
-        logger.warning("Cancelling snakemake on user request.")
+        logger.info("Cancelling snakemake on user request.")
     else:
         traceback.print_exception(type(ex), ex, ex.__traceback__)
 

snakemake/executors.py

                 else:
                     yield f
 
-        def format_ruleitem(name, value):
-            return "" if not value else "\t{}: {}".format(name, value)
-
-        desc = list()
-        if not self.quiet:
-            if job.message:
-                desc.append(job.message)
-            else:
-                desc.append("{}rule {}:".format(self.rule_prefix(job), job.rule.name))
-                for name, value in (
-                    ("input", ", ".join(format_files(
-                        job, job.input, job.ruleio, job.dynamic_input))),
-                    ("output", ", ".join(format_files(
-                        job, job.output, job.ruleio,
-                        job.dynamic_output))),
-                    ("log", job.log),
-                    ("reason",
-                        self.dag.reason(job) if self.printreason else None)):
-                    if value:
-                        desc.append(format_ruleitem(name, value))
-                priority = self.dag.priority(job)
-                if priority > 1:
-                    desc.append(format_ruleitem(
-                        "priority", "highest"
-                        if priority == Job.HIGHEST_PRIORITY
-                        else priority))
-                if self.printthreads and job.threads > 1:
-                    desc.append(format_ruleitem("threads", job.threads))
-        if self.printshellcmds and job.shellcmd:
-            desc.append(job.shellcmd)
-        if desc:
-            logger.info("\n".join(desc))
-            if job.dynamic_output:
-                logger.warning("Subsequent jobs will be added dynamically "
+        priority = self.dag.priority(job)
+        logger.job_info(
+            msg=job.message,
+            name=job.rule.name,
+            local=self.workflow.is_local(job.rule),
+            input=list(format_files(job, job.input, job.ruleio, job.dynamic_input)),
+            output=list(format_files(job, job.output, job.ruleio, job.dynamic_output)),
+            log=job.log,
+            reason=self.dag.reason(job),
+            priority="highest" if priority == Job.HIGHEST_PRIORITY else priority,
+            threads=job.threads,
+            shellcmd=job.shellcmd)
+
+        if job.dynamic_output:
+            logger.info(
+                    "Subsequent jobs will be added dynamically "
                     "depending on the output of this rule")
 
     def finish_job(self, job):
         try:
             self.workflow.persistence.started(job)
         except IOError as e:
-            logger.warning("Failed to set marker file for job started ({}). "
+            logger.info("Failed to set marker file for job started ({}). "
                 "Snakemake will work, but cannot ensure that output files "
                 "are complete in case of a kill signal or power loss. "
                 "Please ensure write permissions for the "
         try:
             self.workflow.persistence.finished(job)
         except IOError as e:
-            logger.warning("Failed to remove marker file for job started "
+            logger.info("Failed to remove marker file for job started "
                 "({}). Please ensure write permissions for the "
                 "directory {}".format(
                     e, self.workflow.persistence.path))

snakemake/logging.py

             message.append(self.RESET_SEQ)
         return "".join(message)
 
-logger = logging.getLogger(__name__)
-handler = None
 
+class Logger:
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+        self.handler = self.console_handler
+        self.stream_handler = None
+        self.quiet = False
+        self.printshellcmds = False
+        self.printreason = False
+
+    def set_stream_handler(self, stream_handler):
+        if self.stream_handler is not None:
+            self.logger.removeHandler(self.stream_handler)
+        self.stream_handler = stream_handler
+        self.logger.addHandler(stream_handler)
 
-def init_logger(nocolor=False, stdout=False, debug=False, timestamp=False):
-    global logger
-    global handler
-    if handler:
-        logger.removeHandler(handler)
-    handler = ColorizingStreamHandler(
+    def set_level(self, level):
+        self.logger.setLevel(level)
+
+    def info(self, msg):
+        self.handler(dict(level="info", msg=msg))
+
+    def debug(self, msg):
+        self.handler(dict(level="debug", msg=msg))
+
+    def error(self, msg):
+        self.handler(dict(level="error", msg=msg))
+
+    def progress(self, done=None, total=None):
+        self.handler(dict(level="progress", done=done, total=total))
+
+    def job_info(self, **msg):
+        msg["level"] = "job_info"
+        self.handler(msg)
+
+    def console_handler(self, msg):
+        def job_info(msg):
+            def format_item(item, omit=None, valueformat=str):
+                value = msg[item]
+                if value != omit:
+                    return "\t{}: {}".format(item, valueformat(value))
+                    
+            yield "{}rule {}:".format("local" if msg["local"] else "", msg["name"])
+            for item in "input output".split():
+                fmt = format_item(item, omit=[], valueformat=", ".join)
+                if fmt is not None:
+                    yield fmt
+            singleitems = ["log"]
+            if self.printreason:
+                singleitems.append("reason")
+            for item in singleitems:
+                fmt = format_item(item, omit=None)
+                if fmt is not None:
+                    yield fmt
+            for item in "priority threads".split():
+                fmt = format_item(item, omit=1)
+                if fmt is not None:
+                    yield fmt
+            
+        level = msg["level"]
+        if level == "info":
+            self.logger.warning(msg["msg"])
+        elif level == "error":
+            self.logger.error(msg["msg"])
+        elif level == "debug":
+            self.logger.debug(msg["msg"])
+        elif level == "progress" and not self.quiet:
+            done = msg["done"]
+            total = msg["total"]
+            self.logger.info("{} of {} steps ({:.0%}) done".format(done, total, done / total))
+        elif level == "job_info":
+            if not self.quiet:
+                if msg["msg"] is not None:
+                    self.logger.info(msg["msg"])
+                else:
+                    self.logger.info("\n".join(job_info(msg)))
+            elif self.printshellcmds:
+                self.logger.info(msg["shellcmd"])
+
+
+logger = Logger()
+
+
+def setup_logger(handler=None, quiet=False, printshellcmds=False, printreason=False, nocolor=False, stdout=False, debug=False, timestamp=False):
+    if handler is not None:
+        logger.handler = handler
+    stream_handler = ColorizingStreamHandler(
         nocolor=nocolor, stream=sys.stdout if stdout else sys.stderr,
         timestamp=timestamp
     )
-    logger.addHandler(handler)
-    logger.setLevel(logging.DEBUG if debug else logging.INFO)
-
-init_logger()
+    logger.set_stream_handler(stream_handler)
+    logger.set_level(logging.DEBUG if debug else logging.INFO)
+    logger.quiet = quiet
+    logger.printshellcmds = printshellcmds
+    logger.printreason = printreason
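
Because setup_logger swaps in the given callback as logger.handler, a handler
that should keep the normal terminal output can forward each record to
logger.console_handler itself. A sketch; collecting error messages in a list
is illustrative only:

    from snakemake.logging import logger, setup_logger

    errors = []

    def tee_handler(msg):
        # Keep the default console behaviour, but also remember errors.
        logger.console_handler(msg)
        if msg["level"] == "error":
            errors.append(msg["msg"])

    setup_logger(handler=tee_handler)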

snakemake/scheduler.py

                 self._open_jobs.wait()
             except:
                 # this will be caused because of SIGTERM or SIGINT
-                logger.warning("Terminating processes on user request.")
+                logger.info("Terminating processes on user request.")
                 self._executor.shutdown()
                 return False
             self._open_jobs.clear()
             if not self.keepgoing and self._errors:
-                logger.warning("Will exit after finishing "
+                logger.info("Will exit after finishing "
                     "currently running jobs.")
                 self._executor.shutdown()
                 return False
             self.running.remove(job)
             self.failed.add(job)
             if self.keepgoing:
-                logger.warning("Job failed, going on with independent jobs.")
+                logger.info("Job failed, going on with independent jobs.")
             else:
                 self._open_jobs.set()
 
 
     def progress(self):
         """ Display the progress. """
-        logger.info("{} of {} steps ({:.0%}) done".format(self.finished_jobs,
-            len(self.dag), self.finished_jobs / len(self.dag)))
+        logger.progress(done=self.finished_jobs, total=len(self.dag))
 
 
 def cumsum(iterable, zero=[0]):

snakemake/utils.py

         mime, encoding = mimetypes.guess_type(file)
         if mime is None:
             mime = "text/plain"
-            logger.warning("Could not detect mimetype for {}, assuming "
+            logger.info("Could not detect mimetype for {}, assuming "
             "text/plain.".format(file))
         if encoding is None:
             encoding = defaultenc

snakemake/workflow.py

         if unlock:
             try:
                 self.persistence.cleanup_locks()
-                logger.warning("Unlocking working directory.")
+                logger.info("Unlocking working directory.")
                 return True
             except IOError:
                 logger.error("Error: Unlocking the directory {} failed. Maybe "
         try:
             self.persistence.lock()
         except IOError:
-            logger.critical("Error: Directory cannot be locked. Please make "
+            logger.error("Error: Directory cannot be locked. Please make "
                 "sure that no other Snakemake process is trying to create "
                 "the same files in the following directory:\n{}\n"
                 "If you are sure that no other "
             for subworkflow in self.subworkflows:
                 subworkflow_targets = subworkflow.targets(dag)
                 if subworkflow_targets:
-                    logger.warning("Executing subworkflow {}.".format(subworkflow.name))
+                    logger.info("Executing subworkflow {}.".format(subworkflow.name))
                     if not subsnakemake(subworkflow.snakefile, workdir=subworkflow.workdir, targets=subworkflow_targets):
                         return False
                 else:
-                    logger.warning("Subworkflow {}: Nothing to be done.".format(subworkflow.name))
+                    logger.info("Subworkflow {}: Nothing to be done.".format(subworkflow.name))
             if self.subworkflows:
-                logger.warning("Executing main workflow.")
+                logger.info("Executing main workflow.")
             # rescue globals
             self.globals.update(globals_backup)
 
         if nodeps:
             missing_input = [f for job in dag.targetjobs for f in job.input if dag.needrun(job) and not os.path.exists(f)]
             if missing_input:
-                logger.critical("Dependency resolution disabled (--nodeps) "
+                logger.error("Dependency resolution disabled (--nodeps) "
                     "but missing input " 
                     "files detected. If this happens on a cluster, please make sure "
                     "that you handle the dependencies yourself or turn of "
 
         if not dryrun and not quiet and len(dag):
             if cluster:
-                logger.warning("Provided cluster nodes: {}".format(nodes))
+                logger.info("Provided cluster nodes: {}".format(nodes))
             else:
-                logger.warning("Provided cores: {}".format(cores))
-            logger.warning("\n".join(dag.stats()))
+                logger.info("Provided cores: {}".format(cores))
+            logger.info("\n".join(dag.stats()))
 
         success = scheduler.schedule()
 
         if success:
             if dryrun:
                 if not quiet:
-                    logger.warning("\n".join(dag.stats()))
+                    logger.info("\n".join(dag.stats()))
             elif stats:
                 scheduler.stats.to_csv(stats)
         else:
-            logger.critical(
+            logger.error(
                 "Exiting because a job execution failed. "
                 "Look above for error message")
             return False