Anonymous avatar Anonymous committed dfbaf84

And so it begins. (This is definitely the wrong way to load plugins, but I'm not sure there is a right way yet.)

Comments (0)

Files changed (10)

nose2/__init__.py

+from nose2.main import nose2_main

nose2/__main__.py

+"""Main entry point"""
+
+import sys
+if sys.argv[0].endswith("__main__.py"):
+    sys.argv[0] = "nose2"
+
+__unittest = True
+
+from nose2.main import main_
+main_()
+import os
+
+from unittest2 import main_, config, events
+
+from . import plugins
+
+
+def nose2_main():
+    events.loadPlugins(configLocations=[plugins.configFile()])
+    main_()

nose2/plugins/__init__.py

+import os
+
+HERE = os.path.abspath(os.path.dirname(__file__))
+
+def configFile():
+    return os.path.join(HERE, 'plugins.cfg')
+

nose2/plugins/attrib.py

+from unittest2.events import hooks, Plugin, addOption
+from unittest import TestSuite
+
+undefined = object()
+
+
+class AttributeSelector(Plugin):
+    def __init__(self):
+        self.attribs = []
+        self.register()
+        addOption(self.attribs, "A", "attr", "Attribulate")
+
+    def startTestRun(self, event):
+        if not self.attribs:
+            return
+        attribs = []
+        for attr in self.attribs:
+            # all attributes within an attribute group must match
+            attr_group = []
+            for attrib in attr.strip().split(","):
+                # don't die on trailing comma
+                if not attrib:
+                    continue
+                items = attrib.split("=", 1)
+                if len(items) > 1:
+                    # "name=value"
+                    # -> 'str(obj.name) == value' must be True
+                    key, value = items
+                else:
+                    key = items[0]
+                    if key[0] == "!":
+                        # "!name"
+                        # 'bool(obj.name)' must be False
+                        key = key[1:]
+                        value = False
+                    else:
+                        # "name"
+                        # -> 'bool(obj.name)' must be True
+                        value = True
+                attr_group.append((key, value))
+            attribs.append(attr_group)
+        if not attribs:
+            return
+
+        event.suite = self.filterSuite(event.suite, attribs)
+
+    def filterSuite(self, suite, attribs):
+        new_suite = suite.__class__()
+        for test in suite:
+            if isinstance(test, TestSuite):
+                new_suite.addTest(self.filterSuite(test, attribs))
+            elif self.validateAttrib(test, attribs):
+                new_suite.addTest(test)
+        return new_suite
+
+    def validateAttrib(self, test, attribs):
+        any_ = False
+        for group in attribs:
+            match = True
+            for key, value in group:
+                obj_value = self.getAttr(test, key)
+                if callable(value):
+                    if not value(key, test_attribs):
+                        match = False
+                        break
+                elif value is True:
+                    # value must exist and be True
+                    if not bool(obj_value):
+                        match = False
+                        break
+                elif value is False:
+                    # value must not exist or be False
+                    if bool(obj_value):
+                        match = False
+                        break
+                elif type(obj_value) in (list, tuple):
+                    # value must be found in the list attribute
+                    if not str(value).lower() in [str(x).lower()
+                                                  for x in obj_value]:
+                        match = False
+                        break
+                else:
+                    # value must match, convert to string and compare
+                    if (value != obj_value
+                        and str(value).lower() != str(obj_value).lower()):
+                        match = False
+                        break
+            any_ = any_ or match
+        return any_
+
+    def getAttr(self, test, key):
+        val = getattr(test, key, undefined)
+        if val is not undefined:
+            return val
+        if hasattr(test, '_testFunc'):
+            val = getattr(test._testFunc, key, undefined)
+            if val is not undefined:
+                return val
+        elif hasattr(test, '_testMethodName'):
+            meth = getattr(test, test._testMethodName, undefined)
+            if meth is not undefined:
+                val = getattr(meth, key, undefined)
+                if val is not undefined:
+                    return val

nose2/plugins/outcomes.py

+from unittest2.events import hooks, Plugin
+
+class Outcomes(Plugin):
+
+    configSection = 'outcomes'
+
+    def __init__(self):
+        self.treat_as_fail = set(self.config.as_list('treat-as-fail', []))
+        self.treat_as_skip = set(self.config.as_list('treat-as-skip', []))
+        self.register()
+
+    def stopTest(self, event):
+        if event.exc_info:
+            ec, ev, tb = event.exc_info
+            classname = ec.__name__
+            if classname in self.treat_as_fail:
+                short, long_ = self.labels(classname)
+                event.setOutcome(long_, 'failed', short, long_)
+            elif classname in self.treat_as_skip:
+                short, long_ = self.labels(classname, upper=False)
+                event.setOutcome(
+                    long_, 'skipped', short, "%s '%%s'" % long_, ev)
+
+    def labels(self, label, upper=True):
+        if upper:
+            label = label.upper()
+        else:
+            label = label.lower()
+        short = label[0]
+        return short, label

nose2/plugins/plugins.cfg

+[unittest]
+plugins =
+        nose2.plugins.attrib
+        nose2.plugins.outcomes
+        nose2.plugins.prof
+        nose2.plugins.testid

nose2/plugins/prof.py

+try:
+    import hotshot
+    from hotshot import stats
+except ImportError:
+    hotshot, stats = None, None
+import os
+import tempfile
+
+from unittest2 import Plugin
+
+
+class Profiler(Plugin):
+    configSection = 'profiler'
+    commandLineSwitch = ('P', 'profile', 'Run tests under profiler')
+
+    def __init__(self):
+        self.pfile = self.config.as_str('filename', '')
+        self.sort = self.config.as_str('sort', 'cumulative')
+        self.restrict = self.config.as_list('restrict', [])
+        self.clean = False
+        self.fileno = None
+
+    def startTestRun(self, event):
+        if not hotshot:
+            return
+        self.createPfile()
+        self.prof = hotshot.Profile(self.pfile)
+        self.suite = event.suite
+        event.suite = self.runProf
+
+    def runProf(self, result):
+        self.prof.runcall(self.suite, result)
+
+    def stopTestRun(self, event):
+        if not hotshot:
+            return
+        # write prof output to stream
+        class Stream:
+            def write(self, *msg):
+                for m in msg:
+                    event.message(unicode(m), (1, 2))
+                    event.message(u' ', (1, 2))
+        stream = Stream()
+        self.prof.close()
+        prof_stats = stats.load(self.pfile)
+        prof_stats.sort_stats(self.sort)
+        event.message("\n\nProfiling results\n", (1, 2))
+        compat_25 = hasattr(prof_stats, 'stream')
+        if compat_25:
+            tmp = prof_stats.stream
+            prof_stats.stream = stream
+        else:
+            tmp = sys.stdout
+            sys.stdout = stream
+        try:
+            if self.restrict:
+                prof_stats.print_stats(*self.restrict)
+            else:
+                prof_stats.print_stats()
+        finally:
+            if compat_25:
+                prof_stats.stream = tmp
+            else:
+                sys.stdout = tmp
+        self.prof.close()
+        if self.clean:
+            if self.fileno:
+                try:
+                    os.close(self.fileno)
+                except OSError:
+                    pass
+            try:
+                os.unlink(self.pfile)
+            except OSError:
+                pass
+
+    def createPfile(self):
+        if not self.pfile:
+            self.fileno, self.pfile = tempfile.mkstemp()
+            self.clean = True

nose2/plugins/testid.py

+import os
+import pickle
+import re
+from unittest2.events import Plugin
+
+
+class TestId(Plugin):
+    configSection = 'testid'
+    commandLineSwitch = ('I', 'with-id', 'Add test ids to output')
+    idpat = re.compile(r'(\d+)')
+
+    def __init__(self):
+        self.idfile = self.config.get('id-file', '.noseids')
+        self.ids = {}
+        self.tests = {}
+        if not os.path.isabs(self.idfile):
+            # FIXME expand-user?
+            self.idfile = os.path.join(os.getcwd(), self.idfile)
+        self.id = 0
+        self._loaded = False
+
+    def nextId(self):
+        self.id += 1
+        return self.id
+
+    def startTest(self, event):
+        testid = event.test.id()
+        if testid not in self.tests:
+            id_ = self.nextId()
+            self.ids[id_] = testid
+            self.tests[testid] = id_
+        else:
+            id_ = self.tests[testid]
+        event.message('#%s ' % id_, (2,))
+
+    def loadTestsFromName(self, event):
+        self.loadIds()
+        id_ = self.argToId(event.name)
+        if id_ and id_ in self.ids:
+            event.name = self.ids[id_]
+
+    def loadTestsFromNames(self, event):
+        self.loadIds()
+        new_names = []
+        for name in event.names:
+            if self.argToId(name) in self.ids:
+                new_names.append(self.ids[name])
+            else:
+                new_names.append(name)
+        event.names[:] = new_names
+
+    def stopTestRun(self, event):
+        fh = open(self.idfile, 'w')
+        pickle.dump({'ids': self.ids, 'tests': self.tests}, fh)
+
+    def argToId(self, name):
+        m = self.idpat.match(name)
+        if m:
+            return int(m.groups()[0])
+
+    def loadIds(self):
+        if self._loaded:
+            return
+        fh = open(self.idfile, 'r')
+        data = pickle.load(fh)
+        if 'ids' in data:
+            self.ids = data['ids']
+        if 'tests' in data:
+            self.tests = data['tests']
+        self.id = max(self.ids.keys())
+        self._loaded = True
+from setuptools import setup
+
+setup(
+    name='nose2',
+    version='0.1',
+    entry_points= {
+        'console_scripts': [
+            'nose2 = nose2:nose2_main',
+            ],
+        }
+)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.