Commits

Johannes Köster committed abab1db

Finished refactoring.

Comments (0)

Files changed (8)

snakemake/__init__.py

 
 from snakemake.workflow import Workflow
 from snakemake.exceptions import print_exception
-import snakemake.logging as logging
+from snakemake.logging import setup_logger
 
 __author__ = "Johannes Köster"
 __version__ = "2.4.9"
     nodeps=False,
     jobscript=None,
     timestamp=False,
-    log_handler=logging.console_handler):
+    log_handler=None):
     """
     Run snakemake on a given snakefile.
     Note: at the moment, this function is not thread-safe!
     time_measurements -- measure the running times of all rules
     lock              -- lock the working directory
     """
-
-    if quiet:
-        log_handler = logging.quiet_console_handler
-    init_logger(log_handler=log_handler, nocolor=nocolor, stdout=dryrun, debug=debug, timestamp=timestamp)
+    setup_logger(handler=log_handler, quiet=quiet, printshellcmds=printshellcmds, nocolor=nocolor, stdout=dryrun, debug=debug, timestamp=timestamp)
 
     if not os.path.exists(snakefile):
         logger.error("Error: Snakefile \"{}\" not present.".format(snakefile))
         """ Raise exception if output files of job are missing. """
         for f in job.expanded_output:
             if not f.exists:
-                logger.warning("Output file {} not present. Waiting {} "
+                logger.info("Output file {} not present. Waiting {} "
                 "seconds to ensure that this is not because of filesystem "
                 "latency.".format(f, wait))
                 while not f.exists and wait > 0:
         """ Write-protect output files that are marked with protected(). """
         for f in job.expanded_output:
             if f in job.protected_output:
-                logger.warning("Write-protecting output file {}".format(f))
+                logger.info("Write-protecting output file {}".format(f))
                 f.protect()
 
     def handle_temp(self, job):
                     yield f
 
         for f in unneeded_files():
-            logger.warning("Removing temporary output file {}".format(f))
+            logger.info("Removing temporary output file {}".format(f))
             f.remove()
 
     def jobid(self, job):
                 self._ready_jobs.add(job_)
 
         if update_dynamic and job.dynamic_output:
-            logger.warning("Dynamically updating jobs")
+            logger.info("Dynamically updating jobs")
             newjob = self.update_dynamic(job)
             if newjob:
                 # simulate that this job ran and was finished before

snakemake/exceptions.py

     origin = get_exception_origin(ex, linemaps)
     if origin is not None:
         lineno, file = origin
-        logger.critical(format_error(
+        logger.error(format_error(
             ex, lineno, linemaps=linemaps, snakefile=file,
             show_traceback=print_traceback))
         return
     if isinstance(ex, SyntaxError):
-        logger.critical(format_error(
+        logger.error(format_error(
             ex, ex.lineno, linemaps=linemaps, snakefile=ex.filename,
             show_traceback=print_traceback))
     elif isinstance(ex, TokenError):
-        logger.critical(format_error(
+        logger.error(format_error(
             ex, None,
             show_traceback=print_traceback))
     elif isinstance(ex, RuleException):
         for e in ex._include + [ex]:
             if not e.omit:
-                logger.critical(format_error(
+                logger.error(format_error(
                     e, e.lineno, linemaps=linemaps, snakefile=e.filename,
                     show_traceback=print_traceback))
     elif isinstance(ex, WorkflowError):
-        logger.critical(
+        logger.error(
             format_error(
                 ex, ex.lineno, linemaps=linemaps, snakefile=ex.snakefile,
                 show_traceback=print_traceback))
     elif isinstance(ex, KeyboardInterrupt):
-        logger.warning("Cancelling snakemake on user request.")
+        logger.info("Cancelling snakemake on user request.")
     else:
         traceback.print_exception(type(ex), ex, ex.__traceback__)
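 
Every fatal branch above now logs at "error" instead of "critical", and KeyboardInterrupt drops from "warning" to "info". A small sketch exercising the KeyboardInterrupt path (calling print_exception with an empty linemaps dict is an assumption; only the ex and linemaps arguments are visible in this hunk):

    from snakemake.exceptions import print_exception

    try:
        raise KeyboardInterrupt()
    except KeyboardInterrupt as ex:
        # reaches the isinstance(ex, KeyboardInterrupt) branch and logs
        # "Cancelling snakemake on user request." at the info level
        print_exception(ex, linemaps={})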
 

snakemake/executors.py

 
 from snakemake.jobs import Job
 from snakemake.shell import shell
-import snakemake.logging as logging
+from snakemake.logging import logger
 from snakemake.stats import Stats
 from snakemake.utils import format, Unformattable
 from snakemake.exceptions import print_exception, get_exception_origin
             msg=job.message,
             name=job.rule.name,
             local=self.workflow.is_local(job.rule),
-            input=list(format_files(job, job.input, job.ruleio, job.dynamic_input))),
+            input=list(format_files(job, job.input, job.ruleio, job.dynamic_input)),
             output=list(format_files(job, job.output, job.ruleio, job.dynamic_output)),
             log=job.log,
             reason=self.dag.reason(job),
         try:
             self.workflow.persistence.started(job)
         except IOError as e:
-            logger.warning("Failed to set marker file for job started ({}). "
+            logger.info("Failed to set marker file for job started ({}). "
                 "Snakemake will work, but cannot ensure that output files "
                 "are complete in case of a kill signal or power loss. "
                 "Please ensure write permissions for the "
         try:
             self.workflow.persistence.finished(job)
         except IOError as e:
-            logger.warning("Failed to remove marker file for job started "
+            logger.info("Failed to remove marker file for job started "
                 "({}). Please ensure write permissions for the "
                 "directory {}".format(
                     e, self.workflow.persistence.path))

snakemake/logging.py

         return "".join(message)
 
 
-
-logger = Logger()
-stream_handler = None
-handler = None
-
-
 class Logger:
     def __init__(self):
         self.logger = logging.getLogger(__name__)
+        self.handler = self.console_handler
+        self.stream_handler = None
+        self.printshellcmds = False
+        self.printreason = False
+        self.quiet = False
+    
+    def set_stream_handler(self, stream_handler):
+        if self.stream_handler is not None:
+            self.logger.removeHandler(self.stream_handler)
+        self.stream_handler = stream_handler
+        self.logger.addHandler(stream_handler)
+
+    def set_level(self, level):
+        self.logger.setLevel(level)
 
     def info(self, msg):
-        handler(dict(level="info", msg=msg))
+        self.handler(dict(level="info", msg=msg))
 
     def debug(self, msg):
-        handler(dict(level="debug", msg=msg))
+        self.handler(dict(level="debug", msg=msg))
 
-    def critical(self, msg):
-        handler(dict(level="critical", msg=msg))
+    def error(self, msg):
+        self.handler(dict(level="error", msg=msg))
 
-    def progress(self, count, total):
-        handler(dict(level="progress", count=count, total=total))
+    def progress(self, done=None, total=None):
+        self.handler(dict(level="progress", done=done, total=total))
 
     def job_info(self, **msg):
         msg["level"] = "job_info"
-        handler(msg)
-    
+        self.handler(msg)
+
+    def console_handler(self, msg):
+        def job_info(msg):
+            def format_item(item, omit=None, valueformat=str):
+                value = msg[item]
+                if value != omit:
+                    return "\t{}: {}".format(item, valueformat(value))
+                    
+            yield "{}rule {}:".format("local" if msg["local"] else "", msg["name"])
+            for item in "input output".split():
+                fmt = format_item(item, omit=[], valueformat=", ".join)
+                if fmt is not None:
+                    yield fmt
+            singleitems = ["log"]
+            if self.printreason:
+                singleitems.append("reason")
+            for item in singleitems:
+                fmt = format_item(item, omit=None)
+                if fmt is not None:
+                    yield fmt
+            for item in "priority threads".split():
+                fmt = format_item(item, omit=1)
+                if fmt is not None:
+                    yield fmt
+            
+        level = msg["level"]
+        if level == "info":
+            self.logger.warning(msg["msg"])
+        elif level == "error":
+            self.logger.error(msg["msg"])
+        elif level == "debug":
+            self.logger.debug(msg["msg"])
+        elif level == "progress" and not self.quiet:
+            done = msg["done"]
+            total = msg["total"]
+            self.logger.info("{} of {} steps ({:.0%}) done".format(done, total, done / total))
+        elif level == "job_info":
+            if not self.quiet:
+                if msg["msg"] is not None:
+                    self.logger.info(msg["msg"])
+                else:
+                    self.logger.info("\n".join(job_info(msg)))
+            elif self.printshellcmds:
+                self.logger.info(msg["shellcmd"])
 
-def quiet_console_handler(msg):
-    level = msg["level"]
-    if level == "info":
-        logger.warning(msg["msg"])
-    elif level == "critical":
-        logger.critical(msg["msg"])
-    elif level == "debug":
-        logger.debug(msg["msg"])
-
-
-def console_handler(msg):
-    def job_info(msg):
-        def format_item(item, omit=None, valueformat=str):
-            value = msg[item]
-            if value != omit:
-                return "\t{}: {}".format(item, valueformat(value))
-                
-        yield "{}rule {}:".format("local" if msg["local"] else "", msg["name"])
-        for item in "input output".split():
-            yield format_item(item, omit=[], ", ".join)
-        for item in "log reason".split():
-            yield format_item(item, omit="")
-        for item in "priority threads".split():
-            yield format_item(item, omit=1)
-        
-    quiet_console_handler(msg)
-    if level == "progress":
-        done = msg["done"]
-        total = msg["total"]
-        logger.info("{} of {} steps ({:.0%}) done".format(done, total, done / total))
-    elif level == "job_info":
-        if "msg" in msg:
-            logger.info(msg["msg"])
-        else:
-            logger.info("\n".join(job_info(msg)))
-
-
-def init_logger(log_handler=console_handler, nocolor=False, stdout=False, debug=False, timestamp=False):
-    global logger
-    global stream_handler
-    global handler
-    handler = log_handler
-    if stream_handler:
-        logger.removeHandler(stream_handler)
+
+logger = Logger()
+
+
+def setup_logger(handler=None, quiet=False, printshellcmds=False, printreason=False, nocolor=False, stdout=False, debug=False, timestamp=False):
+    if handler is not None:
+        logger.handler = handler
     stream_handler = ColorizingStreamHandler(
         nocolor=nocolor, stream=sys.stdout if stdout else sys.stderr,
         timestamp=timestamp
     )
-    logger.logger.addHandler(stream_handler)
-    logger.logger.setLevel(logging.DEBUG if debug else logging.INFO)
-
-
-init_logger()
+    logger.set_stream_handler(stream_handler)
+    logger.set_level(logging.DEBUG if debug else logging.INFO)
+    logger.quiet = quiet
+    logger.printshellcmds = printshellcmds
+    logger.printreason = printreason
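 
The refactoring replaces the module-level init_logger()/console_handler pair with a Logger singleton that owns its handler and display flags (quiet, printshellcmds, printreason). A sketch of installing a structured handler through the new setup_logger() (emitting JSON is purely illustrative, not part of this commit):

    import json

    from snakemake.logging import logger, setup_logger

    def json_handler(msg):
        # msg is the dict assembled by Logger.info()/error()/progress()/job_info()
        print(json.dumps(msg, default=str))

    setup_logger(handler=json_handler, printshellcmds=True)
    logger.info("hello")              # {"level": "info", "msg": "hello"}
    logger.progress(done=1, total=4)  # {"level": "progress", "done": 1, "total": 4}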

snakemake/scheduler.py

 
 from snakemake.executors import DryrunExecutor, TouchExecutor
 from snakemake.executors import ClusterExecutor, CPUExecutor
-import snakemake.logging as logging
+from snakemake.logging import logger
 
 __author__ = "Johannes Köster"
 
                 self._open_jobs.wait()
             except:
                 # this will be caused because of SIGTERM or SIGINT
-                logger.warning("Terminating processes on user request.")
+                logger.info("Terminating processes on user request.")
                 self._executor.shutdown()
                 return False
             self._open_jobs.clear()
             if not self.keepgoing and self._errors:
-                logger.warning("Will exit after finishing "
+                logger.info("Will exit after finishing "
                     "currently running jobs.")
                 self._executor.shutdown()
                 return False
             self.running.remove(job)
             self.failed.add(job)
             if self.keepgoing:
-                logger.warning("Job failed, going on with independent jobs.")
+                logger.info("Job failed, going on with independent jobs.")
             else:
                 self._open_jobs.set()
 
 
     def progress(self):
         """ Display the progress. """
-        logger.progress(self.finished_jobs, len(self.dag))
+        logger.progress(done=self.finished_jobs, total=len(self.dag))
 
 
 def cumsum(iterable, zero=[0]):

snakemake/utils.py

         mime, encoding = mimetypes.guess_type(file)
         if mime is None:
             mime = "text/plain"
-            logger.warning("Could not detect mimetype for {}, assuming "
+            logger.info("Could not detect mimetype for {}, assuming "
             "text/plain.".format(file))
         if encoding is None:
             encoding = defaultenc
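 
The fallback above relies only on the standard library; the same logic as a standalone snippet:

    import mimetypes

    mime, encoding = mimetypes.guess_type("report.data")
    if mime is None:
        mime = "text/plain"  # assume plain text when the type cannot be guessed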

snakemake/workflow.py

         if unlock:
             try:
                 self.persistence.cleanup_locks()
-                logger.warning("Unlocking working directory.")
+                logger.info("Unlocking working directory.")
                 return True
             except IOError:
                 logger.error("Error: Unlocking the directory {} failed. Maybe "
         try:
             self.persistence.lock()
         except IOError:
-            logger.critical("Error: Directory cannot be locked. Please make "
+            logger.error("Error: Directory cannot be locked. Please make "
                 "sure that no other Snakemake process is trying to create "
                 "the same files in the following directory:\n{}\n"
                 "If you are sure that no other "
             for subworkflow in self.subworkflows:
                 subworkflow_targets = subworkflow.targets(dag)
                 if subworkflow_targets:
-                    logger.warning("Executing subworkflow {}.".format(subworkflow.name))
+                    logger.info("Executing subworkflow {}.".format(subworkflow.name))
                     if not subsnakemake(subworkflow.snakefile, workdir=subworkflow.workdir, targets=subworkflow_targets):
                         return False
                 else:
-                    logger.warning("Subworkflow {}: Nothing to be done.".format(subworkflow.name))
+                    logger.info("Subworkflow {}: Nothing to be done.".format(subworkflow.name))
             if self.subworkflows:
-                logger.warning("Executing main workflow.")
+                logger.info("Executing main workflow.")
             # rescue globals
             self.globals.update(globals_backup)
 
         if nodeps:
             missing_input = [f for job in dag.targetjobs for f in job.input if dag.needrun(job) and not os.path.exists(f)]
             if missing_input:
-                logger.critical("Dependency resolution disabled (--nodeps) "
+                logger.error("Dependency resolution disabled (--nodeps) "
                     "but missing input " 
                     "files detected. If this happens on a cluster, please make sure "
                     "that you handle the dependencies yourself or turn of "
 
         if not dryrun and not quiet and len(dag):
             if cluster:
-                logger.warning("Provided cluster nodes: {}".format(nodes))
+                logger.info("Provided cluster nodes: {}".format(nodes))
             else:
-                logger.warning("Provided cores: {}".format(cores))
-            logger.warning("\n".join(dag.stats()))
+                logger.info("Provided cores: {}".format(cores))
+            logger.info("\n".join(dag.stats()))
 
         success = scheduler.schedule()
 
         if success:
             if dryrun:
                 if not quiet:
-                    logger.warning("\n".join(dag.stats()))
+                    logger.info("\n".join(dag.stats()))
             elif stats:
                 scheduler.stats.to_csv(stats)
         else:
-            logger.critical(
+            logger.error(
                 "Exiting because a job execution failed. "
                 "Look above for error message")
             return False
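 
The lock()/cleanup_locks() calls above go through self.persistence, whose implementation is not part of this diff. As an illustration of the advisory-locking idea only (not Snakemake's actual mechanism), a working directory can be locked with an atomic mkdir:

    import errno
    import os

    def lock(workdir):
        try:
            # mkdir is atomic on POSIX, so only one process can take the lock
            os.mkdir(os.path.join(workdir, ".lock"))
        except OSError as e:
            if e.errno == errno.EEXIST:
                raise IOError("directory is already locked")
            raise

    def unlock(workdir):
        try:
            os.rmdir(os.path.join(workdir, ".lock"))
        except OSError:
            pass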