Commits

gill...@gmail.com  committed e2c06d4

initial commit

  • Participants

Comments (0)

Files changed (8)

+syntax: glob
+*~
+*.pyc
+*.pyo
+*.egg
+*.swp
+*.egg-info
+build
+dist
+*.log
+*.tmp
+===================================================
+Asynchronous Threads Utility Module -- asyncthreads
+===================================================
+
+.. contents:: **Table of Contents**
+
+Overview
+========
+
+The asyncthreads module provides common threading design patterns and utilities
+needed for asynchronous and multithreaded programming.  This includes a thread
+pool and a reactor, which together combine to create a highly reliable
+concurrent event processing system.
+
+The ThreadPool maintains a pool of threads to be reused for processing work, to
+avoid the overhead of repeatedly creating new threads.  The Reactor implements
+the reactor pattern and is used to serially process work in an asynchronous
+thread.
+
+Combining Reactor and ThreadPool
+--------------------------------
+
+The Reactor is used to process events (queued function calls) serially.  Events
+are processed in the order queued and no event is precessed until the previous
+one completes.  Callbacks may be registered to handle the completion of event
+processing.
+
+Since the reactor blocks on any operation, it is necessary to ensure that the
+processing of events and their callbacks executes quickly and return control
+back to the reactor so that is can continue processing events.  Threads can be
+used to accomplish this, but creating and starting a thread takes time.  Using
+a thread pool avoids the overhead of thread creation, and executes additional
+processing and callbacks asynchronously without blocking the Reactor.
+
+Combining a thread pool with a Reactor results in an elegant solution to
+schedule and process work, serially where protection to shared resources is
+required and concurrently where work can be done in parallel.
+
+
+Project Details
+===============
+
+ - Project management on `bitbucket <https://bitbucket.org/agillis/asyncthreads>`_
+ - License http://www.opensource.org/licenses/mit-license.php
+
+
+Install
+=======
+
+``python setup.py install``
+
+
+Dependencies
+============
+
+ - Python 2.7 or greater
+
+Development Tools
+-----------------
+
+ - mercurial
+ - py.test
+ - pychecker
+
+
+ThreadPool
+==========
+
+ThreadPool, implemented in threadpool.py, is a re-sizable thread pool.
+
+ThreadPool maintains a pool of threads that execute tasks given to the
+``queue_task()`` method.  The number of worker threads can be changed by
+calling the ``resize_pool()`` method.
+
+When performing concurrent operations or interacting with a blocking API, using
+multiple threads is useful.  However, there is overhead associated with the
+creation of new threads.  If new threads are used frequently to do work, this
+overhead can become very expensive.
+
+Using a managed thread pool allows the use of multiple threads while avoiding
+the overhead of creating new threads every time a worker thread is needed.
+
+Initialization
+--------------
+
+When initializing a ThreadPool, the number of threads and the maximum size of
+the task queue are given.  It is legal to initialize the ThreadPool with zero
+threads, but no tasks will be executed until the pool is resized to have one or
+more threads.
+
+Reactor
+=======
+
+Reactor, implemented in reactor.py, is the reactor pattern to call queued
+functions in a separate thread.  The reactor design pattern is an event
+handling pattern for handling service requests delivered concurrently to a
+service handler by one or more inputs. The service handler then demultiplexes
+the incoming requests and dispatches them synchronously to the associated
+request handlers.  Asynchronous serial processing allows for execution of code
+that would other require thread locks to protect shared resources.
+
+Event handling functions can be queued to be called by the Reactor main thread,
+or by a thread in the Reactor's thread pool.  Functions can be be queued to be
+called immediately (as soon as calling thread is available), or queued to be
+called at a specified later time.
+
+The Reactor uses the ThreadPool class to implement its thread pool.
+
+Thread-safe code execution
+--------------------------
+
+Using the Reactor's main loop to execute code can provide thread safety and
+efficiency.
+
+Concurrent threads may need to perform some operations in a thread-safe manner,
+such as writing data to a transport.  This can be done by acquiring and
+releasing a lock around every write, or by scheduling writes to be done in the
+reactor main thread.  The second choice is often preferable as it can simplify
+code by eliminating the need to manage thread locks.  It can also free up the
+concurrent threads to do more work, which would otherwise be waiting on the
+thread lock for another thread to finish writing.
+
+To call functions from the main Reactor thread, use the ``call()` or
+``call_later()`` methods.
+
+Running code in threads
+-----------------------
+
+The Reactor allows code to be run in separate concurrent threads.  The methods
+``call_in_thread()`` and ``call_later_in_thread()`` queue functions to be run
+by the Reactor's thread pool.
+
+The caller can wait on the response using the returned Result object.  A thread
+from the Reactor's internal thread pool is used to avoid the overhead of
+creating a new thread.
+
+The Result object
+-----------------
+
+All of the Reactor's ``call_()`` methods return a Result object.  This means
+that that all functions executed by the Reactor have an associated Result.  The
+Result object allows the caller to wait for and retrieve the result of a
+function called by the Reactor.  This not only includes any return from the
+called function, but any exception as well.
+
+Calling the Result object's ``get_result()`` method will return whatever the
+called function returned, will raise an ``Exception`` if the called function
+raised an ``Exception``, or will wait until the scheduled function has been
+called.
+
+A Result object is used to identify a scheduled function to cancel.  It can
+also be used to check if a scheduled call has been canceled.
+
+Canceling Scheduled Execution
+------------------------------
+
+Calling the Reactor methods ``call_later()`` and ``call_in_thread_later()``
+schedules a function to be executed at a later time by the Reactor's main
+thread, or by one of the Reactor's thread pool threads respectively.  If the
+code has not yet been executed, its scheduled execution can be canceled by
+calling the ``cancel_scheduled()`` method and passing it the Result object
+returned by ``call_later()`` or ``call_in_thread_later()``.
+
+The ``cancel_scheduled()`` method returns ``True`` if the scheduled call was
+successfully canceled and ``False`` if scheduled call was not found (already
+executed) or already canceled.
+
+Defer additional execution from main thread to pooled thread
+------------------------------------------------------------
+
+Since the reactor main thread blocks on any operation, it is necessary to
+ensure that callbacks execute quickly and return control back to the reactor to
+continue processing events.  The Reactor's thread pool avoids the overhead of
+thread creation and executes additional processing and callbacks asynchronously
+without blocking the Reactor.
+
+When using a thread to execute additional processing and callbacks, the results
+of the additional processing must be retrieved using the original Result
+object.  This is the purpose of deferred processing.
+
+To use deferred processing, the function executing in the main thread calls
+``defer_to_thread()``, returning the object returned by ``defer_to_thread()``.
+The reactor recognizes the special return value, reuses the original Result
+object, and queues the given task to a pooled thread.
+
+The caller's Result object is reused to receive the result of calling the
+specified deferred function.
+

File asyncthreads/__init__.py

+__all__ = [
+    'threadpool',
+    'reactor'
+    ]

File asyncthreads/reactor.py

+"""
+Reactor pattern to call queued functions in a separate thread.
+
+Functions can be queued to be called by the reactor main thread, or by a thread
+in the reactor's thread pool.  Functions can be be queued to be called
+immediately (as soon as calling thread is available), or queued to be called at
+a specified later time.
+
+"""
+__author__ = "Andrew Gillis"
+__license__ = "http://www.opensource.org/licenses/mit-license.php"
+__maintainer__ = "Andrew Gillis"
+__email__ = "gillis.andrewj@gmail.com"
+
+import threading
+import Queue
+import traceback
+import heapq
+import time
+import threadpool
+
+
+class Result(object):
+
+    """
+    A Result object is returned by a Reactor's call_() methods.
+
+    The Result object allows the caller to wait for and retrieve the result of
+    a function called by the Reactor.  This not only includes any return from
+    the called function, but any exception as well.
+
+    A Result object is also used to identify a scheduled function to cancel,
+    and it can be used to check if a scheduled call has been canceled.
+
+    """
+
+    slots = ('_call_done', '_rsp', '_exception', '_traceback_str', '_task_id')
+    def __init__(self):
+        self._call_done = threading.Event()
+        self._rsp = None
+        self._exception = None
+        self._traceback_str = None
+
+        # For canceling scheduled tasks.
+        self._task_id = None
+
+
+    def get_result(self, timeout=None):
+        if self._call_done.wait(timeout):
+            if self._exception:
+                raise self._exception
+
+            return self._rsp
+        return None
+
+
+    def has_result(self):
+        return self._call_done.is_set()
+
+
+    def get_traceback(self):
+        return self._traceback_str
+
+
+    def is_canceled(self):
+        return self._task_id is False
+
+
+class _Deferred(object):
+    # Internal object used to transition from execution in main thread to
+    # execution in pool thread.
+    #
+    slots = ('action_tuple',)
+    def __init__(self, func, args=None, callback=None):
+        self.action_tuple = (func, args, callback)
+
+
+class Reactor(threading.Thread):
+
+    """
+    The Reactor executes queued functions in a single thread.  Functions can be
+    executed immediately, in the order they are enqueued, or they can be
+    executed at some later time.  Functions may also be executed, immediately
+    or later, in threads separate from the reactor's main thread.
+
+    All functions executed by the reactor have an associated Result object.
+    A Result object allows the caller to wait for and retrieve the result of a
+    function called by the reactor.
+
+    The reason to use a Reactor is to execute functions in a separate thread,
+    asynchronously, from the thread of the caller.
+
+    """
+
+    # How long to wait for thread to shutdown.
+    SHUTDOWN_FAILURE_TIMEOUT_SEC = 10
+
+
+    def __init__(self, thread_pool_size=0, queue_size=None):
+        """
+        Initialize Reactor thread object and internal thread pool (if used).
+
+        Arguments:
+        pool_size     -- Number of threads to have in pool.  If this is set to
+                         0 or None, then the reactor will not use a thread pool
+                         and will start threads directly.
+        queue_size    -- Maximum items allowed in task queue.  None means no
+                         limit.  This is ignored if thread pool not used.
+
+        """
+        threading.Thread.__init__(self, None, None, None)
+        self._call_queue = Queue.Queue()
+        self.idle_wake_sec = None
+        if thread_pool_size:
+            self._thread_pool = threadpool.ThreadPool(thread_pool_size,
+                                                      queue_size)
+        else:
+            self._thread_pool = None
+
+        self._task_heap = []
+
+
+    def call(self, func, args=None, callback=None):
+        """
+        Enqueue a function for the reactor to run in its main thread.
+
+        The caller can wait on the response using the returned Result object.
+        Running a function in the reactor's main thread means that no other
+        functions will be running in the reactor's main thread at the same
+        time.  This can be used in place of other synchronization.
+
+        Arguments:
+        func     -- Function to execute.
+        args     -- Argument tuple to pass to function.
+        callback -- Optional callback that takes a Result as its argument.
+
+        Return:
+        Result object.
+
+        """
+        assert(self.is_alive()), 'reactor is not started'
+        assert(callable(func)), 'function is not callable'
+        assert(callback is None or
+               callable(callback)),'callback is not callable'
+        result = Result()
+        self._call_queue.put((func, args, result, callback))
+        return result
+
+
+    def call_later(self, time_until_call, func, args=None, callback=None):
+        """
+        Run a function in the reactor's main thread at a later time.
+
+        The caller can wait on the response using the returned Result object.
+
+        Arguments:
+        time_until_call -- Seconds remaining until call.
+        func            -- Function to execute.
+        args            -- Argument tuple to pass to function.
+        callback        -- Optional callback that takes a Result as its
+                           argument.
+
+        Return:
+        Result object.
+
+        """
+        assert(self.is_alive()), 'reactor is not started'
+        assert(callable(func)), 'function is not callable'
+        assert(callback is None or
+               callable(callback)), 'callback is not callable'
+        if time_until_call:
+            action_time = time.time() + time_until_call
+        else:
+            action_time = 0
+
+        result = Result()
+        d_data = (func, args, result, callback)
+        self._call_queue.put((None, d_data, action_time, False))
+        return result
+
+
+    def call_in_thread(self, func, args=None, callback=None):
+        """
+        Run the given function in a separate thread.
+
+        The caller can wait on the response using the returned Result object.
+        A thread from the reactor's internal thread pool is used to avoid the
+        overhead of creating a new thread.
+
+        Arguments:
+        func     -- Function to execute.
+        args     -- Argument tuple to pass to function.
+        callback -- Optional callback that takes a Result as its argument.
+
+        Return:
+        Result object.
+
+        """
+        assert(self.is_alive()), 'reactor is not started'
+        assert(callable(func)), 'function is not callable'
+        assert(callback is None or
+               callable(callback)), 'callback is not callable'
+        result = Result()
+        if self._thread_pool:
+            self._thread_pool.queue_task(self._thread_wrapper,
+                                         (func, args, result, callback))
+        else:
+            th = threading.Thread(target=self._thread_wrapper,
+                                  args=(func, args, result, callback))
+            th.daemon = True
+            th.start()
+        return result
+
+
+    def call_in_thread_later(self, time_until_call, func, args=None,
+                             callback=None):
+        """
+        Run the given function in a separate thread, at a later time.
+
+        The caller can wait on the response using the returned Result object.
+        A thread from the reactor's internal thread pool is used to avoid the
+        overhead of creating a new thread.
+
+        Arguments:
+        time_until_call -- Seconds remaining until call.
+        func            -- Function to execute.
+        args            -- Argument tuple to pass to function.
+        callback        -- Optional callback that takes a Result as its
+                           argument.
+
+        Return:
+        Result object.
+
+        """
+        assert(self.is_alive()), 'reactor is not started'
+        assert(callable(func)), 'function is not callable'
+        assert(callback is None or
+               callable(callback)), 'callback is not callable'
+        if time_until_call:
+            action_time = time.time() + time_until_call
+        else:
+            action_time = 0
+
+        result = Result()
+        d_data = (func, args, result, callback)
+        self._call_queue.put((None, d_data, action_time, True))
+        return result
+
+
+    def cancel_scheduled(self, result):
+        """
+        Cancel a call scheduled to be called at a later time.
+
+        Arguments:
+        result -- Result object returned from call_later() or from
+                  call_in_thread_later().
+
+        Return:
+        True if scheduled call was successfully canceled.  False if scheduled
+        call was not found (already executed) or already canceled.
+
+        """
+        # The result's task_id cannot be checked in this method.  The request
+        # must be placed on the reactor's call queue to ensure that a scheduled
+        # task request is received by the reactor before this request to cancel
+        # it is received.
+        tmp_result = self.call(self._cancel_scheduled, result)
+        # Wait for reactor to remove task, and return whether or not the task
+        # was removed.
+        return tmp_result.get_result()
+
+
+    def shutdown(self):
+        """
+        Cause the reactor thread to exit gracefully.
+
+        """
+        if not self.is_alive():
+            #print "thread is not started"
+            return False
+
+        self._call_queue.put(None)
+        if self._thread_pool is not None:
+            self._thread_pool.shutdown()
+
+        # Wait for thread to exit.
+        self.join(Reactor.SHUTDOWN_FAILURE_TIMEOUT_SEC)
+        if self.is_alive():
+            #print '===> timed out waiting to join:', self.getName()
+            return False
+
+        #print '===> thread exited:', self.getName()
+        return True
+
+
+    def qsize(self):
+        """Return number of items in the message queue."""
+        return self._call_queue.qsize()
+
+
+    def empty(self):
+        """Return True is no items in the message queue."""
+        return self._call_queue.empty()
+
+
+    def defer_to_thread(self, func, args=None, callback=None):
+        """
+        Defer execution from main thread to pooled thread, and use the caller's
+        Result object to return the result of the additional execution.
+
+        Arguments:
+        func     -- Function to execute in pooled thread.
+        args     -- Arguments to pass to func.
+        callback -- Optional function to call with the Results of calling func.
+
+        Returns:
+        Special object recognized by Reactor to set up deferred call.
+
+        """
+        return _Deferred(func, args, callback)
+
+
+    #==========================================================================
+    # Private methods follow:
+    #==========================================================================
+
+    def _thread_wrapper(self, func, args, result, callback):
+        try:
+            if args is None:
+                rsp = func()
+            elif isinstance(args, (tuple, list, set)):
+                rsp = func(*args)
+            elif isinstance(args, dict):
+                rsp = func(**args)
+            else:
+                rsp = func(args)
+        except Exception as e:
+            rsp = None
+            result._exception = e
+            result._traceback_str = traceback.format_exc()
+
+        if isinstance(rsp, _Deferred):
+            func, args, new_callback = rsp.action_tuple
+            if new_callback is not None:
+                callback = new_callback
+            if self._thread_pool:
+                self._thread_pool.queue_task(self._thread_wrapper,
+                                             (func, args, result, callback))
+            else:
+                th = threading.Thread(target=self._thread_wrapper,
+                                      args=(func, args, result, callback))
+                th.daemon = True
+                th.start()
+            return
+
+        result._rsp = rsp
+        result._call_done.set()
+        if callback:
+            try:
+                callback(result)
+            except Exception as e:
+                rsp = None
+                result._exception = e
+                result._traceback_str = traceback.format_exc()
+
+
+    def _process_scheduled_queue(self):
+        """
+        Process all scheduled tasks that are currently due.
+
+        """
+        task_heap = self._task_heap
+        # Execute the first task without checking the time, because this method
+        # is only called when a scheduled task is due.
+        task_time, task_id, task_data, call_in_thread = heapq.heappop(
+            task_heap)
+
+        # If this task has been removed, then do nothing.
+        if task_data is None:
+            return
+
+        func, args, result, callback = task_data
+        if call_in_thread:
+            if self._thread_pool is not None:
+                # Call thread pool to execute scheduled function.
+                self._thread_pool.queue_task(self._thread_wrapper,
+                                             (func, args, result, callback))
+            else:
+                th = threading.Thread(target=self._thread_wrapper,
+                                      args=(func, args, result, callback))
+                th.daemon = True
+                th.start()
+        else:
+            #print '===> calling scheduled function'
+            self._thread_wrapper(func, args, result, callback)
+
+        if task_heap:
+            now = time.time()
+            next_time = task_heap[0][0]
+            while task_heap and (not next_time or next_time <= now):
+                task_time, task_id, task_data, call_in_thread = heapq.heappop(
+                    task_heap)
+                func, args, result, callback = task_data
+                if call_in_thread:
+                    #print '===> calling thread pool to do scheduled function'
+                    if self._thread_pool is not None:
+                        self._thread_pool.queue_task(
+                            self._thread_wrapper,
+                            (func, args, result, callback))
+                    else:
+                        th = threading.Thread(
+                            target=self._thread_wrapper,
+                            args=(func, args, result, callback))
+                        th.daemon = True
+                        th.start()
+                else:
+                    #print '===> calling scheduled function'
+                    self._thread_wrapper(func, args, result, callback)
+                if task_heap:
+                    next_time = task_heap[0][0]
+
+
+    def _cancel_scheduled(self, result):
+        """
+        Mark a scheduled call, in scheduled task heap, as canceled.
+
+        """
+        task_id = result._task_id
+        if task_id is None or task_id is False:
+            # Task is not scheduled or is already canceled.
+            return False
+        task_heap = self._task_heap
+        for idx, heap_item in enumerate(task_heap):
+            task_time, heap_task_id, task_data, call_in_thread = heap_item
+            if heap_task_id == task_id:
+                func, args, result, callback = task_data
+                result._rsp = None
+                result._task_id = False
+                result._call_done.set()
+                # Need to keep task_id to preserve heap invariant.
+                task_heap[idx] = (task_time, heap_task_id, None, None)
+                return True
+        return False
+
+
+    def run(self):
+        """
+        Function run in thread, started by the start() method.
+
+        """
+        call_queue = self._call_queue
+        task_heap = self._task_heap
+        next_task_id = 0
+        while True:
+            sleep_time = None
+            # If there are items on the scheduled task heap, then sleep until
+            # next scheduled task is due.
+            if task_heap:
+                now = time.time()
+                # Look at timer at top of heap.
+                action_time = task_heap[0][0]
+                if not action_time or action_time <= now:
+                    sleep_time = 0
+                else:
+                    sleep_time = action_time - now
+
+            try:
+                #print '===> sleeping for %s seconds, or until next call put on queue' % (sleep_time,)
+                # Get the next function from the call queue.
+                action_tuple = call_queue.get(True, sleep_time)
+                if action_tuple is None:
+                    break
+                func, args, result, callback = action_tuple
+
+                # If this is a new scheduled task.
+                if func is None:
+                    time_to_call = result
+                    call_in_thread = callback # callback is really thread flag
+                    f,a,r,c = args
+                    r._task_id = next_task_id
+                    heapq.heappush(task_heap,
+                                   (time_to_call, next_task_id, args,
+                                    call_in_thread))
+                    next_task_id += 1
+                else:
+                    self._thread_wrapper(func, args, result, callback)
+
+            except Queue.Empty:
+                # Woke up because it is time do next scheduled task.
+                #print '===> woke up to do next scheduled task'
+                self._process_scheduled_queue()
+                continue
+
+            except Exception:
+                print 'ERROR:', traceback.format_exc()
+
+        return True
+

File asyncthreads/threadpool.py

+"""
+Resizable thread pool.
+
+"""
+__author__ = "Andrew Gillis"
+__license__ = "http://www.opensource.org/licenses/mit-license.php"
+__maintainer__ = "Andrew Gillis"
+__email__ = "gillis.andrewj@gmail.com"
+
+import threading
+import Queue
+import time
+
+
+class ThreadPool(object):
+
+    """
+    Resizable thread pool class.
+
+    Maintain a pool of threads that execute tasks given to queue_task().  The
+    number of worker whreads can be changed by calling resize_pool().
+
+    """
+
+    def __init__(self, pool_size, queue_size=None):
+        """
+        Initialize the thread pool and start the specified number of threads.
+
+        Arguments:
+        pool_size     -- Number of threads to have in pool.
+        queue_size    -- Maximum items allowed in task queue.  None means no
+                         limit.
+
+        """
+        assert(isinstance(pool_size, int)), 'pool_size is not an int'
+        assert(queue_size is None or
+               isinstance(queue_size, int)), 'queue_size is not an int'
+
+        self.__shutting_down = False
+        self.__threads = 0
+        self.__task_queue = Queue.Queue(queue_size)
+        self.__resize_lock = threading.Condition(threading.Lock())
+
+        self.resize_pool(pool_size)
+
+
+    def queue_task(self, function, args=None, callback=None):
+        """
+        Insert a task function into the queue.
+
+        """
+        assert(callable(function)), 'function is not callable'
+        assert(callback is None or
+               callable(callback)), 'callback is not callable'
+
+        if not self.__shutting_down:
+            self.__task_queue.put((function, args, callback), False)
+
+
+    def get_pool_size(self):
+        """Return the number of threads in the pool."""
+        return self.__threads
+
+
+    def get_queue_size(self):
+        """Return the number of items currently in task queue."""
+        return self.__task_queue.qsize()
+
+
+    def shutdown(self, wait_for_threads=True, drop_queued_tasks=False):
+        """
+        Shutdown the thread pool.
+
+        Optionally wait for worker threads to exit, and optionally drop and
+        queued tasks.
+
+        Arguments:
+        wait_for_threads  -- Wait for all threads to complete.
+        drop_queued_tasks -- Discard any tasks that are on the task queue, but
+                             have not yet been processed.
+
+        Return:
+        True if exited successfully, False if shutdown was already called.
+
+        """
+        if self.__shutting_down:
+            return False
+
+        self.__shutting_down = True
+
+        # Discard remaining tasks on queue.
+        while drop_queued_tasks:
+            try:
+                self.__task_queue.get(False)
+                self.__task_queue.task_done()
+            except Queue.Empty:
+                break
+
+        self.resize_pool(0)
+        if wait_for_threads:
+            self.__task_queue.join()
+
+        return True
+
+
+    def resize_pool(self, new_size):
+        """
+        Resize the thread pool to have the specified number of threads.
+
+        """
+        if new_size < 0:
+            raise ValueError('pool size must be >= 0')
+
+        with self.__resize_lock:
+            if new_size > self.__threads:
+                # Growing pool.
+                while self.__threads < new_size:
+                    new_thread = _ThreadPoolThread(self.__task_queue)
+                    new_thread.start()
+                    self.__threads += 1
+            else:
+                # Shrinking pool.
+                while self.__threads > new_size:
+                    self.__task_queue.put(None)
+                    self.__threads -= 1
+
+
+class _ThreadPoolThread(threading.Thread):
+
+    """
+    Pooled thread class.
+
+    """
+
+    def __init__(self, task_queue):
+        """
+        Initialize pool thread.
+
+        """
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.__task_queue = task_queue
+
+
+    def run(self):
+        """
+        Retrieve the next task and execute it, calling the callback if any.
+
+        """
+        task_queue = self.__task_queue
+        while True:
+            # Wait forever.
+            task = task_queue.get()
+
+            # Exit if told to quit.
+            if task is None:
+                task_queue.task_done()
+                break
+
+            # Execute task.
+            func, args, callback = task
+            try:
+                if args is None:
+                    ret = func()
+                elif isinstance(args, (tuple, list, set)):
+                    ret = func(*args)
+                elif isinstance(args, dict):
+                    ret = func(**args)
+                else:
+                    ret = func(args)
+
+                if callback is not None:
+                    callback(ret)
+            except Exception as e:
+                print 'ThreadPool task raised exception:', str(e)
+
+            task_queue.task_done()
+
+import os, sys
+if sys.version_info >= (3,0):
+    from distribute_setup import use_setuptools
+    use_setuptools()
+from setuptools import setup
+
+
+def main():
+    setup(
+        name='asyncthreads',
+        version= '1.2.0',
+        author='Andrew Gillis',
+        author_email='gillis.andrewj@gmail.com',
+        url='http://bitbucket.org/agillis/asyncthreads',
+        description='asyncthreads: asynchronous thread utility objects',
+        long_description = open('README.txt').read(),
+        license='http://www.opensource.org/licenses/mit-license.php',
+        platforms=['unix', 'linux', 'osx', 'cygwin', 'win32'],
+        keywords='threads threadpool reactor',
+        classifiers=['Development Status :: 5 - Production/Stable',
+                     'Intended Audience :: Developers',
+                     'License :: OSI Approved :: MIT License',
+                     'Operating System :: POSIX',
+                     'Operating System :: Microsoft :: Windows',
+                     'Operating System :: MacOS :: MacOS X',
+                     'Topic :: Software Development :: Libraries',
+                     'Topic :: Utilities',
+                     'Programming Language :: Python',
+                     'Programming Language :: Python :: 2.7',
+                     'Programming Language :: Python :: 3'],
+        packages=['asyncthreads'],
+        zip_safe=True,
+        )
+
+
+if __name__ == '__main__':
+    main()

File test/test_reactor.py

+#
+# Run with py.test
+#
+
+import pytest
+import string
+import gc
+import time
+import threading
+
+from asyncthreads.reactor import Reactor
+
+
+class TestReactor(object):
+
+    @classmethod
+    def setup_class(cls):
+        gc.set_debug(
+            gc.DEBUG_UNCOLLECTABLE|gc.DEBUG_INSTANCES|gc.DEBUG_OBJECTS)
+
+        cls.rktr = Reactor(0)
+        cls.called = False
+        cls.cb_event = threading.Event()
+        cls.cb_rsp = None
+        cls.new_cb_called = False
+        cls.defer_called = False
+
+
+    def _func1(self):
+        TestReactor.called = True
+        return 'abc'
+
+    def _func2(self, arg1, arg2):
+        TestReactor.called = True
+        return '-'.join([arg1, arg2])
+
+    def _callback(self, result):
+        TestReactor.cb_rsp = result.get_result()
+        TestReactor.cb_event.set()
+
+    def _new_cb(self, result):
+        TestReactor.new_cb_called = True
+
+    def _func_error(self):
+        TestReactor.called = True
+        raise Exception('some big problem')
+
+    def _callback_error(self, result):
+        TestReactor.cb_event.set()
+        raise Exception('problem in callback')
+
+    def _defer_func2(self, arg1, arg2):
+        TestReactor.defer_called = True
+        d = self.rktr.defer_to_thread(self._func2, (arg1, arg2))
+        return d
+
+    def _defer2_func2(self, arg1, arg2):
+        TestReactor.defer_called = True
+        d = self.rktr.defer_to_thread(self._func2, (arg1, arg2),
+                                      self._new_cb)
+        return d
+
+    def reset_called(self):
+        TestReactor.called = False
+        TestReactor.cb_event.clear()
+        TestReactor.cb_rsp = None
+        TestReactor.defer_called = False
+        TestReactor.new_cb_called = False
+
+    def test_start(self):
+        print 'Starting reactor'
+        assert self.rktr is not None
+        assert not self.rktr.is_alive()
+        time.sleep(0)
+        self.rktr.start()
+        assert self.rktr.is_alive(), 'reactor should be alive'
+        print 'started'
+
+
+    def test_call(self):
+        """Run in main thread"""
+        print 'Calling function with no args in main thread'
+        self.reset_called()
+        result = self.rktr.call(self._func1)
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert ret == 'abc', 'wrong result'
+
+        print 'Calling function in main thread'
+        self.reset_called()
+        result = self.rktr.call(self._func2, ('hello', 'world'))
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert ret == 'hello-world'
+
+        print 'Calling function in main thread with callback'
+        self.reset_called()
+        result = self.rktr.call(self._func2, ('hello', 'world'),
+                                self._callback)
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert self.cb_event.wait(3), 'cb_event should be set'
+        assert ret == 'hello-world'
+        assert self.cb_rsp == 'hello-world'
+
+
+    def test_call_in_thread(self):
+        """Run in other thread"""
+        print 'Calling function in other thread'
+        self.reset_called()
+        result = self.rktr.call_in_thread(self._func2, ('hello', 'world'))
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert ret == 'hello-world'
+
+        print 'Calling function in other thread with callback'
+        self.reset_called()
+        result = self.rktr.call_in_thread(self._func2, ('hello', 'world'),
+                                          self._callback)
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert self.cb_event.wait(3), 'cb_event should be set'
+        assert ret == 'hello-world'
+        assert self.cb_rsp == 'hello-world'
+
+
+    def test_call_scheduled(self):
+        """Run scheduled in main thread"""
+        print 'Calling function in main thread in 3 seconds'
+        self.reset_called()
+        result = self.rktr.call_later(3, self._func2, ('hello', 'world'))
+
+        assert not result.has_result(), 'should not have result yet'
+        time.sleep(4)
+        assert result.has_result(), 'should have result by now'
+
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert ret == 'hello-world'
+
+        print 'Calling function in main thread with callback in 3 seconds'
+        self.reset_called()
+        result = self.rktr.call_later(3, self._func2, ('hello', 'world'),
+                                      self._callback)
+
+        assert not result.has_result(), 'should not have result yet'
+        time.sleep(4)
+        assert result.has_result(), 'should have result by now'
+
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert self.cb_event.wait(3), 'cb_event should be set'
+        assert ret == 'hello-world'
+        assert self.cb_rsp == 'hello-world'
+
+
+    def test_call_in_thread_scheduled(self):
+        """Run in other thread"""
+        print 'Calling function in other thread in 3 seconds'
+        self.reset_called()
+        result = self.rktr.call_in_thread_later(3, self._func2,
+                                                ('hello', 'world'))
+
+        assert not result.has_result(), 'should not have result yet'
+        time.sleep(4)
+        assert result.has_result(), 'should have result by now'
+
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert ret == 'hello-world'
+
+        print 'Calling function in other thread with callback in 3 seconds'
+        self.reset_called()
+        result = self.rktr.call_in_thread_later(
+            3, self._func2, ('hello', 'world'), self._callback)
+
+        assert not result.has_result(), 'should not have result yet'
+        time.sleep(4)
+        assert result.has_result(), 'should have result by now'
+
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert self.cb_event.wait(3), 'cb_event should be set'
+        assert ret == 'hello-world'
+        assert self.cb_rsp == 'hello-world'
+
+
+    def test_cancel_scheduled(self):
+        """Cancel scheduled call"""
+        self.reset_called()
+        print 'Calling function in 3 seconds'
+        result = self.rktr.call_later(3, self._func2, ('hello', 'world'))
+        assert not result.has_result(), 'should not have result yet'
+
+        assert self.rktr.cancel_scheduled(result),\
+               'failed to find scheduled task'
+        time.sleep(4)
+        rsp = result.get_result()
+        assert result.is_canceled(), 'failed to mark result as canceled'
+
+        assert rsp is None, 'result should be None'
+        assert not self.called, 'called should be False'
+
+
+    def test_exception_in_call(self):
+        print 'Calling function in main thread that raises exception'
+        self.reset_called()
+        result = self.rktr.call(self._func_error)
+        # Check for expected excption
+        ret = None
+        with pytest.raises(Exception) as ex:
+            ret = result.get_result()
+            assert str(ex) == 'some big problem'
+        assert self.called, 'called should be True'
+        assert ret is None, 'return should be None'
+        assert len(result.get_traceback()) > 0, 'should be traceback info'
+
+        print 'Calling function in other thread that raises exception'
+        self.reset_called()
+        result = self.rktr.call_in_thread(self._func_error)
+
+        # Check for expected excption
+        ret = None
+        with pytest.raises(Exception) as ex:
+            ret = result.get_result()
+            assert str(ex) == 'some big problem'
+        assert self.called, 'called should be True'
+        assert ret is None, 'return should be None'
+        assert len(result.get_traceback()) > 0, 'should be traceback info'
+
+
+    def test_exception_in_callback(self):
+        print 'Calling function in main thread that raises exception'
+        self.reset_called()
+        result = self.rktr.call(self._func1, (), self._callback_error)
+        assert self.cb_event.wait(3), 'cb_event should be set'
+        with pytest.raises(Exception) as ex:
+            result.get_result()
+            assert str(ex) == 'problem in callback'
+        assert self.called, 'called should be True'
+        assert len(result.get_traceback()) > 0, 'should be traceback info'
+
+
+    def test_defer_call(self):
+        print 'Calling func on reactor thread that defers processing to thread'
+        self.reset_called()
+        result = self.rktr.call(self._defer_func2, ('hello', 'world'))
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert ret == 'hello-world'
+
+
+    def test_defer_call_with_new_cb(self):
+        print 'Calling func on reactor thread that defers processing to '\
+              'thread and sets new callback'
+        self.reset_called()
+        result = self.rktr.call(self._defer2_func2, ('hello', 'world'))
+        ret = result.get_result()
+        assert self.called, 'called should be True'
+        assert self.new_cb_called,\
+               'new callback not called by deferred thread'
+        assert ret == 'hello-world'
+
+    def test_shutdown(self):
+        print 'Shutting down reactor'
+        self.rktr.shutdown()
+        assert not self.rktr.is_alive(), 'reactor should not be alive'
+
+
+    def test_memory_leaks(self):
+        """Check for memory leaks"""
+        # Test for any objects that cannot be freed.
+        assert len(gc.garbage) == 0, 'uncollected garbage: '+str(gc.garbage)
+
+
+

File test/test_threadpool.py

+#
+# Run with py.test
+#
+
+import gc
+import time
+from random import randrange
+
+from asyncthreads.threadpool import ThreadPool
+
+# Sample task 1: given a start and end value, shuffle integers,
+# then sort them
+def sort_task(first, last):
+    print "SortTask starting for ", (first, last)
+    numbers = range(first, last)
+    for a in numbers:
+        rnd = randrange(0, len(numbers) - 1)
+        a, numbers[rnd] = numbers[rnd], a
+    print "SortTask sorting for ", (first, last)
+    numbers.sort()
+    print "SortTask done for ", (first, last)
+    return ("Sorter ", (first, last))
+
+# Sample task 2: just sleep for a number of seconds.
+def wait_task(data):
+    print "WaitTask starting for ", data
+    print "WaitTask sleeping for %d seconds" % data
+    time.sleep(data)
+    return "Waiter", data
+
+# Both tasks use the same callback
+def task_callback(data):
+    print "Callback called for", data
+
+
+class TestThreadPool(object):
+
+    @classmethod
+    def setup_class(cls):
+        gc.set_debug(
+            gc.DEBUG_UNCOLLECTABLE|gc.DEBUG_INSTANCES|gc.DEBUG_OBJECTS)
+
+        cls.pool = ThreadPool(20, 256)
+
+
+    def test_start(self):
+        assert self.pool.get_pool_size() == 20
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        self.pool.queue_task(wait_task, 15, task_callback)
+        # Insert tasks into the queue and let them run
+        print 'start'
+        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
+        self.pool.queue_task(wait_task, 5, task_callback)
+        self.pool.queue_task(sort_task, (200, 200000), task_callback)
+        self.pool.queue_task(wait_task, 2, task_callback)
+        self.pool.queue_task(sort_task, (3, 30000), task_callback)
+        self.pool.queue_task(wait_task, 7, task_callback)
+        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
+        self.pool.queue_task(wait_task, 5, task_callback)
+        self.pool.queue_task(sort_task, (200, 200000), task_callback)
+        self.pool.queue_task(wait_task, 2, task_callback)
+        self.pool.queue_task(sort_task, (3, 30000), task_callback)
+        self.pool.queue_task(wait_task, 7, task_callback)
+        self.pool.queue_task(sort_task, (201, 200001), task_callback)
+        self.pool.queue_task(sort_task, (202, 200002), task_callback)
+        self.pool.queue_task(sort_task, (203, 200003), task_callback)
+        self.pool.queue_task(sort_task, (204, 200004), task_callback)
+        self.pool.queue_task(sort_task, (205, 200005), task_callback)
+        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
+        self.pool.queue_task(wait_task, 5, task_callback)
+        self.pool.queue_task(sort_task, (200, 200000), task_callback)
+        self.pool.queue_task(wait_task, 2, task_callback)
+        self.pool.queue_task(sort_task, (3, 30000), task_callback)
+        self.pool.queue_task(wait_task, 7, task_callback)
+        self.pool.queue_task(sort_task, (1000, 100000), task_callback)
+        self.pool.queue_task(wait_task, 5, task_callback)
+        self.pool.queue_task(sort_task, (200, 200000), task_callback)
+        self.pool.queue_task(wait_task, 2, task_callback)
+        self.pool.queue_task(sort_task, (3, 30000), task_callback)
+        self.pool.queue_task(wait_task, 7, task_callback)
+        self.pool.queue_task(sort_task, (201, 200001), task_callback)
+        self.pool.queue_task(sort_task, (202, 200002), task_callback)
+        self.pool.queue_task(sort_task, (203, 200003), task_callback)
+        self.pool.queue_task(sort_task, (204, 200004), task_callback)
+        self.pool.queue_task(sort_task, (205, 200005), task_callback)
+
+        for _ in range(3):
+            print '---> queued items:', self.pool.get_queue_size()
+            time.sleep(2)
+
+        self.pool.resize_pool(80)
+        assert self.pool.get_pool_size() == 80
+
+        while self.pool.get_queue_size():
+            print '---> queued items:', self.pool.get_queue_size()
+            time.sleep(5)
+
+        print 'finish'
+
+
+    def test_shutdown(self):
+        # When all tasks are finished, allow the threads to terminate
+        print 'Shutting down thread pool'
+        self.pool.shutdown(False, False)
+
+        print '---> queued items:', self.pool.get_queue_size()
+        while self.pool.get_queue_size():
+            print '---> queued items:', self.pool.get_queue_size()
+            time.sleep(5)
+
+
+    def test_memory_leaks(self):
+        """Check for memory leaks"""
+        # Test for any objects that cannot be freed.
+        assert len(gc.garbage) == 0, 'uncollected garbage: '+str(gc.garbage)
+