Commits

Ronny Pfannschmidt committed 67b927e

create a testrunner util module and move simple functions there

  • Participants
  • Parent commits 20258fb
  • Branches refine-testrunner

Comments (0)

Files changed (5)

 ---> When pypy/__init__.py becomes empty again, we have reached stage 2.
 """)
 
+import warnings
+warnings.filterwarnings('ignore', module='_pytest.core')
+
 from _pytest.core import main, UsageError, _preloadplugins
 from _pytest import core as cmdline
 from _pytest import __version__

File testrunner/runner.py

 import sys, os, signal, thread, Queue, time
 import py
-import subprocess, optparse
+import util
 
-if sys.platform == 'win32':
-    PROCESS_TERMINATE = 0x1
-    try:
-        import win32api, pywintypes
-    except ImportError:
-        def _kill(pid, sig):
-            import ctypes
-            winapi = ctypes.windll.kernel32
-            proch = winapi.OpenProcess(PROCESS_TERMINATE, 0, pid)
-            winapi.TerminateProcess(proch, 1) == 1
-            winapi.CloseHandle(proch)
-    else:
-        def _kill(pid, sig):
-            try:
-                proch = win32api.OpenProcess(PROCESS_TERMINATE, 0, pid)
-                win32api.TerminateProcess(proch, 1)
-                win32api.CloseHandle(proch)
-            except pywintypes.error, e:
-                pass
-    #Try to avoid opeing a dialog box if one of the tests causes a system error
-    import ctypes
-    winapi = ctypes.windll.kernel32
-    SetErrorMode = winapi.SetErrorMode
-    SetErrorMode.argtypes=[ctypes.c_int]
 
-    SEM_FAILCRITICALERRORS = 1
-    SEM_NOGPFAULTERRORBOX  = 2
-    SEM_NOOPENFILEERRORBOX = 0x8000
-    flags = SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | SEM_NOOPENFILEERRORBOX
-    #Since there is no GetErrorMode, do a double Set
-    old_mode = SetErrorMode(flags)
-    SetErrorMode(old_mode | flags)
+import optparse
 
-    SIGKILL = SIGTERM = 0
-    READ_MODE = 'rU'
-    WRITE_MODE = 'wb'
-else:
-    def _kill(pid, sig):
-        try:
-            os.kill(pid, sig)
-        except OSError:
-            pass
+READ_MODE = 'rU'
+WRITE_MODE = 'wb'
 
-    SIGKILL = signal.SIGKILL
-    SIGTERM = signal.SIGTERM
-    READ_MODE = 'r'
-    WRITE_MODE = 'w'
 
-EXECUTEFAILED = -1001
-RUNFAILED  = -1000
-TIMEDOUT = -999
 
-def busywait(p, timeout):
-    t0 = time.time()
-    delay = 0.5
-    while True:
-        time.sleep(delay)
-        returncode = p.poll()
-        if returncode is not None:
-            return returncode
-        tnow = time.time()
-        if (tnow-t0) >= timeout:
-            return None
-        delay = min(delay * 1.15, 7.2)
-
-def run(args, cwd, out, timeout=None):
-    f = out.open('w')
-    try:
-        try:
-            p = subprocess.Popen(args, cwd=str(cwd), stdout=f, stderr=f)
-        except Exception, e:
-            f.write("Failed to run %s with cwd='%s' timeout=%s:\n"
-                    " %s\n"
-                    % (args, cwd, timeout, e))
-            return RUNFAILED
-
-        if timeout is None:
-            return p.wait()
-        else:
-            returncode = busywait(p, timeout)
-            if returncode is not None:
-                return returncode
-            # timeout!
-            _kill(p.pid, SIGTERM)
-            if busywait(p, 10) is None:
-                _kill(p.pid, SIGKILL)
-            return TIMEDOUT
-    finally:
-        f.close()
-
-def dry_run(args, cwd, out, timeout=None):
-    f = out.open('w')
-    try:
-        f.write("run %s with cwd='%s' timeout=%s\n" % (args, cwd, timeout))
-    finally:
-        f.close()
-    return 0
-
-def getsignalname(n):
-    for name, value in signal.__dict__.items():
-        if value == n and name.startswith('SIG'):
-            return name
-    return 'signal %d' % (n,)
 
 def execute_test(cwd, test, out, logfname, interp, test_driver,
                  do_dry_run=False, timeout=None,
         args[0] = os.path.join(str(cwd), interp0)
 
     if do_dry_run:
-        runfunc = dry_run
+        runfunc = util.dry_run
     else:
-        runfunc = run
+        runfunc = util.run
     
     exitcode = runfunc(args, cwd, out, timeout=timeout)
     
     return exitcode
 
-def should_report_failure(logdata):
-    # When we have an exitcode of 1, it might be because of failures
-    # that occurred "regularly", or because of another crash of py.test.
-    # We decide heuristically based on logdata: if it looks like it
-    # contains "F", "E" or "P" then it's a regular failure, otherwise
-    # we have to report it.
-    for line in logdata.splitlines():
-        if (line.startswith('F ') or
-            line.startswith('E ') or
-            line.startswith('P ')):
-            return False
-    return True
-
-def interpret_exitcode(exitcode, test, logdata=""):
-    extralog = ""
-    if exitcode:
-        failure = True
-        if exitcode != 1 or should_report_failure(logdata):
-            if exitcode > 0:
-                msg = "Exit code %d." % exitcode
-            elif exitcode == TIMEDOUT:
-                msg = "TIMEOUT"
-            elif exitcode == RUNFAILED:
-                msg = "Failed to run interp"
-            elif exitcode == EXECUTEFAILED:
-                msg = "Failed with exception in execute-test"
-            else:
-                msg = "Killed by %s." % getsignalname(-exitcode)
-            extralog = "! %s\n %s\n" % (test, msg)
-    else:
-        failure = False
-    return failure, extralog
 
 def worker(num, n, run_param, testdirs, result_queue):
     sessdir = run_param.sessdir
             print "execute-test for %r failed with:" % test
             import traceback
             traceback.print_exc()
-            exitcode = EXECUTEFAILED
+            exitcode = util.EXECUTEFAILED
 
         if one_output.check(file=1):
             output = one_output.read(READ_MODE)
         else:
             logdata = ""
 
-        failure, extralog = interpret_exitcode(exitcode, test, logdata)
+        failure, extralog = util.interpret_exitcode(exitcode, test, logdata)
 
         if extralog:
             logdata += extralog

File testrunner/test/test_runner.py

 import py, sys, os, signal, cStringIO, tempfile
 
 import runner
+import util
 import pypy
 
 pytest_script = py.path.local(pypy.__file__).dirpath('test_all.py')
 
 
-def test_busywait():
-    class FakeProcess:
-        def poll(self):
-            if timers[0] >= timers[1]:
-                return 42
-            return None
-    class FakeTime:
-        def sleep(self, delay):
-            timers[0] += delay
-        def time(self):
-            timers[2] += 1
-            return 12345678.9 + timers[0]
-    p = FakeProcess()
-    prevtime = runner.time
-    try:
-        runner.time = FakeTime()
-        #
-        timers = [0.0, 0.0, 0]
-        returncode = runner.busywait(p, 10)
-        assert returncode == 42 and 0.0 <= timers[0] <= 1.0
-        #
-        timers = [0.0, 3.0, 0]
-        returncode = runner.busywait(p, 10)
-        assert returncode == 42 and 3.0 <= timers[0] <= 5.0 and timers[2] <= 10
-        #
-        timers = [0.0, 500.0, 0]
-        returncode = runner.busywait(p, 1000)
-        assert returncode == 42 and 500.0<=timers[0]<=510.0 and timers[2]<=100
-        #
-        timers = [0.0, 500.0, 0]
-        returncode = runner.busywait(p, 100)    # get a timeout
-        assert returncode == None and 100.0 <= timers[0] <= 110.0
-        #
-    finally:
-        runner.time = prevtime
-
-def test_should_report_failure():
-    should_report_failure = runner.should_report_failure
-    assert should_report_failure("")
-    assert should_report_failure(". Abc\n. Def\n")
-    assert should_report_failure("s Ghi\n")
-    assert not should_report_failure(". Abc\nF Def\n")
-    assert not should_report_failure(". Abc\nE Def\n")
-    assert not should_report_failure(". Abc\nP Def\n")
-    assert not should_report_failure("F Def\n. Ghi\n. Jkl\n")
-
-
-
-class TestRunHelper(object):
-    def pytest_funcarg__out(self, request):
-        tmpdir = request.getfuncargvalue('tmpdir')
-        return tmpdir.ensure('out')
-
-    def test_run(self, out):
-        res = runner.run([sys.executable, "-c", "print 42"], '.', out)
-        assert res == 0
-        assert out.read() == "42\n"
-
-    def test_error(self, out):
-        res = runner.run([sys.executable, "-c", "import sys; sys.exit(3)"], '.', out)
-        assert res == 3
-
-    def test_signal(self, out):
-        if sys.platform == 'win32':
-            py.test.skip("no death by signal on windows")
-        res = runner.run([sys.executable, "-c", "import os; os.kill(os.getpid(), 9)"], '.', out)
-        assert res == -9
-
-    def test_timeout(self, out):
-        res = runner.run([sys.executable, "-c", "while True: pass"], '.', out, timeout=3)
-        assert res == -999
-
-    def test_timeout_lock(self, out):
-        res = runner.run([sys.executable, "-c", "import threading; l=threading.Lock(); l.acquire(); l.acquire()"], '.', out, timeout=3)
-        assert res == -999
-
-    def test_timeout_syscall(self, out):
-        res = runner.run([sys.executable, "-c", "import socket; s=s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.bind(('', 0)); s.recv(1000)"], '.', out, timeout=3)
-        assert res == -999        
-
-    def test_timeout_success(self, out):
-        res = runner.run([sys.executable, "-c", "print 42"], '.',
-                         out, timeout=2)
-        assert res == 0
-        out = out.read()
-        assert out == "42\n"        
 
 
 class TestExecuteTest(object):
 
-    def setup_class(cls):
-        cls.real_run = (runner.run,)
-        cls.called = []
-        cls.exitcode = [0]
-        
+    def pytest_funcarg__info(self, request):
+        monkeypatch = request.getfuncargvalue('monkeypatch')
+        info = {'exitcode' : 0}
         def fake_run(args, cwd, out, timeout):
-            cls.called = (args, cwd, out, timeout)
-            return cls.exitcode[0]
-        runner.run = fake_run
+            info['called'] = (args, cwd, out, timeout)
+            return info['exitcode']
+        monkeypatch.setattr(util, 'run', fake_run)
+        return info
 
-    def teardown_class(cls):
-        runner.run = cls.real_run[0]
 
-    def test_explicit(self):
+    def test_explicit(self, info):
         res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
                                   interp=['INTERP', 'IARG'],
                                   test_driver=['driver', 'darg'],
 
                     'test_one']
 
-        assert self.called == (expected, '/wd', 'out', 'secs')        
+        assert info['called'] == (expected, '/wd', 'out', 'secs')        
         assert res == 0
 
-    def test_explicit_win32(self):
+    def test_explicit_win32(self, info):
         res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
                                   interp=['./INTERP', 'IARG'],
                                   test_driver=['driver', 'darg'],
                     '--resultlog=LOGFILE',
                     '--junitxml=LOGFILE.junit',
                     'test_one']
-        assert self.called[0] == expected
-        assert self.called == (expected, '/wd', 'out', 'secs')        
+        assert info['called'][0] == expected
+        assert info['called'] == (expected, '/wd', 'out', 'secs') 
         assert res == 0
 
-    def test_error(self):
-        self.exitcode[:] = [1]
+    def test_error(self, info):
+        info['exitcode'] = 1
         res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
                                   interp=['INTERP', 'IARG'],
                                   test_driver=['driver', 'darg'])
         assert res == 1
 
 
-        self.exitcode[:] = [-signal.SIGSEGV]
+        info['exitcode'] = -signal.SIGSEGV
         res = runner.execute_test('/wd', 'test_one', 'out', 'LOGFILE',
                                   interp=['INTERP', 'IARG'],
                                   test_driver=['driver', 'darg'])
         assert res == -signal.SIGSEGV
 
-    def test_interpret_exitcode(self):
-        failure, extralog = runner.interpret_exitcode(0, "test_foo")
-        assert not failure
-        assert extralog == ""
-
-        failure, extralog = runner.interpret_exitcode(1, "test_foo", "")
-        assert failure
-        assert extralog == """! test_foo
- Exit code 1.
-"""
-
-        failure, extralog = runner.interpret_exitcode(1, "test_foo", "F Foo\n")
-        assert failure
-        assert extralog == ""
-
-        failure, extralog = runner.interpret_exitcode(2, "test_foo")
-        assert failure
-        assert extralog == """! test_foo
- Exit code 2.
-"""
-
-        failure, extralog = runner.interpret_exitcode(-signal.SIGSEGV,
-                                                      "test_foo")
-        assert failure
-        assert extralog == """! test_foo
- Killed by SIGSEGV.
-"""
 
 class RunnerTests(object):
     with_thread = True

File testrunner/test/test_util.py

+import util
+import runner
+
+def test_busywait():
+    class FakeProcess:
+        def poll(self):
+            if timers[0] >= timers[1]:
+                return 42
+            return None
+    class FakeTime:
+        def sleep(self, delay):
+            timers[0] += delay
+        def time(self):
+            timers[2] += 1
+            return 12345678.9 + timers[0]
+    p = FakeProcess()
+    prevtime = runner.time
+    try:
+        runner.time = FakeTime()
+        #
+        timers = [0.0, 0.0, 0]
+        returncode = util.busywait(p, 10)
+        assert returncode == 42 and 0.0 <= timers[0] <= 1.0
+        #
+        timers = [0.0, 3.0, 0]
+        returncode = util.busywait(p, 10)
+        assert returncode == 42 and 3.0 <= timers[0] <= 5.0 and timers[2] <= 10
+        #
+        timers = [0.0, 500.0, 0]
+        returncode = util.busywait(p, 1000)
+        assert returncode == 42 and 500.0<=timers[0]<=510.0 and timers[2]<=100
+        #
+        timers = [0.0, 500.0, 0]
+        returncode = util.busywait(p, 100)    # get a timeout
+        assert returncode == None and 100.0 <= timers[0] <= 110.0
+        #
+    finally:
+        runner.time = prevtime
+
+def test_should_report_failure():
+    should_report_failure = util.should_report_failure
+    assert should_report_failure("")
+    assert should_report_failure(". Abc\n. Def\n")
+    assert should_report_failure("s Ghi\n")
+    assert not should_report_failure(". Abc\nF Def\n")
+    assert not should_report_failure(". Abc\nE Def\n")
+    assert not should_report_failure(". Abc\nP Def\n")
+    assert not should_report_failure("F Def\n. Ghi\n. Jkl\n")
+
+
+class TestRunHelper(object):
+    def pytest_funcarg__out(self, request):
+        tmpdir = request.getfuncargvalue('tmpdir')
+        return tmpdir.ensure('out')
+
+    def test_run(self, out):
+        res = util.run([sys.executable, "-c", "print 42"], '.', out)
+        assert res == 0
+        assert out.read() == "42\n"
+
+    def test_error(self, out):
+        res = util.run([sys.executable, "-c", "import sys; sys.exit(3)"], '.', out)
+        assert res == 3
+
+    def test_signal(self, out):
+        if sys.platform == 'win32':
+            py.test.skip("no death by signal on windows")
+        res = util.run([sys.executable, "-c", "import os; os.kill(os.getpid(), 9)"], '.', out)
+        assert res == -9
+
+    def test_timeout(self, out):
+        res = util.run([sys.executable, "-c", "while True: pass"], '.', out, timeout=3)
+        assert res == -999
+
+    def test_timeout_lock(self, out):
+        res = util.run([sys.executable, "-c", "import threading; l=threading.Lock(); l.acquire(); l.acquire()"], '.', out, timeout=3)
+        assert res == -999
+
+    def test_timeout_syscall(self, out):
+        res = util.run([sys.executable, "-c", "import socket; s=s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM); s.bind(('', 0)); s.recv(1000)"], '.', out, timeout=3)
+        assert res == -999        
+
+    def test_timeout_success(self, out):
+        res = util.run([sys.executable, "-c", "print 42"], '.',
+                         out, timeout=2)
+        assert res == 0
+        out = out.read()
+        assert out == "42\n"        
+
+
+
+def test_interpret_exitcode():
+    failure, extralog = util.interpret_exitcode(0, "test_foo")
+    assert not failure
+    assert extralog == ""
+
+    failure, extralog = runner.interpret_exitcode(1, "test_foo", "")
+    assert failure
+    assert extralog == """! test_foo
+Exit code 1.
+"""
+
+    failure, extralog = runner.interpret_exitcode(1, "test_foo", "F Foo\n")
+    assert failure
+    assert extralog == ""
+
+    failure, extralog = runner.interpret_exitcode(2, "test_foo")
+    assert failure
+    assert extralog == """! test_foo
+Exit code 2.
+"""
+
+    failure, extralog = runner.interpret_exitcode(-signal.SIGSEGV,
+                                                  "test_foo")
+    assert failure
+    assert extralog == """! test_foo
+ Killed by SIGSEGV.
+"""

File testrunner/util.py

+import sys
+import os
+import subprocess
+import signal
+import time
+
+if sys.platform == 'win32':
+    PROCESS_TERMINATE = 0x1
+    try:
+        import win32api, pywintypes
+    except ImportError:
+        def _kill(pid, sig):
+            import ctypes
+            winapi = ctypes.windll.kernel32
+            proch = winapi.OpenProcess(PROCESS_TERMINATE, 0, pid)
+            winapi.TerminateProcess(proch, 1) == 1
+            winapi.CloseHandle(proch)
+    else:
+        def _kill(pid, sig):
+            try:
+                proch = win32api.OpenProcess(PROCESS_TERMINATE, 0, pid)
+                win32api.TerminateProcess(proch, 1)
+                win32api.CloseHandle(proch)
+            except pywintypes.error:
+                pass
+    #Try to avoid opeing a dialog box if one of the tests causes a system error
+    import ctypes
+    winapi = ctypes.windll.kernel32
+    SetErrorMode = winapi.SetErrorMode
+    SetErrorMode.argtypes=[ctypes.c_int]
+
+    SEM_FAILCRITICALERRORS = 1
+    SEM_NOGPFAULTERRORBOX  = 2
+    SEM_NOOPENFILEERRORBOX = 0x8000
+    flags = SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | SEM_NOOPENFILEERRORBOX
+    #Since there is no GetErrorMode, do a double Set
+    old_mode = SetErrorMode(flags)
+    SetErrorMode(old_mode | flags)
+
+    SIGKILL = SIGTERM = 0
+else:
+    def _kill(pid, sig):
+        try:
+            os.kill(pid, sig)
+        except OSError:
+            pass
+
+    SIGKILL = signal.SIGKILL
+    SIGTERM = signal.SIGTERM
+
+
+EXECUTEFAILED = -1001
+RUNFAILED  = -1000
+TIMEDOUT = -999
+
+
+def getsignalname(n):
+    for name, value in signal.__dict__.items():
+        if value == n and name.startswith('SIG'):
+            return name
+    return 'signal %d' % (n,)
+
+
+def should_report_failure(logdata):
+    # When we have an exitcode of 1, it might be because of failures
+    # that occurred "regularly", or because of another crash of py.test.
+    # We decide heuristically based on logdata: if it looks like it
+    # contains "F", "E" or "P" then it's a regular failure, otherwise
+    # we have to report it.
+    for line in logdata.splitlines():
+        if (line.startswith('F ') or
+            line.startswith('E ') or
+            line.startswith('P ')):
+            return False
+    return True
+
+
+
+def busywait(p, timeout):
+    t0 = time.time()
+    delay = 0.5
+    while True:
+        time.sleep(delay)
+        returncode = p.poll()
+        if returncode is not None:
+            return returncode
+        tnow = time.time()
+        if (tnow-t0) >= timeout:
+            return None
+        delay = min(delay * 1.15, 7.2)
+
+
+
+def interpret_exitcode(exitcode, test, logdata=""):
+    extralog = ""
+    if exitcode:
+        failure = True
+        if exitcode != 1 or should_report_failure(logdata):
+            if exitcode > 0:
+                msg = "Exit code %d." % exitcode
+            elif exitcode == TIMEDOUT:
+                msg = "TIMEOUT"
+            elif exitcode == RUNFAILED:
+                msg = "Failed to run interp"
+            elif exitcode == EXECUTEFAILED:
+                msg = "Failed with exception in execute-test"
+            else:
+                msg = "Killed by %s." % getsignalname(-exitcode)
+            extralog = "! %s\n %s\n" % (test, msg)
+    else:
+        failure = False
+    return failure, extralog
+
+
+
+def run(args, cwd, out, timeout=None):
+    with out.open('w') as f:
+        try:
+            p = subprocess.Popen(args, cwd=str(cwd), stdout=f, stderr=f)
+        except Exception, e:
+            f.write("Failed to run %s with cwd='%s' timeout=%s:\n"
+                    " %s\n"
+                    % (args, cwd, timeout, e))
+            return RUNFAILED
+
+        if timeout is None:
+            return p.wait()
+        else:
+            returncode = busywait(p, timeout)
+            if returncode is not None:
+                return returncode
+            # timeout!
+            _kill(p.pid, SIGTERM)
+            if busywait(p, 10) is None:
+                _kill(p.pid, SIGKILL)
+            return TIMEDOUT
+
+
+def dry_run(args, cwd, out, timeout=None):
+    with out.open('w') as f:
+        f.write("run %s with cwd='%s' timeout=%s\n" % (args, cwd, timeout))
+    return 0