1. Andrew Gillis
  2. asyncthreads

Commits

gill...@gmail.com  committed 573e7cb

Add printing back into unit test. Reformat output.

  • Participants
  • Parent commits 23a10a3
  • Branches default

Comments (0)

Files changed (2)

File test/test_reactor.py

View file
  • Ignore whitespace
     import Queue as queue
 
 # Uncomment to import from repo instead of site-packages.
-import os
-import sys
-parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.insert(0, parentdir)
+#import os
+#import sys
+#parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+#sys.path.insert(0, parentdir)
 
 from asyncthreads.reactor import Reactor
 from asyncthreads.threadpool import ThreadPool
 
+MIN_THREADS = 2
+MAX_THREADS = 8
 
 class TestReactor(object):
 
     def setup_class(cls):
         gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
-        cls.rktr = Reactor(ThreadPool(2, 16))
+        cls.rktr = Reactor(ThreadPool(MIN_THREADS, MAX_THREADS))
         cls.called = False
         cls.cb_event = threading.Event()
         cls.cb_rsp = None
         cls.new_cb_called = False
         cls.defer_called = False
 
-
     def _func1(self):
         TestReactor.called = True
         return 'abc'
         assert self.rktr.is_alive(), 'reactor should be alive'
         print('started')
 
-
     def test_call(self):
         """Run in main thread"""
         print('Calling function with no args in main thread')
         assert ret == 'hello-world'
         assert self.cb_rsp == 'hello-world'
 
-
     def test_call_args(self):
         """Test different arg representations"""
         print('Calling function with dict args')
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-
     def test_call_in_thread(self):
         """Run in other thread"""
         print('Calling function in other thread')
         assert ret == 'hello-world'
         assert self.cb_rsp == 'hello-world'
 
-
     def test_call_scheduled(self):
         """Run scheduled in main thread"""
         print('Calling function in main thread in 3 seconds')
         assert ret == 'hello-world'
         assert self.cb_rsp == 'hello-world'
 
-
     def test_call_in_thread_scheduled(self):
         """Run in other thread"""
         print('Calling function in other thread in 3 seconds')
         assert ret == 'hello-world'
         assert self.cb_rsp == 'hello-world'
 
-
     def test_cancel_scheduled(self):
         """Cancel scheduled call"""
         self.reset_called()
         assert rsp is None, 'result should be None'
         assert not self.called, 'called should be False'
 
-
     def test_exception_in_call(self):
         print('Calling function in main thread that raises exception')
         self.reset_called()
         assert ret is None, 'return should be None'
         assert len(result.traceback()) > 0, 'should be traceback info'
 
-
     def test_exception_in_callback(self):
         print('Calling function in main thread callback that raises exception')
         self.reset_called()
         assert self.called, 'called should be True'
         assert len(result.traceback()) > 0, 'should be traceback info'
 
-
     def test_defer_call(self):
         print('Calling func on reactor thread that defers processing to thread')
         self.reset_called()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-
     def test_defer_call_with_new_cb(self):
         print('Calling func on reactor thread that defers processing to '\
               'thread and sets new callback')
                'new callback not called by deferred thread'
         assert ret == 'hello-world'
 
-
     def test_result_queue(self):
         print('Calling 3 functions in pooled thread, with delay of 1, 2, and '\
               '3 seconds for each thread, and waiting for all results on '\
             else:
                 assert ret == 'delay-three'
 
-
     def test_shutdown(self):
         print('Shutting down reactor')
         self.rktr.shutdown()
         assert not self.rktr.is_alive(), 'reactor should not be alive'
 
-
     def test_memory_leaks(self):
         """Check for memory leaks"""
         # Test for any objects that cannot be freed.

File test/test_threadpool.py

View file
  • Ignore whitespace
 # Sample task 1: given a start and end value, shuffle integers,
 # then sort them
 def sort_task(first, last):
-    #print("SortTask starting for ", (first, last))
+    print("SortTask starting for %s" % ((first, last),))
     numbers = list(range(first, last))
     for a in numbers:
         rnd = randrange(0, len(numbers) - 1)
         a, numbers[rnd] = numbers[rnd], a
-    #print("SortTask sorting for ", (first, last))
+    print("SortTask sorting for %s" % ((first, last),))
     numbers.sort()
-    #print("SortTask done for ", (first, last))
+    print("SortTask done for %s" % ((first, last),))
     return ("Sorter ", (first, last))
 
 # Sample task 2: just sleep for a number of seconds.
 def wait_task(data):
-    #print("WaitTask starting for ", data)
-    #print("WaitTask sleeping for %d seconds" % data)
+    print("WaitTask starting for %s" % (data,))
+    print("WaitTask sleeping for %d seconds" % (data,))
     time.sleep(data)
     return "Waiter", data
 
 # Both tasks use the same callback
 def task_callback(data):
-    #print("Callback called for", data)
+    print("Callback called for %s" % (data,))
     pass
 
 POOL_MIN = 4
 
         cls.pool = ThreadPool(POOL_MIN, POOL_MAX)
 
-
     def test_start(self):
         self.pool.submit(wait_task, 15, task_callback)
         self.pool.submit(wait_task, 15, task_callback)
         self.pool.submit(sort_task, (205, 200005), task_callback)
 
         for _ in range(3):
-            print('---> queued items:', self.pool.get_queue_size())
-            print('---> idle threads:', self.pool.get_idle_threads())
-            print('---> thread count:', len(self.pool))
+            print('\n---> queued items: %s' % self.pool.get_queue_size())
+            print('---> idle threads: %s' % self.pool.get_idle_threads())
+            print('---> thread count: %s' % len(self.pool))
             time.sleep(2)
 
-        #self.pool.resize_pool(80)
-        #assert len(self.pool) == 80
+        self.pool.resize_pool(80)
+        assert len(self.pool) == 80
 
         while self.pool.get_queue_size():
-            print('---> queued items:', self.pool.get_queue_size())
-            print('---> idle threads:', self.pool.get_idle_threads())
-            print('---> thread count:', len(self.pool))
+            print('\n---> queued items: %s' % self.pool.get_queue_size())
+            print('---> idle threads: %s' % self.pool.get_idle_threads())
+            print('---> thread count: %s' % len(self.pool))
             time.sleep(5)
 
         print('finish')
 
-
     def test_shutdown(self):
         # When all tasks are finished, allow the threads to terminate
         print('Shutting down thread pool')
         self.pool.shutdown(False, False)
 
-        print('---> queued items:', self.pool.get_queue_size())
-        while self.pool.get_queue_size():
-            print('---> queued items:', self.pool.get_queue_size())
-            print('---> idle threads:', self.pool.get_idle_threads())
-            print('---> thread count:', len(self.pool))
+        while True:
+            print('\n---> queued items: %s' % self.pool.get_queue_size())
+            print('---> idle threads: %s' % self.pool.get_idle_threads())
+            print('---> thread count: %s' % len(self.pool))
             time.sleep(5)
-
+            if not self.pool.get_queue_size():
+                break
 
     def test_memory_leaks(self):
         """Check for memory leaks"""