Commits

gill...@gmail.com  committed 1b9b54a

- ThreadPool can now, optionally, automatically resize itself.
- Changed "Result" class to "Completion".
- Renamed methods in Completion class.
- Reactor constructor now takes optional ThreadPool object instead of ThreadPool construction parameters. Decouples ctors: changes to ThreadPool ctor do not require change to Reactor ctor.
- Removed overloaded (internal) parameters when queuing scheduled calls.
- ThreadPool queue_task() renamed to submit().

  • Participants
  • Parent commits 6db8f4f

Comments (0)

Files changed (8)

 
 Using the ThreadPool and Reactor is as simple as creating an instance and submitting work::
 
- r = Reactor()
+ r = Reactor(ThreadPool(MIN_SIZE, MAX_SIZE))
  r.start()
  r.call(handle_event, (transport, event_id))
  r.call_later(300, five_min_sync_check, (param1, param2))

File asyncthreads/reactor.py

 except ImportError:
     import Queue as queue
 
-import asyncthreads.threadpool
 
-
-class Result(object):
+class Completion(object):
 
     """
-    A Result object is returned by a Reactor's call_() methods.
+    Completion represents the result of an asynchronous computation.
 
-    The Result object allows the caller to wait for and retrieve the result of
-    a function called by the Reactor.  This not only includes any return from
-    the called function, but any exception as well.
+    A Completion object is returned by a Reactor's call_() methods.  It holds a
+    value that may become available at some point.  The Completion class is
+    equivalent to the Future class offered by other concurrency frameworks.
 
-    A Result object is also used to identify a scheduled function to cancel,
-    and it can be used to check if a scheduled call has been canceled.
+    The Completion object allows the caller to wait for and retrieve the result
+    of a function called by the Reactor.  This not only includes any return
+    from the called function, but any exception as well.
+
+    A Completion object is also used to identify a scheduled function to
+    cancel, and it can be used to check if a scheduled call has been canceled.
 
     """
 
         # For canceling scheduled tasks.
         self._task_id = None
 
-        # Optional queue to put this Result on when completed.
+        # Optional queue to put this Completion on when completed.
         self._result_q = result_q
 
+    def result(self, timeout=None):
+        """Return the value returned by the call.
 
-    def get_result(self, timeout=None):
+        If the call has not yet completed then wait up to timeout seconds or
+        until call complete if timeout is None.  It timeout occurs, then raise
+        a RuntimeError exception.  If canceled before completion, then a
+        ThreadError exception is raised.
+
+        If the call raised an exception, then this methods returns the same
+        exception.
+
+        """
         if self._call_done.wait(timeout):
             if self._exception:
                 raise self._exception
 
             return self._rsp
-        return None
 
+        if self.canceled():
+            # Canceled before completed.
+            raise threading.ThreadError('call canceled before completion')
 
-    def has_result(self):
+        # Timeout waiting for completion.
+        raise RuntimeError('timeout waiting for result')
+
+    def done(self):
+        """Return True if call completed or canceled."""
         return self._call_done.is_set()
 
-
-    def get_traceback(self):
+    def traceback(self):
+        """Return the traceback string from exception in call."""
         return self._traceback_str
 
-
-    def is_canceled(self):
+    def canceled(self):
         return self._task_id is False
 
 
     executed at some later time.  Functions may also be executed, immediately
     or later, in threads separate from the reactor's main thread.
 
-    All functions executed by the reactor have an associated Result object.
-    A Result object allows the caller to wait for and retrieve the result of a
-    function called by the reactor.
+    All functions executed by the reactor have an associated Completion object.
+    A Completion object allows the caller to wait for and retrieve the result
+    of a function called by the reactor.
 
     The reason to use a Reactor is to execute functions in a separate thread,
     asynchronously, from the thread of the caller.
 
     """
 
-    # How long to wait for thread to shutdown.
-    SHUTDOWN_FAILURE_TIMEOUT_SEC = 10
-
-
-    def __init__(self, thread_pool_size=0, queue_size=None):
-        """
-        Initialize Reactor thread object and internal thread pool (if used).
+    def __init__(self, thread_pool=None):
+        """Initialize Reactor thread object and internal thread pool (if used).
 
         Arguments:
-        pool_size     -- Number of threads to have in pool.  If this is set to
-                         0 or None, then the reactor will not use a thread pool
-                         and will start threads directly.
-        queue_size    -- Maximum items allowed in task queue.  None means no
-                         limit.  This is ignored if thread pool not used.
+        thread_pool -- ThreadPool object to use to submit tasks to.  If this is
+                       None, then the reactor will not use a thread pool and
+                       will start threads directly.  Use a thread pool to
+                       throttle concurrency and to avoid thread creation
+                       overhead.
 
         """
         threading.Thread.__init__(self, None, None, None)
         self._call_queue = queue.Queue()
         self.idle_wake_sec = None
-        if thread_pool_size:
-            self._thread_pool = asyncthreads.threadpool.ThreadPool(
-                thread_pool_size, queue_size)
+        if thread_pool:
+            assert(isinstance(thread_pool, asyncthreads.threadpool.ThreadPool))
+            self._thread_pool = thread_pool
         else:
             self._thread_pool = None
 
         self._task_heap = []
 
+    def call(self, func, args=None, callback=None, result_q=None):
+        """Enqueue a function for the reactor to run in its main thread.
 
-    def call(self, func, args=None, callback=None, result_q=None):
-        """
-        Enqueue a function for the reactor to run in its main thread.
-
-        The caller can wait on the response using the returned Result object.
-        Running a function in the reactor's main thread means that no other
-        functions will be running in the reactor's main thread at the same
-        time.  This can be used in place of other synchronization.
+        The caller can wait on the response using the returned Completion
+        object. Running a function in the reactor's main thread means that no
+        other functions will be running in the reactor's main thread at the
+        same time.  This can be used in place of other synchronization.
 
         Arguments:
         func     -- Function to execute.
         args     -- Argument tuple to pass to function.
         callback -- Optional callback that takes return from func as argument.
-        result_q -- Optional response queue to place complete Result on.
+        result_q -- Optional response queue to place finished Completion on.
 
         Return:
-        Result object.
+        Completion object.
 
         """
         assert(self.is_alive()), 'reactor is not started'
                isinstance(callback, Callable)),'callback is not callable'
         assert(result_q is None or
                isinstance(result_q, queue.Queue)), 'result_q is not Queue'
-        result = Result(result_q)
-        self._call_queue.put((func, args, result, callback))
-        return result
-
+        future = Completion(result_q)
+        self._call_queue.put((func, args, future, callback, None, None))
+        return future
 
     def call_later(self, time_until_call, func, args=None, callback=None,
                    result_q=None):
-        """
-        Run a function in the reactor's main thread at a later time.
+        """Run a function in the reactor's main thread at a later time.
 
-        The caller can wait on the response using the returned Result object.
+        The caller can wait on the response using the returned Completion
+        object.
 
         Arguments:
         time_until_call -- Seconds remaining until call.
         result_q        -- Optional response queue to place complete Result on.
 
         Return:
-        Result object.
+        Completion object.
 
         """
         assert(self.is_alive()), 'reactor is not started'
         else:
             action_time = 0
 
-        result = Result(result_q)
-        d_data = (func, args, result, callback)
-        self._call_queue.put((None, d_data, action_time, False))
-        return result
-
+        future = Completion(result_q)
+        self._call_queue.put((func, args, future, callback, action_time, None))
+        return future
 
     def call_in_thread(self, func, args=None, callback=None, result_q=None):
         """
         Run the given function in a separate thread.
 
-        The caller can wait on the response using the returned Result object.
-        A thread from the reactor's internal thread pool is used to avoid the
-        overhead of creating a new thread.
+        The caller can wait on the response using the returned Completion
+        object.  A thread from the reactor's internal thread pool is used to
+        avoid the overhead of creating a new thread.
 
         Arguments:
         func     -- Function to execute.
         args     -- Argument tuple to pass to function.
         callback -- Optional callback that takes return from func as argument.
-        result_q -- Optional response queue to place complete Result on.
+        result_q -- Optional response queue to place finished Completion on.
 
         Return:
-        Result object.
+        Completion object.
 
         """
         assert(self.is_alive()), 'reactor is not started'
         assert(isinstance(func, Callable)), 'function is not callable'
         assert(callback is None or
                isinstance(callback, Callable)), 'callback is not callable'
-        result = Result(result_q)
+        future = Completion(result_q)
+        args = (func, args, future, callback)
         if self._thread_pool:
-            self._thread_pool.queue_task(self._thread_wrapper,
-                                         (func, args, result, callback))
+            self._thread_pool.submit(self._thread_wrapper, args)
         else:
-            th = threading.Thread(target=self._thread_wrapper,
-                                  args=(func, args, result, callback))
+            th = threading.Thread(target=self._thread_wrapper, args=args)
             th.daemon = True
             th.start()
-        return result
-
+        return future
 
     def call_in_thread_later(self, time_until_call, func, args=None,
                              callback=None, result_q=None):
-        """
-        Run the given function in a separate thread, at a later time.
+        """Run the given function in a separate thread, at a later time.
 
-        The caller can wait on the response using the returned Result object.
-        A thread from the reactor's internal thread pool is used to avoid the
-        overhead of creating a new thread.
+        The caller can wait on the response using the returned Completion
+        object.  A thread from the reactor's internal thread pool is used to
+        avoid the overhead of creating a new thread.
 
         Arguments:
         time_until_call -- Seconds remaining until call.
         args            -- Argument tuple to pass to function.
         callback        -- Optional callback that takes return from func as
                            argument.
-        result_q        -- Optional response queue to place complete Result on.
+        result_q        -- Optional response queue to place Completion on.
 
         Return:
-        Result object.
+        Completion object.
 
         """
         assert(self.is_alive()), 'reactor is not started'
         else:
             action_time = 0
 
-        result = Result(result_q)
-        d_data = (func, args, result, callback)
-        self._call_queue.put((None, d_data, action_time, True))
-        return result
+        future = Completion(result_q)
+        self._call_queue.put((func, args, future, callback, action_time, True))
+        return future
 
-
-    def cancel_scheduled(self, result):
-        """
-        Cancel a call scheduled to be called at a later time.
+    def cancel_scheduled(self, future):
+        """Cancel a call scheduled to be called at a later time.
 
         Arguments:
-        result -- Result object returned from call_later() or from
+        future -- Completion object returned from call_later() or from
                   call_in_thread_later().
 
         Return:
         call was not found (already executed) or already canceled.
 
         """
-        # The result's task_id cannot be checked in this method.  The request
-        # must be placed on the reactor's call queue to ensure that a scheduled
-        # task request is received by the reactor before this request to cancel
-        # it is received.
-        tmp_result = self.call(self._cancel_scheduled, result)
+        # The Completion's task_id cannot be checked in this method.  The
+        # request must be placed on the reactor's call queue to ensure that a
+        # scheduled task request is received by the reactor before this request
+        # to cancel it is received.
+        tmp_future = self.call(self._cancel_scheduled, future)
         # Wait for reactor to remove task, and return whether or not the task
         # was removed.
-        return tmp_result.get_result()
-
+        return tmp_future.result()
 
     def shutdown(self):
-        """
-        Cause the reactor thread to exit gracefully.
+        """Cause the reactor thread to exit gracefully.
 
         """
         if not self.is_alive():
             return False
 
         self._call_queue.put(None)
+        # Wait for thread to exit.
+        self.join()
+
+        # Shutdown threadpool after reactor main thread has exited to ensure no
+        # more tasks are submitted to the thread pool.
         if self._thread_pool is not None:
             self._thread_pool.shutdown()
 
-        # Wait for thread to exit.
-        self.join(Reactor.SHUTDOWN_FAILURE_TIMEOUT_SEC)
-        if self.is_alive():
-            #print '===> timed out waiting to join:', self.getName()
-            return False
-
-        #print '===> thread exited:', self.getName()
-        return True
-
-
     def qsize(self):
         """Return number of items in the message queue."""
         return self._call_queue.qsize()
 
-
     def empty(self):
         """Return True is no items in the message queue."""
         return self._call_queue.empty()
 
-
     def defer_to_thread(self, func, args=None, callback=None):
         """
         Defer execution from main thread to pooled thread, and use the caller's
-        Result object to return the result of the additional execution.
+        Completion object to return the result of the additional execution.
 
         Arguments:
         func     -- Function to execute in pooled thread.
         """
         return _Deferred(func, args, callback)
 
-
     #==========================================================================
     # Private methods follow:
     #==========================================================================
 
-    def _thread_wrapper(self, func, args, result, callback):
+    def _thread_wrapper(self, func, args, future, callback):
         try:
             if args is None:
                 rsp = func()
                 rsp = func(args)
         except Exception as e:
             rsp = None
-            result._exception = e
-            result._traceback_str = traceback.format_exc()
+            future._exception = e
+            future._traceback_str = traceback.format_exc()
 
         if isinstance(rsp, _Deferred):
             func, args, new_callback = rsp.action_tuple
             if new_callback is not None:
                 callback = new_callback
             if self._thread_pool:
-                self._thread_pool.queue_task(
-                    self._thread_wrapper, (func, args, result, callback))
+                self._thread_pool.submit(
+                    self._thread_wrapper, (func, args, future, callback))
             else:
                 th = threading.Thread(
                     target=self._thread_wrapper,
-                    args=(func, args, result, callback))
+                    args=(func, args, future, callback))
                 th.daemon = True
                 th.start()
             return
                 callback(rsp)
             except Exception as e:
                 rsp = None
-                result._exception = e
-                result._traceback_str = traceback.format_exc()
+                future._exception = e
+                future._traceback_str = traceback.format_exc()
 
         # Set call done only after callback has completed.  This is necessary
-        # for code waiting on a result to be able to reliably see an exception
-        # from a callback.
-        result._rsp = rsp
-        result._call_done.set()
+        # for code waiting on a Completion to be able to reliably see an
+        # exception from a callback.
+        future._rsp = rsp
+        future._call_done.set()
 
-        if result._result_q:
-            result_q = result._result_q
-            result._result_q = None
+        if future._result_q:
+            result_q = future._result_q
+            future._result_q = None
             try:
-                result_q.put(result)
+                result_q.put(future)
             except queue.Full:
                 pass
 
-
     def _process_scheduled_queue(self):
-        """
-        Process all scheduled tasks that are currently due.
+        """Process all scheduled tasks that are currently due.
 
         """
         task_heap = self._task_heap
         # Execute the first task without checking the time, because this method
         # is only called when a scheduled task is due.
-        task_time, task_id, task_data, call_in_thread = heapq.heappop(
-            task_heap)
+        task_time, task_id, task_data = heapq.heappop(task_heap)
 
         # If this task has been removed, then do nothing.
         if task_data is None:
             return
 
-        def do_sched_task(func, args, result, callback):
-            if call_in_thread:
+        def do_sched_task(func, args, future, callback, call_at, in_thread):
+            if in_thread:
                 if self._thread_pool is not None:
                     # Call thread pool to execute scheduled function.
-                    self._thread_pool.queue_task(
-                        self._thread_wrapper, (func, args, result, callback))
+                    self._thread_pool.submit(
+                        self._thread_wrapper, (func, args, future, callback))
                 else:
                     # Create new thread to execute scheduled function.
                     th = threading.Thread(
                         target=self._thread_wrapper,
-                        args=(func, args, result, callback))
+                        args=(func, args, future, callback))
                     th.daemon = True
                     th.start()
             else:
                 #print '===> calling scheduled function'
-                self._thread_wrapper(func, args, result, callback)
-
+                self._thread_wrapper(func, args, future, callback)
 
         do_sched_task(*task_data)
         if task_heap:
             now = time.time()
             next_time = task_heap[0][0]
             while task_heap and (not next_time or next_time <= now):
-                task_time, task_id, task_data, call_in_thread = heapq.heappop(
-                    task_heap)
+                task_time, task_id, task_data = heapq.heappop(task_heap)
                 do_sched_task(*task_data)
                 if task_heap:
                     next_time = task_heap[0][0]
 
-
-    def _cancel_scheduled(self, result):
-        """
-        Mark a scheduled call, in scheduled task heap, as canceled.
+    def _cancel_scheduled(self, future):
+        """Mark a scheduled call, in scheduled task heap, as canceled.
 
         """
-        task_id = result._task_id
+        task_id = future._task_id
         if task_id is None or task_id is False:
             # Task is not scheduled or is already canceled.
             return False
         task_heap = self._task_heap
         for idx, heap_item in enumerate(task_heap):
-            task_time, heap_task_id, task_data, call_in_thread = heap_item
+            task_time, heap_task_id, task_data = heap_item
             if heap_task_id == task_id:
-                func, args, result, callback = task_data
-                result._rsp = None
-                result._task_id = False
-                result._call_done.set()
+                func, args, future, callback, call_at, in_thread = task_data
+                future._rsp = None
+                future._task_id = False
+                future._call_done.set()
                 # Need to keep task_id to preserve heap invariant.
-                task_heap[idx] = (task_time, heap_task_id, None, None)
+                task_heap[idx] = (task_time, heap_task_id, None)
                 return True
         return False
 
-
     def run(self):
-        """
-        Function run in thread, started by the start() method.
+        """Function run in thread, started by the start() method.
 
         """
         call_queue = self._call_queue
                     sleep_time = action_time - now
 
             try:
-                #print '===> sleeping for %s seconds, or until next call put on queue' % (sleep_time,)
+                #print '===> sleeping for %s seconds' % (sleep_time,)
                 # Get the next function from the call queue.
-                action_tuple = call_queue.get(True, sleep_time)
-                if action_tuple is None:
+                work_item = call_queue.get(True, sleep_time)
+                if work_item is None:
                     break
-                func, args, result, callback = action_tuple
+                func, args, future, callback, call_at, in_thread = work_item
 
                 # If this is a new scheduled task.
-                if func is None:
-                    time_to_call = result
-                    call_in_thread = callback # callback is really thread flag
-                    f,a,r,c = args
-                    r._task_id = next_task_id
+                if call_at:
+                    future._task_id = next_task_id
                     heapq.heappush(task_heap,
-                                   (time_to_call, next_task_id, args,
-                                    call_in_thread))
+                                   (call_at, next_task_id, work_item))
                     next_task_id += 1
                 else:
-                    self._thread_wrapper(func, args, result, callback)
+                    self._thread_wrapper(func, args, future, callback)
 
             except queue.Empty:
                 # Woke up because it is time do next scheduled task.

File asyncthreads/threadpool.py

 __email__ = "gillis.andrewj@gmail.com"
 
 import threading
-import time
 from collections import Callable
 try:
     import queue
     """
     Resizable thread pool class.
 
-    Maintain a pool of threads that execute tasks given to queue_task().  The
-    number of worker whreads can be changed by calling resize_pool().
+    Maintain a pool of threads that execute tasks given to submit().  The
+    number of worker threads can be changed by calling resize_pool() to set the
+    number of workers explicitly, or by setting a max_size greater than
+    min_size and letting to pool automatically change its size.
 
     """
 
-    def __init__(self, pool_size, queue_size=None):
-        """
-        Initialize the thread pool and start the specified number of threads.
+    def __init__(self, min_size, max_size=None, queue_size=None):
+        """Initialize the thread pool and start the minimum number of threads.
+
+        If max_size is specified and is larger than min_size, then the pool
+        will automatically resize.
 
         Arguments:
-        pool_size     -- Number of threads to have in pool.
-        queue_size    -- Maximum items allowed in task queue.  None means no
-                         limit.
+        min_size    -- Minimum number of threads to have in pool.
+        max_size    -- Optional.  Maximum size pool can grow to.
+        queue_size  -- Maximum items allowed in task queue.  None means no
+                       limit.
 
         """
-        assert(isinstance(pool_size, int)), 'pool_size is not an int'
-        assert(queue_size is None or
-               isinstance(queue_size, int)), 'queue_size is not an int'
+        assert(isinstance(min_size, int)), 'min_size must be int'
+        assert(max_size is None or
+               isinstance(max_size, int)), 'max_size must be int'
 
-        self.__shutting_down = False
-        self.__threads = 0
-        self.__task_queue = queue.Queue(queue_size)
-        self.__resize_lock = threading.Condition(threading.Lock())
+        self._shutdown = False
+        self._threads = 0
+        self._task_queue = queue.Queue()
+        self._resize_lock = threading.Lock()
+        self._idle_lock = threading.Lock()
+        self._idle_count = 0
 
-        self.resize_pool(pool_size)
+        self._min_size = min_size
+        if max_size and max_size > min_size:
+            # Pool is automatically resizeable.
+            self._max_size = max_size
+        else:
+            # Pool is manually resizeable only.
+            self._max_size = min_size
 
 
-    def queue_task(self, function, args=None, callback=None):
-        """
-        Insert a task function into the queue.
+    def __len__(self):
+        """Return the number of threads in the pool."""
+        return self._threads
+
+    def submit(self, function, args=None, callback=None):
+        """Insert a task function into the queue.
 
         """
         assert(isinstance(function, Callable)), 'function is not callable'
         assert(callback is None or
                isinstance(callback, Callable)), 'callback is not callable'
 
-        if not self.__shutting_down:
-            self.__task_queue.put((function, args, callback), False)
+        with self._resize_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot submit tasks after shutdown')
 
-
-    def get_pool_size(self):
-        """Return the number of threads in the pool."""
-        return self.__threads
-
+            self._task_queue.put((function, args, callback), False)
+            self._adjust_thread_count()
 
     def get_queue_size(self):
         """Return the number of items currently in task queue."""
-        return self.__task_queue.qsize()
+        return self._task_queue.qsize()
 
+    def get_idle_threads(self):
+        return self._idle_count
 
     def shutdown(self, wait_for_threads=True, drop_queued_tasks=False):
-        """
-        Shutdown the thread pool.
+        """Shutdown the thread pool.
 
         Optionally wait for worker threads to exit, and optionally drop and
         queued tasks.
         True if exited successfully, False if shutdown was already called.
 
         """
-        if self.__shutting_down:
-            return False
+        with self._resize_lock:
+            if self._shutdown:
+                return False
 
-        self.__shutting_down = True
+            self._shutdown = True
 
-        # Discard remaining tasks on queue.
-        while drop_queued_tasks:
-            try:
-                self.__task_queue.get(False)
-                self.__task_queue.task_done()
-            except queue.Empty:
-                break
+            while drop_queued_tasks:
+                # Discard remaining tasks on queue.
+                try:
+                    self._task_queue.get(False)
+                    self._task_queue.task_done()
+                except queue.Empty:
+                    break
 
-        self.resize_pool(0)
-        if wait_for_threads:
-            self.__task_queue.join()
+            # Enqueue a shutdown request for every thread.
+            while self._threads:
+                self._threads -= 1
+                self._task_queue.put(None)
+
+            # Wait until all shutdown requests are processed.
+            if wait_for_threads:
+                self._task_queue.join()
 
         return True
 
+    def _adjust_thread_count(self):
+        """If more items in the queue than idle threads, add thread.
+
+        """
+        idle = self._idle_count
+        qsize = self._task_queue.qsize()
+
+        # Grow pool if there are more items in queue than there idle threads.
+        if qsize > idle:
+            if self._threads < self._max_size:
+                self._threads += 1
+                t = threading.Thread(target=_worker,
+                                     args=(self._task_queue, self._idle_count,
+                                           self._idle_lock))
+                t.daemon = True
+                t.start()
+
+        # Shrink pool if fewer items in queue than idle threads above minimum
+        # size, and more idle threads than the minimum pool size.
+        #elif idle > self._min_size and qsize < idle:
+        elif qsize < (idle - self._min_size):
+            self._threads -= 1
+            self._task_queue.put(None)
+
+        return self._threads
 
     def resize_pool(self, new_size):
-        """
-        Resize the thread pool to have the specified number of threads.
+        """Resize the thread pool to have the specified number of threads.
 
         """
         if new_size < 0:
             raise ValueError('pool size must be >= 0')
 
-        with self.__resize_lock:
-            if new_size > self.__threads:
+        with self._resize_lock:
+            if new_size > self._threads:
                 # Growing pool.
-                while self.__threads < new_size:
-                    new_thread = _ThreadPoolThread(self.__task_queue)
-                    new_thread.start()
-                    self.__threads += 1
+                args = (self._task_queue, self._idle_count, self._idle_lock)
+                while self._threads < new_size:
+                    self._threads += 1
+                    t = threading.Thread(target=_worker, args=args)
+                    t.daemon = True
+                    t.start()
             else:
                 # Shrinking pool.
-                while self.__threads > new_size:
-                    self.__task_queue.put(None)
-                    self.__threads -= 1
+                while self._threads > new_size:
+                    self._threads -= 1
+                    self._task_queue.put(None)
 
 
-class _ThreadPoolThread(threading.Thread):
+def _worker(task_queue, idle_count, idle_ctr_lock):
+    """Retrieve the next task and execute it, calling the callback if any.
 
     """
-    Pooled thread class.
-
-    """
-
-    def __init__(self, task_queue):
-        """
-        Initialize pool thread.
-
-        """
-        threading.Thread.__init__(self)
-        self.daemon = True
-        self.__task_queue = task_queue
-
-
-    def run(self):
-        """
-        Retrieve the next task and execute it, calling the callback if any.
-
-        """
-        task_queue = self.__task_queue
-        while True:
+    while True:
+        try:
+            task = task_queue.get(False)
+        except queue.Empty:
+            with idle_ctr_lock:
+                idle_count += 1
             # Wait forever.
             task = task_queue.get()
+            with idle_ctr_lock:
+                idle_count -= 1
 
-            # Exit if told to quit.
-            if task is None:
-                task_queue.task_done()
-                break
+        # Exit if told to quit.
+        if task is None:
+            task_queue.task_done()
+            break
 
-            # Execute task.
-            func, args, callback = task
-            try:
-                if args is None:
-                    ret = func()
-                elif isinstance(args, (tuple, list, set)):
-                    ret = func(*args)
-                elif isinstance(args, dict):
-                    ret = func(**args)
-                else:
-                    ret = func(args)
+        # Execute task.
+        func, args, callback = task
+        try:
+            if args is None:
+                ret = func()
+            elif isinstance(args, (tuple, list, set)):
+                ret = func(*args)
+            elif isinstance(args, dict):
+                ret = func(**args)
+            else:
+                ret = func(args)
 
-                if callback is not None:
-                    callback(ret)
-            except Exception as e:
-                print('ThreadPool task raised exception: '+str(e))
+            if callback is not None:
+                callback(ret)
+        except Exception as e:
+            print('ThreadPool task raised exception: '+str(e))
 
-            task_queue.task_done()
-
+        del task
+        task_queue.task_done()

File examples/xmlrpc_reactor.py

         self._shutdown_event = threading.Event()
 
         # Number of threads in pool.
-        pooled_threads = 4
+        min_th = 4
+        max_th = 16
+
         # Create Reactor.
-        self._reactor = reactor.Reactor(pooled_threads)
+        self._reactor = reactor.Reactor(threadpool.ThreadPool(min_th, max_th))
 
         # Create XML-RPC server
         self._server = ThreadedRPCServer(
+[easy_install]
+
 def main():
     setup(
         name='asyncthreads',
-        version= '1.3.2',
+        version= '1.4.0',
         author='Andrew Gillis',
         author_email='gillis.andrewj@gmail.com',
         url='http://bitbucket.org/agillis/asyncthreads',

File test/test_reactor.py

 except ImportError:
     import Queue as queue
 
+# Uncomment to import from repo instead of site-packages.
+#import os
+#import sys
+#parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+#sys.path.insert(0, parentdir)
+
 from asyncthreads.reactor import Reactor
+from asyncthreads.threadpool import ThreadPool
 
 
 class TestReactor(object):
     def setup_class(cls):
         gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
-        cls.rktr = Reactor(0)
+        cls.rktr = Reactor(ThreadPool(2, 16))
         cls.called = False
         cls.cb_event = threading.Event()
         cls.cb_rsp = None
         print('Calling function with no args in main thread')
         self.reset_called()
         result = self.rktr.call(self._func1)
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert ret == 'abc', 'wrong result'
 
         print('Calling function in main thread')
         self.reset_called()
         result = self.rktr.call(self._func2, ('hello', 'world'))
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
         self.reset_called()
         result = self.rktr.call(self._func2, ('hello', 'world'),
                                 self._callback)
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert self.cb_event.wait(3), 'cb_event should be set'
         assert ret == 'hello-world'
         print('Calling function in other thread')
         self.reset_called()
         result = self.rktr.call_in_thread(self._func2, ('hello', 'world'))
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
         self.reset_called()
         result = self.rktr.call_in_thread(self._func2, ('hello', 'world'),
                                           self._callback)
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert self.cb_event.wait(3), 'cb_event should be set'
         assert ret == 'hello-world'
         self.reset_called()
         result = self.rktr.call_later(3, self._func2, ('hello', 'world'))
 
-        assert not result.has_result(), 'should not have result yet'
+        assert not result.done(), 'should not have result yet'
         time.sleep(4)
-        assert result.has_result(), 'should have result by now'
+        assert result.done(), 'should have result by now'
 
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
         result = self.rktr.call_later(3, self._func2, ('hello', 'world'),
                                       self._callback)
 
-        assert not result.has_result(), 'should not have result yet'
+        assert not result.done(), 'should not have result yet'
         time.sleep(4)
-        assert result.has_result(), 'should have result by now'
+        assert result.done(), 'should have result by now'
 
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert self.cb_event.wait(3), 'cb_event should be set'
         assert ret == 'hello-world'
         result = self.rktr.call_in_thread_later(3, self._func2,
                                                 ('hello', 'world'))
 
-        assert not result.has_result(), 'should not have result yet'
+        assert not result.done(), 'should not have result yet'
         time.sleep(4)
-        assert result.has_result(), 'should have result by now'
+        assert result.done(), 'should have result by now'
 
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
         result = self.rktr.call_in_thread_later(
             3, self._func2, ('hello', 'world'), self._callback)
 
-        assert not result.has_result(), 'should not have result yet'
+        assert not result.done(), 'should not have result yet'
         time.sleep(4)
-        assert result.has_result(), 'should have result by now'
+        assert result.done(), 'should have result by now'
 
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert self.cb_event.wait(3), 'cb_event should be set'
         assert ret == 'hello-world'
         self.reset_called()
         print('Calling function in 3 seconds')
         result = self.rktr.call_later(3, self._func2, ('hello', 'world'))
-        assert not result.has_result(), 'should not have result yet'
+        assert not result.done(), 'should not have result yet'
 
         assert self.rktr.cancel_scheduled(result),\
                'failed to find scheduled task'
         time.sleep(4)
-        rsp = result.get_result()
-        assert result.is_canceled(), 'failed to mark result as canceled'
+        rsp = result.result()
+        assert result.canceled(), 'failed to mark result as canceled'
 
         assert rsp is None, 'result should be None'
         assert not self.called, 'called should be False'
         # Check for expected excption
         ret = None
         with pytest.raises(Exception) as ex:
-            ret = result.get_result()
+            ret = result.result()
         assert str(ex).endswith('some big problem')
         assert self.called, 'called should be True'
         assert ret is None, 'return should be None'
-        assert len(result.get_traceback()) > 0, 'should be traceback info'
+        assert len(result.traceback()) > 0, 'should be traceback info'
 
         print('Calling function in other thread that raises exception')
         self.reset_called()
         # Check for expected excption
         ret = None
         with pytest.raises(Exception) as ex:
-            ret = result.get_result()
+            ret = result.result()
         assert str(ex).endswith('some big problem')
         assert self.called, 'called should be True'
         assert ret is None, 'return should be None'
-        assert len(result.get_traceback()) > 0, 'should be traceback info'
+        assert len(result.traceback()) > 0, 'should be traceback info'
 
 
     def test_exception_in_callback(self):
         self.reset_called()
         result = self.rktr.call(self._func1, (), self._callback_error)
         with pytest.raises(Exception) as ex:
-            result.get_result()
-        assert len(result.get_traceback()) > 0, 'should be traceback info'
+            result.result()
+        assert len(result.traceback()) > 0, 'should be traceback info'
         assert str(ex).endswith('problem in callback')
         assert self.called, 'called should be True'
 
         self.reset_called()
         result = self.rktr.call_in_thread(self._func1,(),self._callback_error)
         with pytest.raises(Exception) as ex:
-            result.get_result()
+            result.result()
         assert str(ex).endswith('problem in callback')
         assert self.called, 'called should be True'
-        assert len(result.get_traceback()) > 0, 'should be traceback info'
+        assert len(result.traceback()) > 0, 'should be traceback info'
 
 
     def test_defer_call(self):
         print('Calling func on reactor thread that defers processing to thread')
         self.reset_called()
         result = self.rktr.call(self._defer_func2, ('hello', 'world'))
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
               'thread and sets new callback')
         self.reset_called()
         result = self.rktr.call(self._defer2_func2, ('hello', 'world'))
-        ret = result.get_result()
+        ret = result.result()
         assert self.called, 'called should be True'
         assert self.new_cb_called,\
                'new callback not called by deferred thread'
         while count < 3:
             r = rq.get(True, 5)
             count += 1
-            ret = r.get_result()
+            ret = r.result()
             print('got result:', ret)
             if count == 1:
                 assert ret == 'delay-one'

File test/test_threadpool.py

 import time
 from random import randrange
 
+# Uncomment to import from repo instead of site-packages.
+#import os
+#import sys
+#parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+#sys.path.insert(0, parentdir)
+
 from asyncthreads.threadpool import ThreadPool
 
 # Sample task 1: given a start and end value, shuffle integers,
 # then sort them
 def sort_task(first, last):
-    print("SortTask starting for ", (first, last))
+    #print("SortTask starting for ", (first, last))
     numbers = list(range(first, last))
     for a in numbers:
         rnd = randrange(0, len(numbers) - 1)
         a, numbers[rnd] = numbers[rnd], a
-    print("SortTask sorting for ", (first, last))
+    #print("SortTask sorting for ", (first, last))
     numbers.sort()
-    print("SortTask done for ", (first, last))
+    #print("SortTask done for ", (first, last))
     return ("Sorter ", (first, last))
 
 # Sample task 2: just sleep for a number of seconds.
 def wait_task(data):
-    print("WaitTask starting for ", data)
-    print("WaitTask sleeping for %d seconds" % data)
+    #print("WaitTask starting for ", data)
+    #print("WaitTask sleeping for %d seconds" % data)
     time.sleep(data)
     return "Waiter", data
 
 # Both tasks use the same callback
 def task_callback(data):
-    print("Callback called for", data)
+    #print("Callback called for", data)
+    pass
 
+POOL_MIN = 4
+POOL_MAX = 24
 
 class TestThreadPool(object):
 
     def setup_class(cls):
         gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
-        cls.pool = ThreadPool(20, 256)
+        cls.pool = ThreadPool(POOL_MIN, POOL_MAX)
 
 
     def test_start(self):
-        assert self.pool.get_pool_size() == 20
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
-        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
+        self.pool.submit(wait_task, 15, task_callback)
         # Insert tasks into the queue and let them run
         print('start')
-        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
-        self.pool.queue_task(wait_task, 5, task_callback)
-        self.pool.queue_task(sort_task, (200, 200000), task_callback)
-        self.pool.queue_task(wait_task, 2, task_callback)
-        self.pool.queue_task(sort_task, (3, 30000), task_callback)
-        self.pool.queue_task(wait_task, 7, task_callback)
-        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
-        self.pool.queue_task(wait_task, 5, task_callback)
-        self.pool.queue_task(sort_task, (200, 200000), task_callback)
-        self.pool.queue_task(wait_task, 2, task_callback)
-        self.pool.queue_task(sort_task, (3, 30000), task_callback)
-        self.pool.queue_task(wait_task, 7, task_callback)
-        self.pool.queue_task(sort_task, (201, 200001), task_callback)
-        self.pool.queue_task(sort_task, (202, 200002), task_callback)
-        self.pool.queue_task(sort_task, (203, 200003), task_callback)
-        self.pool.queue_task(sort_task, (204, 200004), task_callback)
-        self.pool.queue_task(sort_task, (205, 200005), task_callback)
-        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
-        self.pool.queue_task(wait_task, 5, task_callback)
-        self.pool.queue_task(sort_task, (200, 200000), task_callback)
-        self.pool.queue_task(wait_task, 2, task_callback)
-        self.pool.queue_task(sort_task, (3, 30000), task_callback)
-        self.pool.queue_task(wait_task, 7, task_callback)
-        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
-        self.pool.queue_task(wait_task, 5, task_callback)
-        self.pool.queue_task(sort_task, (200, 200000), task_callback)
-        self.pool.queue_task(wait_task, 2, task_callback)
-        self.pool.queue_task(sort_task, (3, 30000), task_callback)
-        self.pool.queue_task(wait_task, 7, task_callback)
-        self.pool.queue_task(sort_task, (201, 200001), task_callback)
-        self.pool.queue_task(sort_task, (202, 200002), task_callback)
-        self.pool.queue_task(sort_task, (203, 200003), task_callback)
-        self.pool.queue_task(sort_task, (204, 200004), task_callback)
-        self.pool.queue_task(sort_task, (205, 200005), task_callback)
+        self.pool.submit(sort_task, (1000, 100000), task_callback)
+        self.pool.submit(wait_task, 5, task_callback)
+        self.pool.submit(sort_task, (200, 200000), task_callback)
+        self.pool.submit(wait_task, 2, task_callback)
+        self.pool.submit(sort_task, (3, 30000), task_callback)
+        self.pool.submit(wait_task, 7, task_callback)
+        self.pool.submit(sort_task, (1000, 100000), task_callback)
+        self.pool.submit(wait_task, 5, task_callback)
+        self.pool.submit(sort_task, (200, 200000), task_callback)
+        self.pool.submit(wait_task, 2, task_callback)
+        self.pool.submit(sort_task, (3, 30000), task_callback)
+        self.pool.submit(wait_task, 7, task_callback)
+        self.pool.submit(sort_task, (201, 200001), task_callback)
+        self.pool.submit(sort_task, (202, 200002), task_callback)
+        self.pool.submit(sort_task, (203, 200003), task_callback)
+        self.pool.submit(sort_task, (204, 200004), task_callback)
+        self.pool.submit(sort_task, (205, 200005), task_callback)
+        self.pool.submit(sort_task, (1000, 100000), task_callback)
+        self.pool.submit(wait_task, 5, task_callback)
+        self.pool.submit(sort_task, (200, 200000), task_callback)
+        self.pool.submit(wait_task, 2, task_callback)
+        self.pool.submit(sort_task, (3, 30000), task_callback)
+        self.pool.submit(wait_task, 7, task_callback)
+        self.pool.submit(sort_task, (1000, 100000), task_callback)
+        self.pool.submit(wait_task, 5, task_callback)
+        self.pool.submit(sort_task, (200, 200000), task_callback)
+        self.pool.submit(wait_task, 2, task_callback)
+        self.pool.submit(sort_task, (3, 30000), task_callback)
+        self.pool.submit(wait_task, 7, task_callback)
+        self.pool.submit(sort_task, (201, 200001), task_callback)
+        self.pool.submit(sort_task, (202, 200002), task_callback)
+        self.pool.submit(sort_task, (203, 200003), task_callback)
+        self.pool.submit(sort_task, (204, 200004), task_callback)
+        self.pool.submit(sort_task, (205, 200005), task_callback)
 
         for _ in range(3):
             print('---> queued items:', self.pool.get_queue_size())
+            print('---> idle threads:', self.pool.get_idle_threads())
+            print('---> thread count:', len(self.pool))
             time.sleep(2)
 
-        self.pool.resize_pool(80)
-        assert self.pool.get_pool_size() == 80
+        #self.pool.resize_pool(80)
+        #assert len(self.pool) == 80
 
         while self.pool.get_queue_size():
             print('---> queued items:', self.pool.get_queue_size())
+            print('---> idle threads:', self.pool.get_idle_threads())
+            print('---> thread count:', len(self.pool))
             time.sleep(5)
 
         print('finish')
         print('---> queued items:', self.pool.get_queue_size())
         while self.pool.get_queue_size():
             print('---> queued items:', self.pool.get_queue_size())
+            print('---> idle threads:', self.pool.get_idle_threads())
+            print('---> thread count:', len(self.pool))
             time.sleep(5)