Source

xnet / conftest.py


#rsyncdirs = ['xnet', 'testing']

## an elaborate command line tester, see test_z_functional.py for some usage
from py.builtin import print_

GREEN=1
if GREEN:
    from eventlet.green import subprocess
    from eventlet import queue
    from eventlet import spawn
else:
    import subprocess
    import Queue as queue
    import threading
    def spawn(func):
        thread = threading.Thread(target=func)
        thread.setDaemon(True)
        thread.start()

from fnmatch import fnmatch
import sys

class Output:
    def __init__(self, pipe, forward=None):
        self.pipe = pipe
        self.lines = queue.Queue()
        self.forward = forward
        spawn(self.read_pipe)

    def read_pipe(self):
        while 1:
            line = self.pipe.readline()
            if not line:
                self.lines.put(None)
                break
            self.lines.put(line)
            if self.forward:
                self.forward.write(line)
                self.forward.flush()

    def expectline(self, pattern):
        read_so_far = []
        while 1:
            line = self.lines.get(timeout=5)
            if line is None:
                read = "".join(read_so_far)
                raise EOFError("pattern %r not found in\n%s" % (
                    pattern, read))
            if fnmatch(line, pattern):
                return line
            read_so_far.append(line)

class Proc:
    def __init__(self, args):
        self.popen = subprocess.Popen(args, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, )
        self.pid = self.popen.pid
        self.out = Output(self.popen.stdout, sys.stdout)
        self.err = Output(self.popen.stderr, sys.stderr)

    def terminate(self):
        try:
            self.popen.terminate()
        except OSError:
            pass
    def wait(self):
        return self.popen.wait()

class CommandManager:
    def __init__(self, tmpdir):
        self.tmpdir = tmpdir
        self.active = []

    def start(self, argstring):
        args = argstring.split()
        proc = Proc(args)
        self.active.append(proc)
        return proc

    def run(self, argstring):
        proc = self.start(argstring)
        return proc.wait()

    def runok(self, argstring):
        proc = self.start(argstring)
        ret = proc.wait()
        assert ret == 0
        return proc

    def killall(self):
        while self.active:
            popen = self.active.pop()
            print_("terminating", popen.pid)
            popen.terminate()
            print_("waiting", popen.pid)
            ret = popen.wait()
            print_("retcode", ret)

def pytest_funcarg__cmd(request):
    tmpdir = request.getfuncargvalue("tmpdir")
    cmd = CommandManager(tmpdir)
    request.addfinalizer(cmd.killall)
    return cmd