Commits

Anonymous committed 208b0e9

Added ThreadRunner class that is like ThreadPool, but runs each submitted task in a new thread.

Reactor uses ThreadRunner if no ThreadPool given to ctor. ThreadRunner has same interface as ThreadPool and simplifies Reactor by allowing same syntax for thread pool or no thread pool.

ThreadPool has map() method that is a parallel version of Python map().

Reactor map_call_in_thread() which calls into its threadpool's map() method.

Comments (0)

Files changed (7)

asyncthreads/completion.py

         # Timeout waiting for completion.
         raise RuntimeError('timeout waiting for result')
 
+    def wait(self, timeout=None):
+        """Wait until the result is available or until timeout seconds pass."""
+        return self._call_done.wait(timeout)
+
     def done(self):
         """Return True if call completed or canceled."""
         return self._call_done.is_set()
     def canceled(self):
         return self._task_id is False
 
+    def successful(self):
+        """Return whether the call completed without raising an exception.
+
+        Raises AssertionError if the result is not ready.
+
+        """
+        if not self._call_done.is_set():
+            raise AssertionError('call not completed')
+        return not self._exception
+
 
 class _Task(object):
 

asyncthreads/reactor.py

     import Queue as queue
 
 from asyncthreads import completion
+from asyncthreads import threadrunner
 
 
 class Reactor(threading.Thread):
         if thread_pool is not None:
             self._thread_pool = thread_pool
         else:
-            self._thread_pool = None
+            self._thread_pool = threadrunner.ThreadRunner()
 
         self._task_heap = []
 
 
         """
         assert(self.is_alive()), 'reactor is not started'
-        future = completion.Completion()
-        task = completion._Task(future, func, args, callback, result_q)
-        if self._thread_pool:
-            self._thread_pool.submit_task(task)
-        else:
-            th = threading.Thread(target=task.execute)
-            th.daemon = True
-            th.start()
-        return future
+        return self._thread_pool.submit(func, args, callback, result_q)
 
     def call_in_thread_later(self, time_until_call, func, args=None,
                              callback=None, result_q=None):
              action_time, True))
         return future
 
+    def map_call_in_thread(self, func, iterable, callback=None, result_q=None):
+        """Parallel equivalent to map() built-in function.
+
+        Calls call_in_thread() with the given function and callback, for every
+        item of iterable.
+
+        Blocks until all results are ready.
+
+        """
+        return self._thread_pool.map(func, iterable, callback, result_q)
+
     def cancel_scheduled(self, future):
         """Cancel a call scheduled to be called at a later time.
 
 
         # Shutdown threadpool after reactor main thread has exited to ensure no
         # more tasks are submitted to the thread pool.
-        if self._thread_pool is not None and shutdown_threadpool:
+        if shutdown_threadpool:
             self._thread_pool.shutdown()
 
         return True
     #==========================================================================
 
     def _deferred_thread(self, task):
-        if self._thread_pool:
-            self._thread_pool.submit_task(task)
-        else:
-            th = threading.Thread(target=task.execute)
-            th.daemon = True
-            th.start()
+        self._thread_pool.submit_task(task)
 
     def _deferred_call(self, task):
         self._call_queue.put((task, None, None))
 
         def do_sched_task(task, _, in_thread):
             if in_thread:
-                if self._thread_pool is not None:
-                    # Call thread pool to execute scheduled function.
-                    self._thread_pool.submit_task(task)
-                else:
-                    # Create new thread to execute scheduled function.
-                    th = threading.Thread(target=task.execute)
-                    th.daemon = True
-                    th.start()
+                # Call thread pool to execute scheduled function.
+                self._thread_pool.submit_task(task)
             else:
                 #print('===> calling scheduled function')
                 task.execute()

asyncthreads/threadpool.py

 
     """
 
-    def __init__(self, min_size, max_size=None, queue_size=0):
+    def __init__(self, min_size, max_size=0, queue_size=0):
         """Initialize the thread pool and start the minimum number of threads.
 
         If max_size is specified and is larger than min_size, then the pool
-        will automatically resize.
+        will automatically resize.  If min_size is > 0 and max_size is zero,
+        then max_size is set to min_size and thread pool is fixed at min_size.
+
+        It is an error if min_size and max_size are both zero.
 
         Arguments:
         min_size    -- Minimum number of threads to have in pool.
         assert(isinstance(min_size, int)), 'min_size must be int'
         assert(isinstance(max_size, int)), 'max_size must be int'
 
+        # Validate and set min_size
+        if min_size <= 0 and max_size <= 0:
+            raise ValueError('min_size and/or max_size must be > 0')
+        if min_size < 0:
+            min_size = 0
+        self._min_size = min_size
+
+        if max_size <= 0:
+            # Pool is manually resizeable only.
+            max_size = min_size
+        elif max_size < min_size:
+            raise ValueError('max_size must be 0 or >= min_size')
+        # Pool is automatically resizeable if max_size > min_size.
+        self._max_size = max_size
+
+        self._threads = 0
         self._shutdown = False
-        self._threads = 0
         self._task_queue = queue.Queue(queue_size)
         self._resize_lock = threading.Lock()
-        self._idle_counter = ThreadCounter()
-
-        self._min_size = min_size
-        if max_size and max_size > min_size:
-            # Pool is automatically resizeable.
-            self._max_size = max_size
-        else:
-            # Pool is manually resizeable only.
-            self._max_size = min_size
+        self._idle_lock = threading.Lock()
+        self._idle_count = 0
 
     def submit(self, function, args=None, callback=None, result_q=None):
         """Insert a task function into the queue.
             completion._Task(future, function, args, callback, result_q))
         return future
 
+    def map(self, func, iterable, callback=None):
+        """Parallel equivalent to map() built-in function.
+
+        Blocks until all results are ready.
+
+        """
+        futures = [self.submit(func, (i,), callback) for i in iterable]
+        return [f.result() for f in futures]
+
+    def map_async(self, func, iterable, callback=None):
+        """Variant as map() which returns a Completion object immediately.
+
+        This method consumes an extra thread to collect the result from each
+        call and return the array of results on a single Completion.
+
+        """
+        return self.submit(self.map, (func, iterable, callback))
+
     def submit_task(self, task):
         """Insert a Task object into the queue.
 
 
     def idle_threads(self):
         """Return the current number of idle threads."""
-        return int(self._idle_counter)
+        return self._idle_count
 
     def qsize(self):
         """Return the number of items currently in task queue."""
 
         return True
 
+    def resize_pool(self, new_size):
+        """Resize the thread pool to have the specified number of threads.
+
+        """
+        if new_size < 0:
+            raise ValueError('pool size must be >= 0')
+
+        with self._resize_lock:
+            if new_size > self._threads:
+                # Growing pool.
+                while self._threads < new_size:
+                    self._threads += 1
+                    t = threading.Thread(target=self._worker)
+                    t.daemon = True
+                    t.start()
+            else:
+                # Shrinking pool.
+                while self._threads > new_size:
+                    self._threads -= 1
+                    self._task_queue.put(None)
+
     def _adjust_thread_count(self):
         """If more items in the queue than idle threads, add thread.
 
         """
-        idle = int(self._idle_counter)
+        idle = self._idle_count
         qsize = self._task_queue.qsize()
 
         # Grow pool if there are more items in queue than there idle threads.
         if qsize > idle:
             if self._threads < self._max_size:
-                t = threading.Thread(target=_worker,
-                                     args=(self._task_queue,
-                                           self._idle_counter))
+                t = threading.Thread(target=self._worker)
                 t.daemon = True
                 t.start()
                 self._threads += 1
             self._task_queue.put(None)
             self._threads -= 1
 
-        return self._threads
-
-    def resize_pool(self, new_size):
-        """Resize the thread pool to have the specified number of threads.
+    def _worker(self):
+        """Retrieve the next task and execute it, calling the callback if any.
 
         """
-        if new_size < 0:
-            raise ValueError('pool size must be >= 0')
+        task_queue = self._task_queue
+        idle_lock = self._idle_lock
+        while True:
+            try:
+                task = task_queue.get(False)
+            except queue.Empty:
+                with idle_lock:
+                    self._idle_count += 1
+                # Wait forever.
+                task = task_queue.get()
+                with idle_lock:
+                    self._idle_count -= 1
 
-        with self._resize_lock:
-            if new_size > self._threads:
-                # Growing pool.
-                args = (self._task_queue, self._idle_counter)
-                while self._threads < new_size:
-                    self._threads += 1
-                    t = threading.Thread(target=_worker, args=args)
-                    t.daemon = True
-                    t.start()
-            else:
-                # Shrinking pool.
-                while self._threads > new_size:
-                    self._threads -= 1
-                    self._task_queue.put(None)
+            # Exit if told to quit.
+            if task is None:
+                task_queue.task_done()
+                break
 
+            # Execute task.
+            task.execute()
 
-class ThreadCounter(object):
-
-    def __init__(self):
-        self._lock = threading.Lock()
-        self._count = 0
-
-    def __int__(self):
-        return self._count
-
-    def incr(self):
-        with self._lock:
-            self._count += 1
-
-    def decr(self):
-        with self._lock:
-            self._count -= 1
-
-    def set(self, value):
-        with self._lock:
-            self._count = value
-
-
-def _worker(task_queue, idle_counter):
-    """Retrieve the next task and execute it, calling the callback if any.
-
-    """
-    while True:
-        try:
-            task = task_queue.get(False)
-        except queue.Empty:
-            idle_counter.incr()
-            # Wait forever.
-            task = task_queue.get()
-            idle_counter.decr()
-
-        # Exit if told to quit.
-        if task is None:
+            del task
             task_queue.task_done()
-            break
-
-        # Execute task.
-        task.execute()
-
-        del task
-        task_queue.task_done()

asyncthreads/threadrunner.py

+"""
+Like ThreadPool, but submit() runs each task in a new thread.
+
+"""
+__author__ = "Andrew Gillis"
+__license__ = "http://www.opensource.org/licenses/mit-license.php"
+__maintainer__ = "Andrew Gillis"
+__email__ = "gillis.andrewj@gmail.com"
+
+import threading
+from asyncthreads import completion
+
+
+class ThreadRunner(object):
+
+    """
+    Object similar to ThreadPool, but only launches new threads on submit().
+
+    """
+    def __init__(self):
+        """Initialize the ThreadRunner instance.
+
+        """
+        self._shutdown = False
+
+    def submit(self, function, args=None, callback=None, result_q=None):
+        """See ThreadPool.submit()"""
+        future = completion.Completion()
+        self.submit_task(
+            completion._Task(future, function, args, callback, result_q))
+        return future
+
+    def map(self, func, iterable, callback=None, result_q=None):
+        """See ThreadPool.map()"""
+        futures = [self.submit(func, (i,), callback, result_q)
+                   for i in iterable]
+        return [f.result() for f in futures]
+
+    def map_async(self, func, iterable, callback=None, result_q=None):
+        """See ThreadPool.map_async()"""
+        return self.submit(self.map, (func, iterable, callback, result_q))
+
+    def submit_task(self, task):
+        """See ThreadPool.submit_task()"""
+        if self._shutdown:
+            raise RuntimeError('cannot submit tasks after shutdown')
+        th = threading.Thread(target=task.execute)
+        th.daemon = True
+        th.start()
+
+    def shutdown(self, wait_for_threads=True, drop_queued_tasks=False):
+        if self._shutdown:
+            return False
+
+        self._shutdown = True
+        return True
+
+
 def main():
     setup(
         name='asyncthreads',
-        version= '1.5.0',
+        version= '1.5.1',
         author='Andrew Gillis',
         author_email='gillis.andrewj@gmail.com',
         url='http://bitbucket.org/agillis/asyncthreads',

test/test_reactor.py

     def setup_class(cls):
         gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
+        #cls.tp = None
         cls.tp = ThreadPool(MIN_THREADS, MAX_THREADS)
         cls.rktr = Reactor(cls.tp)
+        cls.rktr = Reactor(None)
         cls.called = False
         cls.cb_event = threading.Event()
         cls.cb_rsp = None
         assert ret == 'hello-world'
         assert self.cb_rsp == 'hello-world'
 
-        print('Threads in pool: %s' % TestReactor.tp.threads())
-        assert TestReactor.tp.threads() > 0
+        if TestReactor.tp is not None:
+            print('Threads in pool: %s' % TestReactor.tp.threads())
+            assert TestReactor.tp.threads() > 0
 
     def test_call_scheduled(self):
         """Run scheduled in main thread"""
         """Check for memory leaks"""
         # Test for any objects that cannot be freed.
         assert len(gc.garbage) == 0, 'uncollected garbage: '+str(gc.garbage)
-
-
-

test/test_threadpool.py

 #
 # Run with py.test
 #
-
+import pytest
 import gc
 import time
 from random import randrange
 #parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 #sys.path.insert(0, parentdir)
 
-from asyncthreads.threadpool import ThreadPool
+from asyncthreads import threadpool
 
 # Sample task 1: given a start and end value, shuffle integers,
 # then sort them
     @classmethod
     def setup_class(cls):
         gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
+        cls.pool = threadpool.ThreadPool(POOL_MIN, POOL_MAX)
 
-        cls.pool = ThreadPool(POOL_MIN, POOL_MAX)
+    def test_create(self):
+        with pytest.raises(ValueError) as e:
+             bad_pool = threadpool.ThreadPool(0, 0)
+        print('OK, caught: ' + str(e))
+
+        with pytest.raises(ValueError) as e:
+             bad_pool = threadpool.ThreadPool(-3, -7)
+        print('OK, caught: ' + str(e))
+
+        with pytest.raises(ValueError) as e:
+             bad_pool = threadpool.ThreadPool(5, 2)
+        print('OK, caught: ' + str(e))
+
+        good_pool = threadpool.ThreadPool(5, -22)
+
 
     def test_start(self):
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
-        self.pool.submit(wait_task, 15, task_callback)
+        for _ in range(50):
+            self.pool.submit(wait_task, 15, task_callback)
+
         # Insert tasks into the queue and let them run
         print('start')
         self.pool.submit(sort_task, (1000, 100000), task_callback)
         self.pool.resize_pool(80)
         assert self.pool.threads() == 80
 
-        while self.pool.qsize():
-            print('\n---> queued items: %s' % self.pool.qsize())
-            print('---> idle threads: %s' % self.pool.idle_threads())
-            print('---> thread count: %s' % self.pool.threads())
-            time.sleep(5)
-
-        print('\n---> queued items: %s' % self.pool.qsize())
-        print('---> idle threads: %s' % self.pool.idle_threads())
-        print('---> thread count: %s' % self.pool.threads())
-
-        while self.pool.threads() > POOL_MIN:
-            self.pool.submit(range, (0,1))
+        future = self.pool.submit(range, (0,1))
+        while not future.done():
             print('\n---> queued items: %s' % self.pool.qsize())
             print('---> idle threads: %s' % self.pool.idle_threads())
             print('---> thread count: %s' % self.pool.threads())
             time.sleep(0.25)
 
+        while True:
+            self.pool.submit(range, (0,1))
+            time.sleep(0.25)
+            threads = self.pool.threads()
+            idle = self.pool.idle_threads()
+            print('\n---> queued items: %s' % self.pool.qsize())
+            print('---> idle threads: %s' % idle)
+            print('---> thread count: %s' % threads)
+            if (threads == POOL_MIN and idle == self.pool.threads()):
+                break
+
         print('finish')
 
+    def test_map(self):
+        results = self.pool.map(wait_task, range(1,6), task_callback)
+        assert len(results) == 5
+        for r in results:
+            print(r)
+
     def test_shutdown(self):
         # When all tasks are finished, allow the threads to terminate
         print('Shutting down thread pool')
-        self.pool.shutdown(False, False)
 
-        while True:
-            print('\n---> queued items: %s' % self.pool.qsize())
-            print('---> idle threads: %s' % self.pool.idle_threads())
-            print('---> thread count: %s' % self.pool.threads())
-            time.sleep(5)
-            if not self.pool.qsize():
-                break
+        # Shutdown, and wait for all threads to exit (default).
+        self.pool.shutdown()
+
+        with pytest.raises(RuntimeError) as e:
+            self.pool.submit(wait_task, 10)
+
+        assert not self.pool.shutdown()
+
+        print('\n---> queued items: %s' % self.pool.qsize())
+        print('---> idle threads: %s' % self.pool.idle_threads())
+        print('---> thread count: %s' % self.pool.threads())
+        assert self.pool.qsize() == 0
+        assert self.pool.idle_threads() == 0
+        assert self.pool.threads() == 0
 
     def test_memory_leaks(self):
         """Check for memory leaks"""