Commits

akalias committed 083254a

fixed bug in run_tests.py, added mixer_music_test.py

  • Parent commits c587b8f

Files changed (25)

File async_sub.py

 
 ################################################################################
 
-def proc_in_time_or_kill(cmd, time_out):
+def proc_in_time_or_kill(cmd, time_out, wd = None, env = None):
     proc = Popen (
-        cmd, bufsize = -1,
+        cmd, cwd = wd, env = env,
         stdin = subprocess.PIPE, stdout = subprocess.PIPE, 
         stderr = subprocess.STDOUT, universal_newlines = 1
     )
         try:
             proc.kill()
             ret_code += 'and was successfully terminated"'
-        except (win32api.error, OSError), e:
+        except Exception, e:
             ret_code += 'and termination failed (exception: %s)"' % e
 
     return ret_code, ''.join(response) #+ ''.join(err)
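
The new wd and env parameters let a caller pin the child process to a
specific working directory and environment rather than inheriting the
parent's. A minimal sketch of an invocation, assuming hypothetical paths
and module name:

    from async_sub import proc_in_time_or_kill

    ret_code, response = proc_in_time_or_kill(
        ['python', 'test_runner.py', 'color_test'],    # cmd to run
        30,                                            # time_out in seconds
        wd  = '/path/to/pygame/test',                  # child's cwd
        env = {'PYTHONPATH': '/path/to/pygame/test'},  # child's environment
    )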

File run_tests.py

-#!/usr/bin/env python
-
-"""
-
-Test runner for pygame unittests:
-
-By default, runs all test/xxxx_test.py files in a single process.
-
-Option to run tests in subprocesses using subprocess and async_sub. Will poll 
-tests for return code and if tests don't return after TIME_OUT, will kill 
-process with os.kill. On win32 platform win32api.TerminateProcess is used.
-
-Dependencies:
-    async_sub.py:
-        Requires win32 extensions when run on windows
-
-"""
-
 #################################### IMPORTS ###################################
 
 import sys, os, re, unittest, subprocess, time, optparse
 import pygame.threads 
 
+from test_runner import run_test, TEST_RESULTS_RE
+from pprint import pformat
+
 # async_sub imported if needed when run in subprocess mode
 
 main_dir = os.path.split(os.path.abspath(sys.argv[0]))[0]
 test_subdir = os.path.join(main_dir, 'test')
 fake_test_subdir = os.path.join(test_subdir, 'run_tests__tests')
+test_runner_py = os.path.join(main_dir, "test_runner.py")
 
 sys.path.insert(0, test_subdir)
-
 import test_utils
 
 ################################### CONSTANTS ##################################
 Traceback (most recent call last):
   File "test\%(module)s.py", line 1, in all_tests_for
 
-subprocess completely failed with return code of %(ret_code)s
+subprocess completely failed with return code of %(return_code)s
 
-cmd: %(cmd)s
+cmd:          %(cmd)s
+test_env:     %(test_env)s
+working_dir:  %(working_dir)s
 
-return (abbrv):
-%(ret)s
+return (top 5 lines):
+%(raw_return)s
 
 """  # Leave that last empty line else build page regex won't match
 
+TEST_MODULE_RE = re.compile('^(.+_test)\.py$')
+
 RAN_TESTS_DIV = (70 * "-") + "\nRan"
 
 DOTS = re.compile("^([FE.]*)$", re.MULTILINE)
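
For reference, DOTS captures the unittest progress line: the run of '.',
'F' and 'E' characters a TextTestRunner prints before any failure detail.
An illustrative match:

    >>> DOTS.search("..F.E.\n" + 70 * "=").group(1)
    '..F.E.'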
 
-TEST_MODULE_RE = re.compile('^(.+_test)\.py$')
-
 ################################################################################
 # Set the command line options
 #
 """
 
 opt_parser = optparse.OptionParser(USEAGE)
-opt_parser.add_option(
-     "-v",  "--verbose", action = 'store_true',
-     help   = "be verbose in output (only single process mode)" )
 
 opt_parser.add_option (
      "-i",  "--incomplete", action = 'store_true',
      help   = "fail incomplete tests (only single process mode)" )
 
 opt_parser.add_option (
-     "-r",  "--redirect", action = 'store_true',
-     help   = "redirect stderr/stdio, print only test results" )
+     "-s",  "--subprocess", action = 'store_true',
+     help   = "run test suites in subprocesses (default: same process)" )
 
 opt_parser.add_option (
-     "-s",  "--subprocess", action = 'store_true',
-     help   = "run test suites in subprocesses (default: same process)" )
+     "-d",  "--dump", action = 'store_true',
+     help   = "dump results as dict ready to eval" )
 
 opt_parser.add_option (
      "-m",  "--multi_thread", metavar = 'THREADS', type = 'int',
 # this is used for testing subprocess output against single process mode
 
 if options.fake:
-    os.environ.update({"PYTHONPATH" : test_subdir})
     test_subdir = os.path.join(fake_test_subdir, options.fake )
     sys.path.append(test_subdir)
+    working_dir = test_subdir
+else:
+    working_dir = main_dir
 
-os.chdir(main_dir)
+test_env = {"PYTHONPATH": test_subdir}
+os.chdir(working_dir)
 
 test_modules = []
 for f in sorted(os.listdir(test_subdir)):
         test_modules.append(match)
 
 ################################################################################
-# Run all the tests in one process
-# unittest.TextTestRunner().run(unittest.TestSuite())
+# Single process mode
 #
 
 if not options.subprocess:
-    ## INITIATE TEST SUITE
-    suite = unittest.TestSuite()
-    if options.redirect: test_out, runner = test_utils.StringIO_TextTestRunner()
-    else: runner = unittest.TextTestRunner()
-
-    ## LOAD THE TEST MODULES AND COMPILE TESTS
-    for module in [m for m in test_modules if m not in IGNORE]:
-        print 'loading ' + module
-        __import__( module )
-        test = unittest.defaultTestLoader.loadTestsFromName( module )
-        suite.addTest( test )
-    
-    ## REDIRECT STDERR, STDOUT
-    if options.redirect:
-        (stderr, stdout), redirected = test_utils.redirect_io()
-
-    ## GET OPTIONS AND RUN THE TESTS    
     test_utils.fail_incomplete_tests = options.incomplete
-    if options.verbose: runner.verbosity = 2
-    runner.run( suite )
-
-    ## RETURN STDERR, STDOUT
-    if options.redirect:
-        sys.stderr, sys.stdout = stderr, stdout
-        test_out.seek(0)
-        sys.stderr.write(test_out.read())
-        # redirected has anything >> stderr | stdout
-
-    ###########################
-    # SYS.EXIT() FLOW CONTROL #
-    ###########################
-    sys.exit()
+    single_results = run_test([m for m in test_modules if m not in IGNORE])
+    if options.dump: print pformat(single_results)
+    else: print single_results['output']
 
 ################################################################################
-# Runs an individual xxxx_test.py test suite in a subprocess
+# Subprocess mode
 #
 
-import async_sub
+def count(results, *args):
+    for arg in args:
+        all_of = [a for a in [v.get(arg) for v in results.values()] if a]
+        if not all_of: yield 0
+        else:
+            yield sum (
+                isinstance(all_of[0], int) and all_of
+                or (len(v) for v in all_of)
+            )
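
count() totals one field across every module's result dict, coping with
'num_tests' holding an int while 'errors' and 'failures' hold lists of
(description, traceback) pairs. An illustrative call, with made-up module
names and values:

    results = {
        'a_test': {'failures': [('t1', 'tb')], 'num_tests': 3},
        'b_test': {'failures': [],             'num_tests': 2},
    }
    failures, total = count(results, 'failures', 'num_tests')
    # failures == 1, total == 5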
 
-def run_test(args):
-    module, cmd = args
-    print 'loading %s' % module
-    ret_code, response = async_sub.proc_in_time_or_kill (
-        cmd, time_out = options.time_out
-    )
-    return cmd, module, ret_code, response
+def combine_results(all_results, t):
+    """
+
+    Piece together the subprocess results into a form fit for human
+    consumption. Don't rely on this output programmatically: it was
+    originally meant for that, but proved unreliable. See options.dump
+    for machine-readable results.
+
+    """
+
+    all_dots = ''
+    failures = []
+
+    for module, results in sorted(all_results.items()):
+        output, return_code, raw_return = map (
+            results.get, ('output','return_code', 'raw_return')
+        )
+
+        if not output or (return_code and RAN_TESTS_DIV not in output):
+            results['raw_return'] = ''.join(raw_return.splitlines(1)[:5])
+            failures.append( COMPLETE_FAILURE_TEMPLATE % results )
+            continue
+
+        dots = DOTS.search(output).group(1)
+        all_dots += dots
+
+        if 'E' in dots or 'F' in dots:
+            failures.append( output[len(dots)+1:].split(RAN_TESTS_DIV)[0] )
+    
+    total_fails, total_errors = map(all_dots.count, 'FE')
+    total_tests = len(all_dots)
+
+    combined = [all_dots]
+    if failures: combined += [''.join(failures).lstrip('\n')[:-1]]
+    combined += ["%s %s tests in %.3fs\n" % (RAN_TESTS_DIV, total_tests, t)]
+
+    if not failures: combined += ['OK\n']
+    else: combined += [
+        'FAILED (%s)\n' % ', '.join (
+            (total_fails  and ["failures=%s" % total_fails] or []) +
+            (total_errors and ["errors=%s"  % total_errors] or [])
+        )]
+
+    return total_tests, '\n'.join(combined)
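
An illustrative round trip through combine_results, with the per-module
dicts abbreviated (the real ones carry the extra keys set in the
subprocess loop below):

    all_results = {
        'a_test': {
            'output': '..\n' + RAN_TESTS_DIV + ' 2 tests in 0.100s\n\nOK\n',
            'return_code': 0,
            'raw_return': '',
        },
    }
    total, report = combine_results(all_results, 0.1)
    # total == 2; report mimics a single TextTestRunner run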
 
 ################################################################################
-# Run all the tests in subprocesses
-#
-flags = []
-if options.redirect: flags += ['-r']
-test_cmds = [ 
-    (m, [options.python, os.path.join(test_subdir, '%s.py' % m)] + flags)
-        for m in test_modules if  m not in SUBPROCESS_IGNORE 
-]
 
-t = time.time()
+if options.subprocess:
+    from async_sub import proc_in_time_or_kill
 
-if options.multi_thread:
-    test_results = pygame.threads.tmap (
-        run_test, test_cmds,
-        stop_on_error = False,
-        num_workers = options.multi_thread
-    )
-else:
-    test_results = map(run_test, test_cmds)
+    def sub_test(module):
+        print 'loading', module
+        
+        cmd = [options.python, test_runner_py, module]
 
-t = time.time() - t
+        return module, (cmd, test_env, working_dir), proc_in_time_or_kill (
+            cmd,
+            options.time_out,
+            env = test_env,
+            wd = working_dir,
+        )
+    
+    if options.multi_thread:
+        def tmap(f, args):
+            return pygame.threads.tmap (
+                f, args, stop_on_error = False,
+                num_workers = options.multi_thread
+            )
+    else: tmap = map
+        
 
-################################################################################
-# Combine subprocessed TextTestRunner() results to mimick single run
-# Puts complete failures in a form the build page will pick up
-#
-# NOTE: regexes will possibly fail if tests are noisy on stdout or stderr
-#       so use -r --redirect io option when using subprocess
+    test_modules = (m for m in test_modules if m not in SUBPROCESS_IGNORE)
+    results = {}
 
-def print_err(*args):
-    for arg in args: sys.stderr.write(arg + '\n')
+    t = time.time()
 
-all_dots = ''
-failures = []
+    for module, proc, (return_code, raw_return) in tmap(sub_test, test_modules):
+        cmd, test_env, working_dir = proc
 
-for cmd, module, ret_code, ret in test_results:
-    if ret_code and RAN_TESTS_DIV not in ret:
-        ret = ''.join(ret.splitlines(1)[:5])
-        failures.append( COMPLETE_FAILURE_TEMPLATE % locals() )
-        continue
+        test_results = TEST_RESULTS_RE.search(raw_return)
+        if test_results: 
+            try:     results.update(eval(test_results.group(1)))
+            except:  raise Exception("BUGGY EVAL:\n %s" % test_results.group(1))
 
-    dots = DOTS.search(ret).group(1)
-    all_dots += dots
+        else: results[module] = {}
 
-    if 'E' in dots or 'F' in dots:
-        failure = ret[len(dots)+1:].split(RAN_TESTS_DIV)[0]
-        failures.append (
-            failure.replace( "(__main__.", "(%s." % module)
+        results[module].update (
+            {
+                'return_code': return_code,
+                'raw_return' : raw_return,
+                'cmd'        : cmd,
+                'test_env'   : test_env,
+                'working_dir': working_dir,
+                'module'     : module,
+            }
         )
 
-total_fails, total_errors = map(all_dots.count, 'FE')
-total_tests = len(all_dots)
+    untrusty_total, combined = combine_results(results, time.time() - t)
+    errors, failures, total  = count(results, 'errors', 'failures', 'num_tests')
 
-print_err(all_dots)
-if failures: print_err(''.join(failures).lstrip('\n')[:-1])
-print_err("%s %s tests in %.3fs\n" % (RAN_TESTS_DIV, total_tests, t))
+    if not options.dump and untrusty_total == total:
+        print combined
+    else:
+        for module, result in results.items():
+            for breaker in ['errors', 'return_code', 'failures']:
+                if breaker not in result or result[breaker]:
+                    print pformat(result)
 
-if not failures:
-    print_err('OK')
-else:
-    print_err ('FAILED (%s)' % ', '.join (
-        (total_fails  and ["failures=%s" % total_fails] or []) +
-        (total_errors and ["errors=%s"  % total_errors] or [])
-    ))
+        print "Tests:%s Errors:%s Failures:%s" % (total, errors, failures)
 
 ################################################################################

File test/mixer_music_test.py

+import unittest, os, pygame
+from test_utils import test_not_implemented
+
+class MixerMusicModuleTest(unittest.TestCase):
+    def test_load(self):
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.load:
+        
+          # pygame.mixer.music.load(filename): return None
+          # Load a music file for playback
+        
+        data_fname = os.path.join('examples', 'data')
+        pygame.init()
+        #note, I just added house_lo.mus to svn.
+        #formats = ['ogg', 'wav', 'mp3']
+        formats = ['mp3']
+        for f in formats:
+            musfn = os.path.join(data_fname, 'house_lo.%s' % f)
+    
+            pygame.mixer.music.load(musfn)
+            pygame.mixer.music.load(open(musfn))
+            musf = open(musfn)
+            pygame.mixer.music.load(musf)
+    
+    
+    def test_queue(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.queue:
+
+          # pygame.mixer.music.queue(filename): return None
+          # queue a music file to follow the current
+
+        self.assert_(test_not_implemented())
+
+    def test_stop(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.stop:
+
+          # pygame.mixer.music.stop(): return None
+          # stop the music playback
+
+        self.assert_(test_not_implemented())
+
+    def test_rewind(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.rewind:
+
+          # pygame.mixer.music.rewind(): return None
+          # restart music
+
+        self.assert_(test_not_implemented())
+
+    def test_get_pos(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.get_pos:
+
+          # pygame.mixer.music.get_pos(): return time
+          # get the music play time
+
+        self.assert_(test_not_implemented())
+
+    def test_fadeout(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.fadeout:
+
+          # pygame.mixer.music.fadeout(time): return None
+          # stop music playback after fading out
+
+        self.assert_(test_not_implemented())
+
+    def test_play(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.play:
+
+          # pygame.mixer.music.play(loops=0, start=0.0): return None
+          # Start the playback of the music stream
+
+        self.assert_(test_not_implemented())
+
+    def test_get_volume(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.get_volume:
+
+          # pygame.mixer.music.get_volume(): return value
+          # get the music volume
+
+        self.assert_(test_not_implemented())
+
+    def test_set_endevent(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.set_endevent:
+
+          # pygame.mixer.music.set_endevent(): return None
+          # pygame.mixer.music.set_endevent(type): return None
+          # have the music send an event when playback stops
+
+        self.assert_(test_not_implemented())
+
+    def test_pause(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.pause:
+
+          # pygame.mixer.music.pause(): return None
+          # temporarily stop music playback
+
+        self.assert_(test_not_implemented())
+
+    def test_get_busy(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.get_busy:
+
+          # pygame.mixer.music.get_busy(): return bool
+          # check if the music stream is playing
+
+        self.assert_(test_not_implemented())
+
+    def test_get_endevent(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.get_endevent:
+
+          # pygame.mixer.music.get_endevent(): return type
+          # get the event a channel sends when playback stops
+
+        self.assert_(test_not_implemented())
+
+    def test_unpause(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.unpause:
+
+          # pygame.mixer.music.unpause(): return None
+          # resume paused music
+
+        self.assert_(test_not_implemented())
+
+    def test_set_volume(self):
+
+        # __doc__ (as of 2008-07-13) for pygame.mixer_music.set_volume:
+
+          # pygame.mixer.music.set_volume(value): return None
+          # set the music volume
+
+        self.assert_(test_not_implemented())
+        
+if __name__ == '__main__':
+    unittest.main()

File test/mixer_test.py

           # stop playback of all sound channels
     
         self.assert_(test_not_implemented())
-        
-
-    def test_mixer_music__load(self):
-        data_fname = os.path.join('examples', 'data')
-        pygame.init()
-        #note, I just added house_lo.mus to svn.
-        #formats = ['ogg', 'wav', 'mp3']
-        formats = ['mp3']
-        for f in formats:
-            musfn = os.path.join(data_fname, 'house_lo.%s' % f)
-
-            pygame.mixer.music.load(musfn)
-            pygame.mixer.music.load(open(musfn))
-            musf = open(musfn)
-            pygame.mixer.music.load(musf)
-
 
 ############################## CHANNEL CLASS TESTS #############################
 

File test/run_tests__tests/all_ok/fake_2_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/all_ok/fake_3_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/all_ok/fake_4_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/all_ok/fake_5_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/all_ok/fake_6_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/all_ok/no_assertions(ret_code_of_1)_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         pass
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/all_ok/zero_tests_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     pass
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/failures1/fake_2_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/failures1/fake_3_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/failures1/fake_4_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/infinite_loop/fake_1_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/infinite_loop/fake_2_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/print_stderr/fake_2_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/print_stderr/fake_3_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/print_stderr/fake_4_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/print_stdout/fake_2_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/print_stdout/fake_3_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/print_stdout/fake_4_test.py

-import unittest, test_utils
+import unittest
 
 class KeyModuleTest(unittest.TestCase):
     def test_get_focused(self):
         self.assert_(True) 
 
 if __name__ == '__main__':
-    test_utils.get_command_line_options()
+    unittest.main()

File test/run_tests__tests/run_tests__test.py

 NORMALIZERS = (
     (r"Ran (\d+) tests in (\d+\.\d+)s",   "Ran \\1 tests in X.XXXs" ),
     (r'File ".*?([^/\\.]+\.py)"',         'File "\\1"'),
-
-    #TODO: look into why os.path.sep differs
 )
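
The normalizers strip run-dependent detail (timings, absolute paths) from
both outputs before they are diffed. A sketch of how they would be
applied; the helper name norm is hypothetical:

    def norm(text):
        for pattern, replacement in NORMALIZERS:
            text = re.sub(pattern, replacement, text)
        return text

    # norm('Ran 12 tests in 0.034s') == 'Ran 12 tests in X.XXXs'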
 
 ################################################################################
         cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd = cd,
        universal_newlines = True,
     )
-    assert not proc.wait()
+    ret_code = proc.wait()
+    if ret_code:
+        print cmd, ret_code
+        raise Exception(proc.stdout.read())
+
     return proc.stdout.read()
 
 ################################################################################
 unnormed_diff = '-u' in sys.argv
 verbose = '-v' in sys.argv or unnormed_diff
 if '-h' in sys.argv or '--help' in sys.argv: sys.exit (
+    "\nCOMPARES OUTPUT OF SINGLE VS SUBPROCESS MODE OF RUN_TESTS.PY\n\n"
     '-v, to output diffs even on success\n'
-    '-u, to output diffs of unnormalized tests'
+    '-u, to output diffs of unnormalized tests\n\n'
+    "Each line of a Differ delta begins with a two-letter code:\n\n"
+    "    '- '    line unique to sequence 1\n"
+    "    '+ '    line unique to sequence 2\n"
+    "    '  '    line common to both sequences\n"
+    "    '? '    line not present in either input sequence\n"
 )
 
 main_dir  = os.path.split(os.path.abspath(sys.argv[0]))[0]
 trunk_dir = os.path.normpath(os.path.join(main_dir, '../../'))
 
-os.environ.update({"PYTHONPATH" : os.path.join(trunk_dir, 'test')})
-
 test_suite_dirs = [x for x in os.listdir(main_dir) 
                            if os.path.isdir(os.path.join(main_dir, x))
                            and x not in IGNORE ]
 # Test that output is the same in single process and subprocess modes 
 #
 
-cmd = [sys.executable, 'run_tests.py', '-r', '-f']
-sub_cmd = [sys.executable, 'run_tests.py', '-r', '-s', '-f']
+base_cmd = [sys.executable, 'run_tests.py']
 
-time_out_cmd = [
-    sys.executable, 'run_tests.py', '-t', '4', '-s', '-f', 'infinite_loop',
-]
+cmd = base_cmd + ['-f']
+sub_cmd = base_cmd + ['-s', '-f']
+time_out_cmd =  base_cmd  + ['-t', '4', '-s', '-f', 'infinite_loop' ]
 
 passes = 0
 failed = False
 
 print "infinite_loop suite (subprocess mode timeout)",
 loop_test = call_proc(time_out_cmd, trunk_dir)
+assert "ERROR: all_tests_for" in loop_test
 passes += 1
 print "OK"
 

File test/test_utils.py

 #################################### IMPORTS ###################################
 
-import tempfile, sys, pygame, unittest, StringIO
+import tempfile, sys, pygame, unittest
 
 ############################### INCOMPLETE TESTS ###############################
 
 def get_command_line_options():
     global fail_incomplete_tests
     if check_option("--incomplete", "-i"):  fail_incomplete_tests = 1
-    if check_option("--redirect", "-r"):    low_noise_test()
-    else: unittest.main()
+    
+    unittest.main()
 
 ################################## TEMP FILES ##################################
 
 def get_tmp_dir():
     return tempfile.mkdtemp()
 
-############################# UNITTEST EXTENSIONS ##############################
-
-REDIRECT_DIVISION = "<[[! MULTIPLEXED UNITTEST, STDERR, STDOUT STARTS HERE !]]>"
-
-class Redirect(StringIO.StringIO):
-    def write(self, val):
-        sys.stdout.write(val)
-        StringIO.StringIO.write(self, val)
-        
-def StringIO_TextTestRunner():
-    test_out = Redirect()
-    runner = unittest.TextTestRunner(stream=test_out)
-    return test_out, runner
-
-def redirect_io():
-    yield sys.stderr, sys.stdout
-    sys.stderr = sys.stdout = StringIO.StringIO()
-    yield sys.stdout
-
-class Main(unittest.main):
-    def runTests(self):
-        self.testRunner.run(self.test)
-
-def low_noise_test(exit=True):
-    (oerr, oout), rerr_out = redirect_io()
-
-    test_out, runner = StringIO_TextTestRunner()
-    Main(testRunner = runner)
-
-    sys.stderr, sys.stdout = oerr, oout
-
-    test_out.seek(0)
-    # rerr_out.seek(0)
-
-    sys.stderr.write(test_out.read())          # unittest
-    # print REDIRECT_DIVISION
-    # sys.stdout.write(rerr_out.read())        # unittest, stderr, stdout
-
-    if exit: sys.exit()
-
 #################################### HELPERS ###################################
 
 def rgba_between(value, minimum=0, maximum=255):

File test_runner.py

+import sys, os, re, unittest, StringIO, time
+from pprint import pformat
+
+TEST_RESULTS_START = "<--!! TEST RESULTS START HERE !!-->"
+TEST_RESULTS_RE = re.compile('%s\n(.*)' % TEST_RESULTS_START, re.DOTALL | re.M)
+
+def redirect_output():
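+    # Generator trick: the first yield hands back the real streams; after
+    # they are swapped for StringIO buffers, the second yield hands back
+    # the buffers. Unpacked as (realerr, realout), (err, out) below.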
+    yield sys.stderr, sys.stdout
+    sys.stderr, sys.stdout = StringIO.StringIO(), StringIO.StringIO()
+    yield sys.stderr, sys.stdout
+
+def restore_output(err, out):
+    sys.stderr, sys.stdout = err, out
+
+def StringIOContents(io):
+    io.seek(0)
+    return io.read()
+    
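+# _TextTestResult keeps failures/errors as (TestCase, traceback) pairs;
+# flatten them to (description, traceback) strings so the results dict
+# survives the pformat()/eval() trip from child to parent process.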
+unittest._TextTestResult.monkey = lambda self, errors: [ 
+    (self.getDescription(e[0]), e[1]) for e in errors
+]
+
+def run_test(module, sub_process_mode=False):
+    suite = unittest.TestSuite()
+    if not isinstance(module, list): module = [module]
+
+    for mod in module:
+        __import__(mod)
+        print 'loading', mod
+        test = unittest.defaultTestLoader.loadTestsFromName(mod)
+        suite.addTest(test)
+    
+    (realerr, realout), (err, out) =  redirect_output()
+    # restore_output(realerr, realout)   DEBUG
+    
+    captured = StringIO.StringIO()
+    runner = unittest.TextTestRunner(stream = captured)
+    results = runner.run( suite )
+
+    captured, err, out = map(StringIOContents, (captured, err, out))
+    restore_output(realerr, realout)
+
+    # single-module runs are keyed by module name, multi-module by 'all_tests'
+    results_key = len(module) == 1 and module[0] or 'all_tests'
+    results = (
+        {
+            results_key:
+            {
+                'num_tests' : results.testsRun,
+                'failures'  : results.monkey(results.failures),
+                'errors'    : results.monkey(results.errors),
+                'output'    : captured,
+                'stderr'    : err,
+                'stdout'    : out,
+            }
+        }
+    )
+
+    if sub_process_mode:
+        print TEST_RESULTS_START
+        print pformat(results)
+    else:
+        return results[results_key]
+
+if __name__ == '__main__':
+    run_test(sys.argv[1], 1)
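
Run stand-alone in subprocess mode, test_runner.py prints the marker line
followed by a pformat()ed dict that the parent locates with TEST_RESULTS_RE
and eval()s. An illustrative session, with the module name and counts made
up:

    $ python test_runner.py color_test
    loading color_test
    <--!! TEST RESULTS START HERE !!-->
    {'color_test': {'errors': [],
                    'failures': [],
                    'num_tests': 12,
                    'output': '...TextTestRunner output...',
                    'stderr': '',
                    'stdout': ''}}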