Commits

Anonymous committed 51e3c19

Started on logcapture plugin. Does capture, does not output. Fixed up loader plugin to avoid double-loading.

  • Participants
  • Parent commits 3eb7fb6

Comments (0)

Files changed (6)

nose2/plugins/loader.py

 
 
 class Functions(ml.Functions):
+    configSection = 'nose-functions'
     unpack = unpack
 
     def createTests(self, obj, testIndex=None):
 
 
 class Generators(ml.Generators):
+    configSection = 'nose-generators'
     unpack = unpack
 
     def isGenerator(self, obj):
                 super(Generators, self).isGenerator(obj))
 
 
-

nose2/plugins/logcapture.py

+import logging
+from logging.handlers import BufferingHandler
+from StringIO import StringIO
+import threading
+from unittest2.events import Plugin, addOption
+
+from nose2.util import ln
+
+
+log = logging.getLogger(__name__)
+
+
+class LogCapture(Plugin):
+    configSection = 'logging'
+    commandLineSwitch = (None, 'log-capture', 'Enable log capture')
+    logformat = '%(name)s: %(levelname)s: %(message)s'
+    logdatefmt = None
+    clear = False
+    filters = ['-nose']
+
+    def __init__(self):
+        self.logformat = self.config.as_str('format', self.logformat)
+        self.logdatefmt = self.config.as_str('date-format', self.logdatefmt)
+        self.filters = self.config.as_list('filter', self.filters)
+        self.clear = self.config.as_bool('clear-handlers', self.clear)
+        print "inited"
+
+    def setupLoghandler(self):
+        # setup our handler with root logger
+        print "setup"
+        root_logger = logging.getLogger()
+        if self.clear:
+            if hasattr(root_logger, "handlers"):
+                for handler in root_logger.handlers:
+                    root_logger.removeHandler(handler)
+            for logger in logging.Logger.manager.loggerDict.values():
+                if hasattr(logger, "handlers"):
+                    for handler in logger.handlers:
+                        logger.removeHandler(handler)
+        # make sure there isn't one already
+        # you can't simply use "if self.handler not in root_logger.handlers"
+        # since at least in unit tests this doesn't work --
+        # LogCapture() is instantiated for each test case while root_logger
+        # is module global
+        # so we always add new MyMemoryHandler instance
+        for handler in root_logger.handlers[:]:
+            if isinstance(handler, MyMemoryHandler):
+                root_logger.handlers.remove(handler)
+        root_logger.addHandler(self.handler)
+        # to make sure everything gets captured
+        root_logger.setLevel(logging.NOTSET)
+
+    def addCapturedLogs(self, event):
+        format = self.handler.format
+        records = [format(r) for r in self.handler.buffer]
+        event.metadata['logs'] = records
+        # FIXME when there's a better way, do that instead
+        records = [ln('>> begin captured logging <<')] + records + [
+            ln('>> end captured logging <<')]
+        records = u'\n'.join(records)
+        event.traceback = "%s\n%s" % (
+            event.traceback, records.encode('utf-8'))
+
+    def startTestRun(self, event):
+        self.handler = MyMemoryHandler(1000, self.logformat, self.logdatefmt,
+                                       self.filters)
+        self.setupLoghandler()
+
+    def startTest(self, event):
+        self.setupLoghandler()
+
+    def stopTest(self, event):
+        if event.error or event.failed:
+            self.addCapturedLogs(event)
+        self.handler.truncate()
+
+
+class FilterSet(object):
+    def __init__(self, filter_components):
+        self.inclusive, self.exclusive = self._partition(filter_components)
+
+    @staticmethod
+    def _partition(components):
+        inclusive, exclusive = [], []
+        for component in components:
+            if component.startswith('-'):
+                exclusive.append(component[1:])
+            else:
+                inclusive.append(component)
+        return inclusive, exclusive
+
+    def allow(self, record):
+        """returns whether this record should be printed"""
+        if not self:
+            # nothing to filter
+            return True
+        return self._allow(record) and not self._deny(record)
+
+    @staticmethod
+    def _any_match(matchers, record):
+        """return the bool of whether `record` starts with
+        any item in `matchers`"""
+        def record_matches_key(key):
+            return record == key or record.startswith(key + '.')
+        return any(map(record_matches_key, matchers))
+
+    def _allow(self, record):
+        if not self.inclusive:
+            return True
+        return self._any_match(self.inclusive, record)
+
+    def _deny(self, record):
+        if not self.exclusive:
+            return False
+        return self._any_match(self.exclusive, record)
+
+
+class MyMemoryHandler(BufferingHandler):
+    def __init__(self, capacity, logformat, logdatefmt, filters):
+        BufferingHandler.__init__(self, capacity)
+        fmt = logging.Formatter(logformat, logdatefmt)
+        self.setFormatter(fmt)
+        self.filterset = FilterSet(filters)
+
+    def flush(self):
+        pass # do nothing
+
+    def truncate(self):
+        self.buffer = []
+
+    def filter(self, record):
+        return self.filterset.allow(record.name)
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state['lock']
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self.lock = threading.RLock()

nose2/plugins/plugins.cfg

 [unittest]
 plugins =
+        nose2.plugins.logcapture
         nose2.plugins.attrib
         nose2.plugins.doctests
         nose2.plugins.outcomes
         nose2.plugins.prof
         nose2.plugins.testid
         nose2.plugins.loader
-
-excluded-plugins =
         unittest2.plugins.moduleloading
 
-[functions]
+[nose-functions]
 always-on = True
 
-[generators]
+[nose-generators]
 always-on = True
 
 [parameters]
 always-on = True
 
-[compat]
+[logging]
 always-on = True
+def ln(label, char='-', width=70):
+    """Draw a divider, with label in the middle.
+
+    >>> ln('hello there')
+    '---------------------------- hello there -----------------------------'
+
+    Width and divider char may be specified. Defaults are 70 and '-'
+    respectively.
+
+    """
+    label_len = len(label) + 2
+    chunk = (width - label_len) / 2
+    out = '%s %s %s' % (char * chunk, label, char * chunk)
+    pad = width - len(out)
+    if pad > 0:
+        out = out + (char * pad)
+    return out

support/layout2/lib/pkg2/__init__.py

+import logging
+
+log = logging.getLogger(__name__)
+
 def get_one():
+    log.debug("Returning %s", 1)
     return 1

support/layout2/tests.py

+import logging
+import unittest2
+
 from pkg2 import get_one
 
+
+log = logging.getLogger(__name__)
+log.debug("module imported")
+
 def test():
+    log.debug("test run")
     assert get_one() == 1
+
+
+def test_fail():
+    log.debug("test_fail run")
+    assert get_one() == 2
+
+
+class Tests(unittest2.TestCase):
+    def test_fail2(self):
+        log.debug("test_fail2 run")
+        self.assertEqual(get_one(), 4)