Source

tox / tox / _pytestplugin.py

import pytest, py
import tox
import os
import sys
from py.builtin import print_
from fnmatch import fnmatch
import time
from tox._config import parseconfig
from tox._venv import VirtualEnv
from tox._cmdline import Action

def pytest_configure():
    """Remove ``TOXENV`` and ``HUDSON_URL`` from ``os.environ`` if present.

    NOTE(review): presumably this keeps settings of the invoking shell/CI
    from leaking into the tox runs started by the tests -- confirm.
    """
    for variable in ('TOXENV', 'HUDSON_URL'):
        # pop() with a default is a no-op when the variable is not set
        os.environ.pop(variable, None)

def pytest_report_header():
    """Return a header line telling which file the ``tox`` module was loaded from."""
    location = tox.__file__
    return "tox comes from: %r" % (location,)

def pytest_funcarg__newconfig(request, tmpdir):
    """Funcarg factory returning a ``newconfig`` helper.

    ``newconfig(args, source)`` writes the dedented *source* to ``tox.ini``
    inside *tmpdir* and returns ``parseconfig(args)`` evaluated with *tmpdir*
    as the working directory.  The short form ``newconfig(source)`` uses an
    empty argument list.
    """
    def newconfig(args, source=None):
        if source is None:
            # single-argument call: the only argument is the ini source
            args, source = [], args
        content = py.std.textwrap.dedent(source)
        tmpdir.join("tox.ini").write(content)
        previous = tmpdir.chdir()
        try:
            config = parseconfig(args)
        finally:
            # always go back to the directory we came from
            previous.chdir()
        return config
    return newconfig

def pytest_funcarg__cmd(request):
    """Funcarg factory: build a ``Cmd`` helper for the requesting test."""
    cmd = Cmd(request)
    return cmd

class ReportExpectMock:
    """Recording stand-in for a session reporter.

    Every public attribute is answered with a callable that stores
    ``(name,) + args`` in ``_calls``.  ``getnext``/``expect``/``not_expect``
    then walk that list, starting after the most recently matched entry
    (``_index``).
    """
    def __init__(self, session):
        self.session = session
        self._index = -1
        self._calls = []

    def clear(self):
        """Empty the recorded call list in place (``_index`` is left alone)."""
        del self._calls[:]

    def __getattr__(self, name):
        # only public names become report methods; private lookups fail normally
        if name[0] != "_":
            def generic_report(*args, **kwargs):
                # keyword arguments are accepted but not recorded
                record = (name,) + args
                self._calls.append(record)
                print ("%s" %(record, ))
            return generic_report
        raise AttributeError(name)

    def action(self, venv, msg, *args):
        """Record an ``("action", venv, msg)`` entry and return a real ``Action``."""
        record = ("action", venv, msg)
        self._calls.append(record)
        print ("%s" %(record, ))
        return Action(self.session, venv, msg, args)

    def getnext(self, cat):
        """Return the next recorded call whose category fnmatches *cat*.

        The search starts right after the last matched entry and, on
        success, moves ``_index`` to the entry found.

        Raises LookupError if no later call matches.
        """
        __tracebackhide__ = True
        start = self._index + 1
        for position in range(start, len(self._calls)):
            entry = self._calls[position]
            if fnmatch(entry[0], cat):
                self._index = position
                return entry
        raise LookupError(
            "looking for %r, no reports found at >=%d in %r" %
            (cat, start, self._calls))

    def expect(self, cat, messagepattern="*", invert=False):
        """Assert that a later *cat* call has an argument matching *messagepattern*.

        The pattern is implicitly prefixed with ``*``; newlines in the
        recorded arguments are replaced by spaces before matching.  With
        ``invert=True`` the logic is reversed: a match raises AssertionError
        and the absence of a match is the success case.
        """
        __tracebackhide__ = True
        if messagepattern.startswith("*"):
            pattern = messagepattern
        else:
            pattern = "*" + messagepattern
        while self._index < len(self._calls):
            try:
                call = self.getnext(cat)
            except LookupError:
                # no further call of this category was recorded
                break
            flattened = (str(arg).replace("\n", " ") for arg in call[1:])
            if any(fnmatch(text, pattern) for text in flattened):
                if invert:
                    raise AssertionError("found %s(%r), didn't expect it" %
                        (cat, pattern))
                return
        if invert:
            return
        raise AssertionError(
         "looking for %s(%r), no reports found at >=%d in %r" %
            (cat, pattern, self._index+1, self._calls))

    def not_expect(self, cat, messagepattern="*"):
        """Shortcut for ``expect(cat, messagepattern, invert=True)``."""
        return self.expect(cat, messagepattern, invert=True)

class pcallMock:
    """Inert substitute for a process object; it only remembers how it was created."""
    def __init__(self, args, cwd, env, stdout, stderr, shell):
        # split the command line into the program and its arguments
        self.arg0, self.args = args[0], args[1:]
        self.cwd, self.env = cwd, env
        self.stdout, self.stderr = stdout, stderr
        self.shell = shell

    def communicate(self):
        """Return a pair of empty strings; nothing is ever executed."""
        return ("", "")

    def wait(self):
        """Do nothing and return None; there is no process to wait for."""
        return None

def pytest_funcarg__mocksession(request):
    """Funcarg factory returning a ``Session`` subclass instance with mocked I/O.

    The mock records process invocations in ``_pcalls`` instead of running
    them, reports through a ``ReportExpectMock`` and starts out with a config
    parsed from an empty ``tox.ini``.
    """
    from tox._cmdline import Session

    class MockSession(Session):
        def __init__(self):
            # the base class initializer is not invoked here
            self._clearmocks()
            make_config = request.getfuncargvalue("newconfig")
            self.config = make_config([], "")
            self._actions = []

        def _clearmocks(self):
            # reset everything the mock records
            self.report = ReportExpectMock(self)
            self._spec2pkg = {}
            self._pcalls = []

        def getenv(self, name):
            envconfig = self.config.envconfigs[name]
            return VirtualEnv(envconfig, session=self)

        def make_emptydir(self, path):
            # intentionally a no-op in the mock
            return None

        def popen(self, args, cwd, shell=None,
            stdout=None, stderr=None, env=None):
            # remember the call instead of starting a process
            fake = pcallMock(args, cwd, env, stdout, stderr, shell)
            self._pcalls.append(fake)
            return fake

    session = MockSession()
    return session

def pytest_funcarg__newmocksession(request):
    """Funcarg factory returning ``newmocksession(args, source)``.

    The helper builds a config through the ``newconfig`` funcarg, installs it
    on the shared ``mocksession`` funcarg and returns that session.
    """
    session = request.getfuncargvalue("mocksession")
    make_config = request.getfuncargvalue("newconfig")

    def newmocksession(args, source):
        session.config = make_config(args, source)
        return session

    return newmocksession

class Cmd:
    """Helper for running external commands from a test.

    On construction the current working directory is remembered and a
    finalizer is registered on *request* that changes back to it, so tests
    may ``chdir`` freely.
    """
    def __init__(self, request):
        self.tmpdir = request.getfuncargvalue("tmpdir")
        self.request = request
        current = py.path.local()
        # go back to the starting directory when the request is finalized
        self.request.addfinalizer(current.chdir)

    def chdir(self, target):
        """Make *target* (an object with a ``chdir()`` method) the working directory."""
        target.chdir()

    def popen(self, argv, stdout, stderr, **kw):
        """Start *argv* as a subprocess and return the ``Popen`` object.

        The child gets a copy of ``os.environ`` in which the current working
        directory is prepended to ``PYTHONPATH``.  Extra keyword arguments
        are passed on to ``Popen``; an ``env`` entry in *kw* is overwritten.
        """
        if not hasattr(py.std, 'subprocess'):
            py.test.skip("no subprocess module")
        env = os.environ.copy()
        # os.pathsep (";" on Windows, ":" elsewhere) instead of a hard-coded
        # ":" so the resulting PYTHONPATH is valid on every platform
        env['PYTHONPATH'] = os.pathsep.join(filter(None, [
            str(os.getcwd()), env.get('PYTHONPATH', '')]))
        kw['env'] = env
        return py.std.subprocess.Popen(argv, stdout=stdout, stderr=stderr, **kw)

    def run(self, *argv):
        """Run *argv*, wait for it to finish and return a ``RunResult``.

        stdout/stderr of the child are captured in the files ``stdout`` and
        ``stderr`` inside ``self.tmpdir``, decoded with ``getdecoded``, split
        into lines and echoed to ``sys.stdout``/``sys.stderr``.  The result
        carries the exit status, both line lists and the elapsed time.
        """
        argv = [str(x) for x in argv]
        p1 = self.tmpdir.join("stdout")
        p2 = self.tmpdir.join("stderr")
        print("%s$ %s" % (os.getcwd(), " ".join(argv)))
        # nested try/finally: the capture files are closed even when
        # starting or waiting for the process raises
        f1 = p1.open("wb")
        try:
            f2 = p2.open("wb")
            try:
                now = time.time()
                popen = self.popen(argv, stdout=f1, stderr=f2,
                    close_fds=(sys.platform != "win32"))
                ret = popen.wait()
            finally:
                f2.close()
        finally:
            f1.close()
        out = p1.read("rb")
        out = getdecoded(out).splitlines()
        err = p2.read("rb")
        err = getdecoded(err).splitlines()
        def dump_lines(lines, fp):
            try:
                for line in lines:
                    py.builtin.print_(line, file=fp)
            except UnicodeEncodeError:
                # the target stream cannot represent the text; report and go on
                print("couldn't print to %s because of encoding" % (fp,))
        dump_lines(out, sys.stdout)
        dump_lines(err, sys.stderr)
        return RunResult(ret, out, err, time.time()-now)

def getdecoded(out):
    """Decode the bytes *out* as UTF-8.

    If the data is not valid UTF-8, a marker text followed by
    ``py.io.saferepr(out)`` is returned instead of raising.
    """
    try:
        text = out.decode("utf-8")
    except UnicodeDecodeError:
        text = "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
            py.io.saferepr(out),)
    return text

class RunResult:
    """Result of a command run: exit status, output lines and duration.

    ``stdout`` and ``stderr`` are ``LineMatcher`` objects wrapping
    ``outlines`` and ``errlines`` respectively.
    """
    def __init__(self, ret, outlines, errlines, duration):
        self.ret, self.duration = ret, duration
        self.outlines, self.errlines = outlines, errlines
        self.stdout = LineMatcher(outlines)
        self.stderr = LineMatcher(errlines)

class LineMatcher:
    """Match a list of output lines against expected lines or glob patterns."""
    def __init__(self,  lines):
        self.lines = lines

    def str(self):
        """Return all lines joined by newlines."""
        return "\n".join(self.lines)

    def fnmatch_lines(self, lines2):
        """Check that the patterns in *lines2* match ``self.lines`` in order.

        *lines2* is a sequence of patterns, a ``py.code.Source`` or a string
        (which is turned into a ``py.code.Source`` and stripped).  Each
        pattern must equal, or ``fnmatch`` against, a line that comes after
        the line matched by the previous pattern; lines in between are
        skipped.  Progress is printed to stdout for diagnosis.

        Raises AssertionError for the first pattern that finds no match in
        the remaining lines.  A line consumed by an earlier pattern is never
        matched a second time.
        """
        __tracebackhide__ = True
        if isinstance(lines2, str):
            lines2 = py.code.Source(lines2)
        # plain lists/tuples of patterns are used as they are
        if (not isinstance(lines2, (list, tuple))
                and isinstance(lines2, py.code.Source)):
            lines2 = lines2.strip().lines

        # a single shared iterator: every pattern continues where the
        # previous one stopped, without copying or mutating self.lines
        remaining = iter(self.lines)
        for pattern in lines2:
            nomatchprinted = False
            for candidate in remaining:
                if pattern == candidate:
                    print("exact match: %r" % (pattern,))
                    break
                if fnmatch(candidate, pattern):
                    print("fnmatch: %r" % (pattern,))
                    print("   with: %r" % (candidate,))
                    break
                if not nomatchprinted:
                    print("nomatch: %r" % (pattern,))
                    nomatchprinted = True
                print("    and: %r" % (candidate,))
            else:
                # lines exhausted without a match; raise explicitly so the
                # check also works when assert statements are optimized away
                raise AssertionError(
                    "pattern %r did not match any remaining line" % (pattern,))

@pytest.fixture
def initproj(request, tmpdir):
    """ create a factory function for creating example projects. """
    def initproj(name, filedefs=None):
        """Create project ``name`` (optionally ``name-version``) below tmpdir.

        *filedefs* maps file names to contents (nested dicts are
        directories).  A default ``setup.py`` and package ``__init__.py`` are
        added unless given, a ``MANIFEST.in`` listing every file is written,
        and the project directory becomes the working directory.
        """
        filedefs = {} if filedefs is None else filedefs
        pieces = name.split("-")
        if len(pieces) == 1:
            # no version given in the name: use the default one
            pieces = pieces + ["0.1"]
        name, version = pieces
        base = tmpdir.ensure(name, dir=1)
        create_files(base, filedefs)
        if 'setup.py' not in filedefs:
            setup_source = '''
                from setuptools import setup
                setup(
                    name='%(name)s',
                    description='%(name)s project',
                    version='%(version)s',
                    license='MIT',
                    platforms=['unix', 'win32'],
                    packages=['%(name)s', ],
                )
            ''' % {'name': name, 'version': version}
            create_files(base, {'setup.py': setup_source})
        if name not in filedefs:
            package = {'__init__.py': '__version__ = %r' % version}
            create_files(base, {name: package})
        # list every file created so far in the manifest
        manifestlines = [
            "include %s" % path.relto(base)
            for path in base.visit(lambda x: x.check(file=1))
        ]
        create_files(base, {"MANIFEST.in": "\n".join(manifestlines)})
        print ("created project in %s" %(base,))
        base.chdir()
    return initproj

def create_files(base, filedefs):
    """Materialize *filedefs* below the directory *base*.

    A dict value creates (or reuses) a sub-directory named after its key and
    is processed recursively; a ``str`` value is dedented and written to the
    file named after its key.  Values of any other type are skipped.
    """
    for name, content in filedefs.items():
        if isinstance(content, dict):
            subdir = base.ensure(name, dir=1)
            create_files(subdir, content)
            continue
        if isinstance(content, str):
            text = py.std.textwrap.dedent(content)
            base.join(name).write(text)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.