Commits

Anonymous committed cb84040

Submitting task to ThreadPool returns a completion object, same as with submitting a task to the Reactor.

Optional result queue can be specified when submitting task to ThreadPool.

New Task class to hold task execution data.

Same Task.execute() method used for both Reactor and ThreadPool task execution. Replaces thread_wrapper in Reactor.

Removed redundant asserts.

Do not allow replacing callback when defering reactor call.

  • Participants
  • Parent commits be8e564

Comments (0)

Files changed (5)

asyncthreads/completion.py

+"""
+Wait for an get results of asynchronous function call.
+
+"""
+__author__ = "Andrew Gillis"
+__license__ = "http://www.opensource.org/licenses/mit-license.php"
+__maintainer__ = "Andrew Gillis"
+__email__ = "gillis.andrewj@gmail.com"
+
+import threading
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+from collections import Callable
+import traceback
+
+
+class Completion(object):
+
+    """
+    Completion represents the result of an asynchronous computation.
+
+    A Completion object is returned by a Reactor's call_() methods.  It holds a
+    value that may become available at some point.  The Completion class is
+    equivalent to the Future class offered by other concurrency frameworks.
+
+    The Completion object allows the caller to wait for and retrieve the result
+    of a function called by the Reactor.  This not only includes any return
+    from the called function, but any exception as well.
+
+    A Completion object is also used to identify a scheduled function to
+    cancel, and it can be used to check if a scheduled call has been canceled.
+
+    """
+
+    slots = ('_call_done', '_rsp', '_exception', '_traceback_str', '_task_id')
+    def __init__(self):
+        self._call_done = threading.Event()
+        self._rsp = None
+        self._exception = None
+        self._traceback_str = None
+
+        # For canceling scheduled tasks.  This is kept here to allow task to be
+        # retrieved using Completion object.
+        self._task_id = None
+
+    def result(self, timeout=None):
+        """Return the value returned by the call.
+
+        If the call has not yet completed then wait up to timeout seconds or
+        until call complete if timeout is None.  It timeout occurs, then raise
+        a RuntimeError exception.  If canceled before completion, then a
+        ThreadError exception is raised.
+
+        If the call raised an exception, then this methods returns the same
+        exception.
+
+        """
+        if self._call_done.wait(timeout):
+            if self._exception:
+                raise self._exception
+
+            return self._rsp
+
+        if self.canceled():
+            # Canceled before completed.
+            raise threading.ThreadError('call canceled before completion')
+
+        # Timeout waiting for completion.
+        raise RuntimeError('timeout waiting for result')
+
+    def done(self):
+        """Return True if call completed or canceled."""
+        return self._call_done.is_set()
+
+    def traceback(self):
+        """Return the traceback string from exception in call."""
+        return self._traceback_str
+
+    def canceled(self):
+        return self._task_id is False
+
+
+class _Task(object):
+
+    """
+    Internal container for task execution data.
+
+    Task objects hold information needed for an executor (ThreadPool, Reactor)
+    to execute a user-specified function, return a result, and notify the
+    caller about the completion of the function call.
+
+    """
+
+    slots = ('future', 'func', 'args', 'callback', 'result_q')
+    def __init__(self, future, func, args=None, callback=None, result_q=None):
+        """Initialize Task object.
+
+        Arguments:
+        future   -- Completion object used to return result and notify caller
+                    that function call has completed.
+        func     -- Function to execute.
+        args     -- Argument tuple to pass to function.
+        callback -- Optional callback that takes return from func as argument.
+                    Function call is not completed until after callback
+                    function has returned.
+        result_q -- Optional response queue to place Completion on.
+
+        """
+        assert(future is None or
+               isinstance(future, Completion)), 'first arg must be Completion'
+        assert(isinstance(func, Callable)), 'function is not callable'
+        assert(callback is None or
+               isinstance(callback, Callable)),'callback is not callable'
+        assert(result_q is None or
+               isinstance(result_q, queue.Queue)), 'result_q is not Queue'
+
+        self.future = future
+        self.func = func
+        self.args = args
+        self.callback = callback
+        self.result_q = result_q
+
+    def execute(self):
+        """Execute task function and handle result.
+
+        This method is executed by the executor to which this Task object was
+        submitted.
+
+        """
+        # Execute task.
+        args = self.args
+        future = self.future
+        func = self.func
+        try:
+            if args is None:
+                ret = func()
+            elif isinstance(args, (tuple, list, set)):
+                ret = func(*args)
+            elif isinstance(args, dict):
+                ret = func(**args)
+            else:
+                ret = func(args)
+
+            if isinstance(ret, _Task):
+                defer_func = ret.callback # ret.callback holds defer function.
+                ret.future = future # use original Completion
+                ret.callback = self.callback # use original callback
+                ret.result_q = self.result_q # use original result queue
+                defer_func(ret)
+                return
+
+            if self.callback is not None:
+                self.callback(ret)
+        except Exception as e:
+            ret = None
+            future._exception = e
+            future._traceback_str = traceback.format_exc()
+
+        # Set call done only after callback has completed.  This is necessary
+        # for code waiting on a Completion to be able to reliably see an
+        # exception from a callback.
+        future._rsp = ret
+        future._call_done.set()
+
+        if self.result_q:
+            # If a result queue specified, then put finished Completion object
+            # onto result queue.
+            try:
+                self.result_q.put(future)
+            except queue.Full:
+                pass

asyncthreads/reactor.py

 import traceback
 import heapq
 import time
-from collections import Callable
 try:
     import queue
 except ImportError:
     import Queue as queue
 
-
-class Completion(object):
-
-    """
-    Completion represents the result of an asynchronous computation.
-
-    A Completion object is returned by a Reactor's call_() methods.  It holds a
-    value that may become available at some point.  The Completion class is
-    equivalent to the Future class offered by other concurrency frameworks.
-
-    The Completion object allows the caller to wait for and retrieve the result
-    of a function called by the Reactor.  This not only includes any return
-    from the called function, but any exception as well.
-
-    A Completion object is also used to identify a scheduled function to
-    cancel, and it can be used to check if a scheduled call has been canceled.
-
-    """
-
-    slots = ('_call_done', '_rsp', '_exception', '_traceback_str', '_task_id',
-             '_result_q')
-    def __init__(self, result_q):
-        self._call_done = threading.Event()
-        self._rsp = None
-        self._exception = None
-        self._traceback_str = None
-
-        # For canceling scheduled tasks.
-        self._task_id = None
-
-        # Optional queue to put this Completion on when completed.
-        self._result_q = result_q
-
-    def result(self, timeout=None):
-        """Return the value returned by the call.
-
-        If the call has not yet completed then wait up to timeout seconds or
-        until call complete if timeout is None.  It timeout occurs, then raise
-        a RuntimeError exception.  If canceled before completion, then a
-        ThreadError exception is raised.
-
-        If the call raised an exception, then this methods returns the same
-        exception.
-
-        """
-        if self._call_done.wait(timeout):
-            if self._exception:
-                raise self._exception
-
-            return self._rsp
-
-        if self.canceled():
-            # Canceled before completed.
-            raise threading.ThreadError('call canceled before completion')
-
-        # Timeout waiting for completion.
-        raise RuntimeError('timeout waiting for result')
-
-    def done(self):
-        """Return True if call completed or canceled."""
-        return self._call_done.is_set()
-
-    def traceback(self):
-        """Return the traceback string from exception in call."""
-        return self._traceback_str
-
-    def canceled(self):
-        return self._task_id is False
-
-
-class _Deferred(object):
-    # Internal object used to transition from execution in main thread to
-    # execution in pool thread.
-    #
-    slots = ('action_tuple',)
-    def __init__(self, func, args=None, callback=None):
-        self.action_tuple = (func, args, callback)
+from asyncthreads import completion
 
 
 class Reactor(threading.Thread):
 
         """
         assert(self.is_alive()), 'reactor is not started'
-        assert(isinstance(func, Callable)), 'function is not callable'
-        assert(callback is None or
-               isinstance(callback, Callable)),'callback is not callable'
-        assert(result_q is None or
-               isinstance(result_q, queue.Queue)), 'result_q is not Queue'
-        future = Completion(result_q)
-        self._call_queue.put((func, args, future, callback, None, None))
+        future = completion.Completion()
+        self._call_queue.put(
+            (completion._Task(future, func, args, callback, result_q),
+             None, None))
         return future
 
     def call_later(self, delay, func, args=None, callback=None, result_q=None):
         Completion object.
 
         """
-        assert(self.is_alive()), 'reactor is not started'
-        assert(isinstance(func, Callable)), 'function is not callable'
-        assert(callback is None or
-               isinstance(callback, Callable)), 'callback is not callable'
         if delay:
             when = time.time() + delay
         else:
 
         """
         assert(self.is_alive()), 'reactor is not started'
-        assert(isinstance(func, Callable)), 'function is not callable'
-        assert(callback is None or
-               isinstance(callback, Callable)), 'callback is not callable'
-        future = Completion(result_q)
-        self._call_queue.put((func, args, future, callback, when, None))
+        future = completion.Completion()
+        self._call_queue.put(
+            (completion._Task(future, func, args, callback, result_q),
+            when, None))
         return future
 
     def call_in_thread(self, func, args=None, callback=None, result_q=None):
 
         """
         assert(self.is_alive()), 'reactor is not started'
-        assert(isinstance(func, Callable)), 'function is not callable'
-        assert(callback is None or
-               isinstance(callback, Callable)), 'callback is not callable'
-        future = Completion(result_q)
-        args = (func, args, future, callback)
+        future = completion.Completion()
+        task = completion._Task(future, func, args, callback, result_q)
         if self._thread_pool:
-            self._thread_pool.submit(self._thread_wrapper, args)
+            self._thread_pool.submit_task(task)
         else:
-            th = threading.Thread(target=self._thread_wrapper, args=args)
+            th = threading.Thread(target=task.execute)
             th.daemon = True
             th.start()
         return future
 
         """
         assert(self.is_alive()), 'reactor is not started'
-        assert(isinstance(func, Callable)), 'function is not callable'
-        assert(callback is None or
-               isinstance(callback, Callable)), 'callback is not callable'
         if time_until_call:
             action_time = time.time() + time_until_call
         else:
             action_time = 0
 
-        future = Completion(result_q)
-        self._call_queue.put((func, args, future, callback, action_time, True))
+        future = completion.Completion()
+        self._call_queue.put(
+            (completion._Task(future, func, args, callback, result_q),
+             action_time, True))
         return future
 
     def cancel_scheduled(self, future):
 
         """
         if not self.is_alive():
-            #print "thread is not started"
             return False
 
         self._call_queue.put(None)
         """Return True is no items in the message queue."""
         return self._call_queue.empty()
 
-    def defer_to_thread(self, func, args=None, callback=None):
-        """
-        Defer execution from main thread to pooled thread, and use the caller's
-        Completion object to return the result of the additional execution.
+    def defer_to_thread(self, func, args=None):
+        """Defer execution from main thread to pooled thread.
+
+        Allows execution to continue on a separate threads while using the
+        caller's Completion object to return the result of the additional
+        execution.  The original callback and result queue, if previously
+        provided, are also used.
 
         Arguments:
         func     -- Function to execute in pooled thread.
         args     -- Arguments to pass to func.
-        callback -- Optional callback that takes return from func as argument.
 
         Returns:
         Special object recognized by Reactor to set up deferred call.
 
         """
-        return _Deferred(func, args, callback)
+        return completion._Task(None, func, args, self._deferred_thread)
+
+    def defer(self, func, args=None):
+        """Defer execution to process other tasks on main thread.
+
+        Allows execution to continue as separate task while using the caller's
+        original Completion object to return the result of the additional
+        execution.  The original callback and result queue, if previously
+        provided, are also used.
+
+        Arguments:
+        func     -- Function to execute in separate call on main thread.
+        args     -- Arguments to pass to func.
+
+        Returns:
+        Special object recognized by Reactor to set up deferred call.
+
+        """
+        return completion._Task(None, func, args, self._deferred_call)
 
     #==========================================================================
     # Private methods follow:
     #==========================================================================
 
-    def _thread_wrapper(self, func, args, future, callback):
-        try:
-            if args is None:
-                rsp = func()
-            elif isinstance(args, (tuple, list, set)):
-                rsp = func(*args)
-            elif isinstance(args, dict):
-                rsp = func(**args)
-            else:
-                rsp = func(args)
-        except Exception as e:
-            rsp = None
-            future._exception = e
-            future._traceback_str = traceback.format_exc()
+    def _deferred_thread(self, task):
+        if self._thread_pool:
+            self._thread_pool.submit_task(task)
+        else:
+            th = threading.Thread(target=task.execute)
+            th.daemon = True
+            th.start()
 
-        if isinstance(rsp, _Deferred):
-            func, args, new_callback = rsp.action_tuple
-            if new_callback is not None:
-                callback = new_callback
-            if self._thread_pool:
-                self._thread_pool.submit(
-                    self._thread_wrapper, (func, args, future, callback))
-            else:
-                th = threading.Thread(
-                    target=self._thread_wrapper,
-                    args=(func, args, future, callback))
-                th.daemon = True
-                th.start()
-            return
-
-        if callback:
-            try:
-                callback(rsp)
-            except Exception as e:
-                rsp = None
-                future._exception = e
-                future._traceback_str = traceback.format_exc()
-
-        # Set call done only after callback has completed.  This is necessary
-        # for code waiting on a Completion to be able to reliably see an
-        # exception from a callback.
-        future._rsp = rsp
-        future._call_done.set()
-
-        if future._result_q:
-            result_q = future._result_q
-            future._result_q = None
-            try:
-                result_q.put(future)
-            except queue.Full:
-                pass
+    def _deferred_call(self, task):
+        self._call_queue.put((task, None, None))
 
     def _process_scheduled_queue(self):
         """Process all scheduled tasks that are currently due.
         if task_data is None:
             return
 
-        def do_sched_task(func, args, future, callback, call_at, in_thread):
+        def do_sched_task(task, _, in_thread):
             if in_thread:
                 if self._thread_pool is not None:
                     # Call thread pool to execute scheduled function.
-                    self._thread_pool.submit(
-                        self._thread_wrapper, (func, args, future, callback))
+                    self._thread_pool.submit_task(task)
                 else:
                     # Create new thread to execute scheduled function.
-                    th = threading.Thread(
-                        target=self._thread_wrapper,
-                        args=(func, args, future, callback))
+                    th = threading.Thread(target=task.execute)
                     th.daemon = True
                     th.start()
             else:
-                #print '===> calling scheduled function'
-                self._thread_wrapper(func, args, future, callback)
+                #print('===> calling scheduled function')
+                task.execute()
 
         do_sched_task(*task_data)
         if task_heap:
         for idx, heap_item in enumerate(task_heap):
             task_time, heap_task_id, task_data = heap_item
             if heap_task_id == task_id:
-                func, args, future, callback, call_at, in_thread = task_data
+                task, call_at, in_thread = task_data
+                future = task.future
                 future._rsp = None
                 future._task_id = False
                 future._call_done.set()
                     sleep_time = action_time - now
 
             try:
-                #print '===> sleeping for %s seconds' % (sleep_time,)
+                #print('===> sleeping for %s seconds' % (sleep_time,))
                 # Get the next function from the call queue.
                 work_item = call_queue.get(True, sleep_time)
                 if work_item is None:
                     break
-                func, args, future, callback, call_at, in_thread = work_item
+                task, call_at, in_thread = work_item
 
                 # If this is a new scheduled task.
                 if call_at:
-                    future._task_id = next_task_id
+                    task.future._task_id = next_task_id
                     heapq.heappush(task_heap,
                                    (call_at, next_task_id, work_item))
                     next_task_id += 1
                 else:
-                    self._thread_wrapper(func, args, future, callback)
+                    task.execute()
+
+                del task
 
             except queue.Empty:
                 # Woke up because it is time do next scheduled task.
-                #print '===> woke up to do next scheduled task'
+                #print('===> woke up to do next scheduled task')
                 self._process_scheduled_queue()
                 continue
 

asyncthreads/threadpool.py

 __email__ = "gillis.andrewj@gmail.com"
 
 import threading
-from collections import Callable
 try:
     import queue
 except ImportError:
     import Queue as queue
 
+from asyncthreads import completion
+
 
 class ThreadPool(object):
 
 
     """
 
-    def __init__(self, min_size, max_size=None, queue_size=None):
+    def __init__(self, min_size, max_size=None, queue_size=0):
         """Initialize the thread pool and start the minimum number of threads.
 
         If max_size is specified and is larger than min_size, then the pool
         Arguments:
         min_size    -- Minimum number of threads to have in pool.
         max_size    -- Optional.  Maximum size pool can grow to.
-        queue_size  -- Maximum items allowed in task queue.  None means no
-                       limit.
+        queue_size  -- Maximum items allowed in task queue.   If queue_size is
+                       less than or equal to zero, the queue size is infinite.
 
         """
         assert(isinstance(min_size, int)), 'min_size must be int'
-        assert(max_size is None or
-               isinstance(max_size, int)), 'max_size must be int'
+        assert(isinstance(max_size, int)), 'max_size must be int'
 
         self._shutdown = False
         self._threads = 0
-        self._task_queue = queue.Queue()
+        self._task_queue = queue.Queue(queue_size)
         self._resize_lock = threading.Lock()
         self._idle_counter = ThreadCounter()
 
             # Pool is manually resizeable only.
             self._max_size = min_size
 
-    def submit(self, function, args=None, callback=None):
+    def submit(self, function, args=None, callback=None, result_q=None):
         """Insert a task function into the queue.
 
+        Arguments:
+        function -- Function to execute.
+        args     -- Argument tuple to pass to function.
+        callback -- Optional callback that takes return from func as argument.
+        result_q -- Optional response queue to place Completion on.
+
+        Return:
+        Completion object to wait on and retrieve result of function.
+
         """
-        assert(isinstance(function, Callable)), 'function is not callable'
-        assert(callback is None or
-               isinstance(callback, Callable)), 'callback is not callable'
+        future = completion.Completion()
+        self.submit_task(
+            completion._Task(future, function, args, callback, result_q))
+        return future
 
+    def submit_task(self, task):
+        """Insert a Task object into the queue.
+
+        Useful for submitting task when Task object already exists.
+
+        """
         with self._resize_lock:
             if self._shutdown:
                 raise RuntimeError('cannot submit tasks after shutdown')
 
-            self._task_queue.put((function, args, callback), False)
+            self._task_queue.put(task, False)
             self._adjust_thread_count()
 
     def threads(self):
         return self._threads
 
     def idle_threads(self):
+        """Return the current number of idle threads."""
         return int(self._idle_counter)
 
     def qsize(self):
     def shutdown(self, wait_for_threads=True, drop_queued_tasks=False):
         """Shutdown the thread pool.
 
-        Optionally wait for worker threads to exit, and optionally drop and
+        Optionally wait for worker threads to exit, and optionally drop any
         queued tasks.
 
         Arguments:
             break
 
         # Execute task.
-        func, args, callback = task
-        try:
-            if args is None:
-                ret = func()
-            elif isinstance(args, (tuple, list, set)):
-                ret = func(*args)
-            elif isinstance(args, dict):
-                ret = func(**args)
-            else:
-                ret = func(args)
-
-            if callback is not None:
-                callback(ret)
-        except Exception as e:
-            print('ThreadPool task raised exception: '+str(e))
+        task.execute()
 
         del task
         task_queue.task_done()
 def main():
     setup(
         name='asyncthreads',
-        version= '1.4.4',
+        version= '1.5.0',
         author='Andrew Gillis',
         author_email='gillis.andrewj@gmail.com',
         url='http://bitbucket.org/agillis/asyncthreads',

test/test_reactor.py

         cls.called = False
         cls.cb_event = threading.Event()
         cls.cb_rsp = None
-        cls.new_cb_called = False
         cls.defer_called = False
 
     def _func1(self):
         TestReactor.cb_rsp = rsp
         TestReactor.cb_event.set()
 
-    def _new_cb(self, rsp):
-        TestReactor.new_cb_called = True
-
     def _func_error(self):
         TestReactor.called = True
         raise Exception('some big problem')
         d = self.rktr.defer_to_thread(self._func2, (arg1, arg2))
         return d
 
-    def _defer2_func2(self, arg1, arg2):
-        TestReactor.defer_called = True
-        d = self.rktr.defer_to_thread(self._func2, (arg1, arg2),
-                                      self._new_cb)
-        return d
-
     def reset_called(self):
         TestReactor.called = False
         TestReactor.cb_event.clear()
         TestReactor.cb_rsp = None
         TestReactor.defer_called = False
-        TestReactor.new_cb_called = False
 
     def test_start(self):
         print('Starting reactor')
         assert ret == 'hello-world'
         assert self.cb_rsp == 'hello-world'
 
-        print 'Threads in pool:', TestReactor.tp.threads()
-        assert TestReactor.tp.thread_count() > 0
+        print('Threads in pool: %s' % TestReactor.tp.threads())
+        assert TestReactor.tp.threads() > 0
 
     def test_call_scheduled(self):
         """Run scheduled in main thread"""
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-    def test_defer_call_with_new_cb(self):
-        print('Calling func on reactor thread that defers processing to '\
-              'thread and sets new callback')
-        self.reset_called()
-        result = self.rktr.call(self._defer2_func2, ('hello', 'world'))
-        ret = result.result()
-        assert self.called, 'called should be True'
-        assert self.new_cb_called,\
-               'new callback not called by deferred thread'
-        assert ret == 'hello-world'
-
     def test_result_queue(self):
         print('Calling 3 functions in pooled thread, with delay of 1, 2, and '\
               '3 seconds for each thread, and waiting for all results on '\