Commits

Anonymous committed 5d85e35

In reactor call_ methods, optionally specify a queue to put Result objects one when the associated function has completed.

  • Participants
  • Parent commits ef4cfd2

Comments (0)

Files changed (3)

File asyncthreads/reactor.py

 
     """
 
-    slots = ('_call_done', '_rsp', '_exception', '_traceback_str', '_task_id')
-    def __init__(self):
+    slots = ('_call_done', '_rsp', '_exception', '_traceback_str', '_task_id',
+             '_result_q')
+    def __init__(self, result_q):
         self._call_done = threading.Event()
         self._rsp = None
         self._exception = None
         # For canceling scheduled tasks.
         self._task_id = None
 
+        # Optional queue to put this Result on when completed.
+        self._result_q = result_q
+
 
     def get_result(self, timeout=None):
         if self._call_done.wait(timeout):
         self._task_heap = []
 
 
-    def call(self, func, args=None, callback=None):
+    def call(self, func, args=None, callback=None, result_q=None):
         """
         Enqueue a function for the reactor to run in its main thread.
 
         func     -- Function to execute.
         args     -- Argument tuple to pass to function.
         callback -- Optional callback that takes a Result as its argument.
+        result_q -- Optional response queue to place complete Result on.
 
         Return:
         Result object.
         assert(isinstance(func, Callable)), 'function is not callable'
         assert(callback is None or
                isinstance(callback, Callable)),'callback is not callable'
-        result = Result()
+        assert(result_q is None or
+               isinstance(result_q, queue.Queue)), 'result_q is not Queue'
+        result = Result(result_q)
         self._call_queue.put((func, args, result, callback))
         return result
 
 
-    def call_later(self, time_until_call, func, args=None, callback=None):
+    def call_later(self, time_until_call, func, args=None, callback=None,
+                   result_q=None):
         """
         Run a function in the reactor's main thread at a later time.
 
         args            -- Argument tuple to pass to function.
         callback        -- Optional callback that takes a Result as its
                            argument.
+        result_q        -- Optional response queue to place complete Result on.
 
         Return:
         Result object.
         else:
             action_time = 0
 
-        result = Result()
+        result = Result(result_q)
         d_data = (func, args, result, callback)
         self._call_queue.put((None, d_data, action_time, False))
         return result
 
 
-    def call_in_thread(self, func, args=None, callback=None):
+    def call_in_thread(self, func, args=None, callback=None, result_q=None):
         """
         Run the given function in a separate thread.
 
         func     -- Function to execute.
         args     -- Argument tuple to pass to function.
         callback -- Optional callback that takes a Result as its argument.
+        result_q -- Optional response queue to place complete Result on.
 
         Return:
         Result object.
         assert(isinstance(func, Callable)), 'function is not callable'
         assert(callback is None or
                isinstance(callback, Callable)), 'callback is not callable'
-        result = Result()
+        result = Result(result_q)
         if self._thread_pool:
             self._thread_pool.queue_task(self._thread_wrapper,
                                          (func, args, result, callback))
 
 
     def call_in_thread_later(self, time_until_call, func, args=None,
-                             callback=None):
+                             callback=None, result_q=None):
         """
         Run the given function in a separate thread, at a later time.
 
         args            -- Argument tuple to pass to function.
         callback        -- Optional callback that takes a Result as its
                            argument.
+        result_q        -- Optional response queue to place complete Result on.
 
         Return:
         Result object.
         else:
             action_time = 0
 
-        result = Result()
+        result = Result(result_q)
         d_data = (func, args, result, callback)
         self._call_queue.put((None, d_data, action_time, True))
         return result
             if new_callback is not None:
                 callback = new_callback
             if self._thread_pool:
-                self._thread_pool.queue_task(self._thread_wrapper,
-                                             (func, args, result, callback))
+                self._thread_pool.queue_task(
+                    self._thread_wrapper, (func, args, result, callback))
             else:
-                th = threading.Thread(target=self._thread_wrapper,
-                                      args=(func, args, result, callback))
+                th = threading.Thread(
+                    target=self._thread_wrapper,
+                    args=(func, args, result, callback))
                 th.daemon = True
                 th.start()
             return
                 rsp = None
                 result._exception = e
                 result._traceback_str = traceback.format_exc()
+        if result._result_q:
+            result_q = result._result_q
+            result._result_q = None
+            try:
+                result_q.put(result)
+            except queue.Full:
+                pass
 
 
     def _process_scheduled_queue(self):
         if task_data is None:
             return
 
-        func, args, result, callback = task_data
-        if call_in_thread:
-            if self._thread_pool is not None:
-                # Call thread pool to execute scheduled function.
-                self._thread_pool.queue_task(self._thread_wrapper,
-                                             (func, args, result, callback))
+        def do_sched_task(func, args, result, callback):
+            if call_in_thread:
+                if self._thread_pool is not None:
+                    # Call thread pool to execute scheduled function.
+                    self._thread_pool.queue_task(
+                        self._thread_wrapper, (func, args, result, callback))
+                else:
+                    # Create new thread to execute scheduled function.
+                    th = threading.Thread(
+                        target=self._thread_wrapper,
+                        args=(func, args, result, callback))
+                    th.daemon = True
+                    th.start()
             else:
-                th = threading.Thread(target=self._thread_wrapper,
-                                      args=(func, args, result, callback))
-                th.daemon = True
-                th.start()
-        else:
-            #print '===> calling scheduled function'
-            self._thread_wrapper(func, args, result, callback)
+                #print '===> calling scheduled function'
+                self._thread_wrapper(func, args, result, callback)
 
+
+        do_sched_task(*task_data)
         if task_heap:
             now = time.time()
             next_time = task_heap[0][0]
             while task_heap and (not next_time or next_time <= now):
                 task_time, task_id, task_data, call_in_thread = heapq.heappop(
                     task_heap)
-                func, args, result, callback = task_data
-                if call_in_thread:
-                    #print '===> calling thread pool to do scheduled function'
-                    if self._thread_pool is not None:
-                        self._thread_pool.queue_task(
-                            self._thread_wrapper,
-                            (func, args, result, callback))
-                    else:
-                        th = threading.Thread(
-                            target=self._thread_wrapper,
-                            args=(func, args, result, callback))
-                        th.daemon = True
-                        th.start()
-                else:
-                    #print '===> calling scheduled function'
-                    self._thread_wrapper(func, args, result, callback)
+                do_sched_task(*task_data)
                 if task_heap:
                     next_time = task_heap[0][0]
 
 def main():
     setup(
         name='asyncthreads',
-        version= '1.2.3',
+        version= '1.3.0',
         author='Andrew Gillis',
         author_email='gillis.andrewj@gmail.com',
         url='http://bitbucket.org/agillis/asyncthreads',

File test/test_reactor.py

 import gc
 import time
 import threading
+try:
+    import queue
+except ImportError:
+    import Queue as queue
 
 from asyncthreads.reactor import Reactor
 
                'new callback not called by deferred thread'
         assert ret == 'hello-world'
 
+
+    def test_result_queue(self):
+        print 'Calling 3 functions in pooled thread, with delay of 1, 2, and '\
+              '3 seconds for each thread, and waiting for all results on '\
+              'same result queue.'
+        self.reset_called()
+        rq = queue.Queue()
+        self.rktr.call_in_thread_later(1, self._func2, ('delay', 'one'), None,
+                                       rq)
+        self.rktr.call_in_thread_later(2, self._func2, ('delay', 'two'), None,
+                                       rq)
+        self.rktr.call_in_thread_later(3, self._func2, ('delay', 'three'),
+                                       None, rq)
+        count = 0
+        while count < 3:
+            r = rq.get(True, 5)
+            count += 1
+            ret = r.get_result()
+            print 'got result:', ret
+            if count == 1:
+                assert ret == 'delay-one'
+            elif count == 2:
+                assert ret == 'delay-two'
+            else:
+                assert ret == 'delay-three'
+
+
     def test_shutdown(self):
         print 'Shutting down reactor'
         self.rktr.shutdown()