Source

pygame / test / threads_test.py

Diff from to

File test/threads_test.py

-#################################### IMPORTS ###################################
-
-if __name__ == '__main__':
-    import sys
-    import os
-    pkg_dir = os.path.split(os.path.abspath(__file__))[0]
-    parent_dir, pkg_name = os.path.split(pkg_dir)
-    is_pygame_pkg = (pkg_name == 'tests' and
-                     os.path.split(parent_dir)[1] == 'pygame')
-    if not is_pygame_pkg:
-        sys.path.insert(0, parent_dir)
-else:
-    is_pygame_pkg = __name__.startswith('pygame.tests.')
-
-if is_pygame_pkg:
-    from pygame.tests.test_utils import test_not_implemented, unittest
-else:
-    from test.test_utils import test_not_implemented, unittest
-from pygame.threads import FuncResult, tmap, WorkerQueue, Empty, STOP
-from pygame import threads
-
-import time
-
-################################################################################
-
-class WorkerQueueTypeTest(unittest.TestCase):
-    def test_usage_with_different_functions(self):
-        def f(x):
-            return x+1
-        
-        def f2(x):
-            return x+2
-        
-        wq = WorkerQueue()
-        fr = FuncResult(f)
-        fr2 = FuncResult(f2)
-        wq.do(fr, 1)
-        wq.do(fr2, 1)
-        wq.wait()
-        wq.stop()
-
-        self.assert_(fr.result  == 2)
-        self.assert_(fr2.result == 3)
-
-    def test_do(self):
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.do:
-
-          # puts a function on a queue for running later.
-          #
-        return
-
-    def test_stop(self):
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.stop:
-
-          # Stops the WorkerQueue, waits for all of the threads to finish up.
-          #         
-        
-        wq = WorkerQueue()
-        
-        self.assert_(len(wq.pool) > 0)
-        for t in wq.pool: self.assert_(t.isAlive())
-        
-        for i in xrange(200): wq.do(lambda x: x+1, i)
-        
-        wq.stop()
-        for t in wq.pool: self.assert_(not t.isAlive())
-        
-        self.assert_(wq.queue.get() is STOP)
-
-    def todo_test_threadloop(self):
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.threadloop:
-
-          # Loops until all of the tasks are finished.
-
-        self.fail()
-
-    def test_wait(self):
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.wait:
-
-          # waits until all tasks are complete.
-
-        wq = WorkerQueue()
-        
-        for i in xrange(2000): wq.do(lambda x: x+1, i)
-        wq.wait()
-
-        self.assertRaises(Empty, wq.queue.get_nowait)
-
-        wq.stop()
-
-class ThreadsModuleTest(unittest.TestCase):
-    def todo_test_benchmark_workers(self):
-        "tags:long_running"
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.benchmark_workers:
-
-          # does a little test to see if workers are at all faster.
-          # Returns the number of workers which works best.
-          # Takes a little bit of time to run, so you should only really call
-          #   it once.
-          # You can pass in benchmark data, and functions if you want.
-          # a_bench_func - f(data)
-          # the_data - data to work on.
-
-        self.fail()
-
-    def test_init(self):
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.init:
-
-          # Does a little test to see if threading is worth it.
-          #   Sets up a global worker queue if it's worth it.
-          #
-          # Calling init() is not required, but is generally better to do.
-
-        threads.init(8)
-
-        self.assert_(isinstance(threads._wq, WorkerQueue))
-
-        threads.quit()
-
-    def test_quit(self):
-
-        # __doc__ (as of 2008-06-28) for pygame.threads.quit:
-
-          # cleans up everything.
-          #
-
-        threads.init(8)
-
-        threads.quit()
-
-        self.assert_(threads._wq is None)
-
-    def test_tmap(self):
-        # __doc__ (as of 2008-06-28) for pygame.threads.tmap:
-
-          # like map, but uses a thread pool to execute.
-          # num_workers - the number of worker threads that will be used.  If pool
-          #                 is passed in, then the num_workers arg is ignored.
-          # worker_queue - you can optionally pass in an existing WorkerQueue.
-          # wait - True means that the results are returned when everything is finished.
-          #        False means that we return the [worker_queue, results] right away instead. 
-          #        results, is returned as a list of FuncResult instances.
-          # stop_on_error -
-
-        func, data = lambda x:x+1, xrange(100)
-
-        tmapped = tmap(func, data)
-        mapped = map(func, data)
-
-        self.assert_(tmapped == mapped)
-        
-    def test_tmap__None_func_and_multiple_sequences(self):
-        return     #TODO
-        
-        """ Using a None as func and multiple seqences """
-
-        res =  tmap(None, [1,2,3,4])
-
-        res2 = tmap(None, [1,2,3,4], [22, 33, 44, 55])
-        
-        res3 = tmap(None, [1,2,3,4], [22, 33, 44, 55, 66])
-        
-        res4 = tmap(None, [1,2,3,4,5], [22, 33, 44, 55])
-        
-        self.assertEqual([1, 2, 3, 4], res)
-        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55)], res2)
-        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (None, 66)], res3)
-        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (5,None)], res4)
-        
-    def test_tmap__wait(self):
-        r = range(1000)
-        wq, results = tmap(lambda x:x, r, num_workers = 5, wait=False)
-        wq.wait()
-        r2 = map(lambda x:x.result, results)
-        self.assert_(r == r2)
-
-    def test_FuncResult(self):
-        # as of 2008-06-28
-        # FuncResult(f, callback = None, errback = None)
-        
-        # Used for wrapping up a function call so that the results are stored
-        #      inside the instances result attribute.
-        
-        
-        #     f - is the function we that we call 
-        #         callback(result) - this is called when the function(f) returns
-        #         errback(exception) - this is called when the function(f) raises
-        #                                an exception.
-
-        # Results are stored in result attribute
-        fr = FuncResult(lambda x:x+1)
-        fr(2)
-        self.assert_(fr.result == 3)
-        
-        # Exceptions are store in exception attribute
-        self.assert_(fr.exception is None,  "when no exception raised")
-        
-        exception = ValueError('rast')
-        def x(sdf):
-            raise exception
-        fr = FuncResult(x)
-        fr(None)
-        self.assert_(fr.exception is exception)
-
-################################################################################
-
-if __name__ == '__main__':
-    unittest.main()
+#################################### IMPORTS ###################################
+
+if __name__ == '__main__':
+    import sys
+    import os
+    pkg_dir = os.path.split(os.path.abspath(__file__))[0]
+    parent_dir, pkg_name = os.path.split(pkg_dir)
+    is_pygame_pkg = (pkg_name == 'tests' and
+                     os.path.split(parent_dir)[1] == 'pygame')
+    if not is_pygame_pkg:
+        sys.path.insert(0, parent_dir)
+else:
+    is_pygame_pkg = __name__.startswith('pygame.tests.')
+
+if is_pygame_pkg:
+    from pygame.tests.test_utils import test_not_implemented, unittest
+else:
+    from test.test_utils import test_not_implemented, unittest
+from pygame.threads import FuncResult, tmap, WorkerQueue, Empty, STOP
+from pygame import threads
+
+import time
+
+################################################################################
+
+class WorkerQueueTypeTest(unittest.TestCase):
+    def test_usage_with_different_functions(self):
+        def f(x):
+            return x+1
+        
+        def f2(x):
+            return x+2
+        
+        wq = WorkerQueue()
+        fr = FuncResult(f)
+        fr2 = FuncResult(f2)
+        wq.do(fr, 1)
+        wq.do(fr2, 1)
+        wq.wait()
+        wq.stop()
+
+        self.assert_(fr.result  == 2)
+        self.assert_(fr2.result == 3)
+
+    def test_do(self):
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.do:
+
+          # puts a function on a queue for running later.
+          #
+        return
+
+    def test_stop(self):
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.stop:
+
+          # Stops the WorkerQueue, waits for all of the threads to finish up.
+          #         
+        
+        wq = WorkerQueue()
+        
+        self.assert_(len(wq.pool) > 0)
+        for t in wq.pool: self.assert_(t.isAlive())
+        
+        for i in xrange(200): wq.do(lambda x: x+1, i)
+        
+        wq.stop()
+        for t in wq.pool: self.assert_(not t.isAlive())
+        
+        self.assert_(wq.queue.get() is STOP)
+
+    def todo_test_threadloop(self):
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.threadloop:
+
+          # Loops until all of the tasks are finished.
+
+        self.fail()
+
+    def test_wait(self):
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.wait:
+
+          # waits until all tasks are complete.
+
+        wq = WorkerQueue()
+        
+        for i in xrange(2000): wq.do(lambda x: x+1, i)
+        wq.wait()
+
+        self.assertRaises(Empty, wq.queue.get_nowait)
+
+        wq.stop()
+
+class ThreadsModuleTest(unittest.TestCase):
+    def todo_test_benchmark_workers(self):
+        "tags:long_running"
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.benchmark_workers:
+
+          # does a little test to see if workers are at all faster.
+          # Returns the number of workers which works best.
+          # Takes a little bit of time to run, so you should only really call
+          #   it once.
+          # You can pass in benchmark data, and functions if you want.
+          # a_bench_func - f(data)
+          # the_data - data to work on.
+
+        self.fail()
+
+    def test_init(self):
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.init:
+
+          # Does a little test to see if threading is worth it.
+          #   Sets up a global worker queue if it's worth it.
+          #
+          # Calling init() is not required, but is generally better to do.
+
+        threads.init(8)
+
+        self.assert_(isinstance(threads._wq, WorkerQueue))
+
+        threads.quit()
+
+    def test_quit(self):
+
+        # __doc__ (as of 2008-06-28) for pygame.threads.quit:
+
+          # cleans up everything.
+          #
+
+        threads.init(8)
+
+        threads.quit()
+
+        self.assert_(threads._wq is None)
+
+    def test_tmap(self):
+        # __doc__ (as of 2008-06-28) for pygame.threads.tmap:
+
+          # like map, but uses a thread pool to execute.
+          # num_workers - the number of worker threads that will be used.  If pool
+          #                 is passed in, then the num_workers arg is ignored.
+          # worker_queue - you can optionally pass in an existing WorkerQueue.
+          # wait - True means that the results are returned when everything is finished.
+          #        False means that we return the [worker_queue, results] right away instead. 
+          #        results, is returned as a list of FuncResult instances.
+          # stop_on_error -
+
+        func, data = lambda x:x+1, xrange(100)
+
+        tmapped = tmap(func, data)
+        mapped = map(func, data)
+
+        self.assert_(tmapped == mapped)
+        
+    def test_tmap__None_func_and_multiple_sequences(self):
+        return     #TODO
+        
+        """ Using a None as func and multiple seqences """
+
+        res =  tmap(None, [1,2,3,4])
+
+        res2 = tmap(None, [1,2,3,4], [22, 33, 44, 55])
+        
+        res3 = tmap(None, [1,2,3,4], [22, 33, 44, 55, 66])
+        
+        res4 = tmap(None, [1,2,3,4,5], [22, 33, 44, 55])
+        
+        self.assertEqual([1, 2, 3, 4], res)
+        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55)], res2)
+        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (None, 66)], res3)
+        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (5,None)], res4)
+        
+    def test_tmap__wait(self):
+        r = range(1000)
+        wq, results = tmap(lambda x:x, r, num_workers = 5, wait=False)
+        wq.wait()
+        r2 = map(lambda x:x.result, results)
+        self.assert_(r == r2)
+
+    def test_FuncResult(self):
+        # as of 2008-06-28
+        # FuncResult(f, callback = None, errback = None)
+        
+        # Used for wrapping up a function call so that the results are stored
+        #      inside the instances result attribute.
+        
+        
+        #     f - is the function we that we call 
+        #         callback(result) - this is called when the function(f) returns
+        #         errback(exception) - this is called when the function(f) raises
+        #                                an exception.
+
+        # Results are stored in result attribute
+        fr = FuncResult(lambda x:x+1)
+        fr(2)
+        self.assert_(fr.result == 3)
+        
+        # Exceptions are store in exception attribute
+        self.assert_(fr.exception is None,  "when no exception raised")
+        
+        exception = ValueError('rast')
+        def x(sdf):
+            raise exception
+        fr = FuncResult(x)
+        fr(None)
+        self.assert_(fr.exception is exception)
+
+################################################################################
+
+if __name__ == '__main__':
+    unittest.main()