Commits

Anonymous committed 9c233d4

- Result is not available until callback completes. This allows caller waiting on result to reliably handle exception in callback.
- Callback function takes return from called function as argument.
- Fix python3 compatibility issue on import.
- Add unit tests to test exception in callback.

Comments (0)

Files changed (5)

-recursive-include examples *.txt *.py
+recursive-include examples *.py
 Installation
 ============
 
-The asyncthreads package is installed from source using distutils in the usual way.  Download the `source distribution <http://pypi.python.org/pypi/asyncthreads>`_ first.  Un-tar the source tarball and run the following to install the package site-wide:
+**Using pip**
 
-``python setup.py install``
+    Make sure python-pip is installed on you system.  If you are using virtualenv, then pip is alredy installed into environments created by vertualenv.  Run pip to install asyncthreads:
+
+    ``pip install asyncthreads``
+
+**From Source**
+
+    The asyncthreads package is installed from source using distutils in the usual way.  Download the `source distribution <http://pypi.python.org/pypi/asyncthreads>`_ first.  Un-tar the source tarball and run the following to install the package site-wide:
+
+    ``python setup.py install``
 
 Usage
 =====

asyncthreads/reactor.py

 except ImportError:
     import Queue as queue
 
-import threadpool
+import asyncthreads.threadpool
 
 
 class Result(object):
         Arguments:
         func     -- Function to execute.
         args     -- Argument tuple to pass to function.
-        callback -- Optional callback that takes a Result as its argument.
+        callback -- Optional callback that takes return from func as argument.
         result_q -- Optional response queue to place complete Result on.
 
         Return:
         time_until_call -- Seconds remaining until call.
         func            -- Function to execute.
         args            -- Argument tuple to pass to function.
-        callback        -- Optional callback that takes a Result as its
+        callback        -- Optional callback that takes return from func as
                            argument.
         result_q        -- Optional response queue to place complete Result on.
 
         Arguments:
         func     -- Function to execute.
         args     -- Argument tuple to pass to function.
-        callback -- Optional callback that takes a Result as its argument.
+        callback -- Optional callback that takes return from func as argument.
         result_q -- Optional response queue to place complete Result on.
 
         Return:
         time_until_call -- Seconds remaining until call.
         func            -- Function to execute.
         args            -- Argument tuple to pass to function.
-        callback        -- Optional callback that takes a Result as its
+        callback        -- Optional callback that takes return from func as
                            argument.
         result_q        -- Optional response queue to place complete Result on.
 
         Arguments:
         func     -- Function to execute in pooled thread.
         args     -- Arguments to pass to func.
-        callback -- Optional function to call with the Results of calling func.
+        callback -- Optional callback that takes return from func as argument.
 
         Returns:
         Special object recognized by Reactor to set up deferred call.
                 th.start()
             return
 
-        result._rsp = rsp
-        result._call_done.set()
         if callback:
             try:
-                callback(result)
+                callback(rsp)
             except Exception as e:
                 rsp = None
                 result._exception = e
                 result._traceback_str = traceback.format_exc()
+
+        # Set call done only after callback has completed.  This is necessary
+        # for code waiting on a result to be able to reliably see an exception
+        # from a callback.
+        result._rsp = rsp
+        result._call_done.set()
+
         if result._result_q:
             result_q = result._result_q
             result._result_q = None

test/test_reactor.py

 
     @classmethod
     def setup_class(cls):
-        gc.set_debug(
-            gc.DEBUG_UNCOLLECTABLE|gc.DEBUG_INSTANCES|gc.DEBUG_OBJECTS)
+        gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
         cls.rktr = Reactor(0)
         cls.called = False
         TestReactor.called = True
         return '-'.join([arg1, arg2])
 
-    def _callback(self, result):
-        TestReactor.cb_rsp = result.get_result()
+    def _callback(self, rsp):
+        TestReactor.cb_rsp = rsp
         TestReactor.cb_event.set()
 
-    def _new_cb(self, result):
+    def _new_cb(self, rsp):
         TestReactor.new_cb_called = True
 
     def _func_error(self):
         TestReactor.called = True
         raise Exception('some big problem')
 
-    def _callback_error(self, result):
-        TestReactor.cb_event.set()
+    def _callback_error(self, rsp):
         raise Exception('problem in callback')
 
     def _defer_func2(self, arg1, arg2):
         TestReactor.new_cb_called = False
 
     def test_start(self):
-        print 'Starting reactor'
+        print('Starting reactor')
         assert self.rktr is not None
         assert not self.rktr.is_alive()
         time.sleep(0)
         self.rktr.start()
         assert self.rktr.is_alive(), 'reactor should be alive'
-        print 'started'
+        print('started')
 
 
     def test_call(self):
         """Run in main thread"""
-        print 'Calling function with no args in main thread'
+        print('Calling function with no args in main thread')
         self.reset_called()
         result = self.rktr.call(self._func1)
         ret = result.get_result()
         assert self.called, 'called should be True'
         assert ret == 'abc', 'wrong result'
 
-        print 'Calling function in main thread'
+        print('Calling function in main thread')
         self.reset_called()
         result = self.rktr.call(self._func2, ('hello', 'world'))
         ret = result.get_result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-        print 'Calling function in main thread with callback'
+        print('Calling function in main thread with callback')
         self.reset_called()
         result = self.rktr.call(self._func2, ('hello', 'world'),
                                 self._callback)
 
     def test_call_in_thread(self):
         """Run in other thread"""
-        print 'Calling function in other thread'
+        print('Calling function in other thread')
         self.reset_called()
         result = self.rktr.call_in_thread(self._func2, ('hello', 'world'))
         ret = result.get_result()
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-        print 'Calling function in other thread with callback'
+        print('Calling function in other thread with callback')
         self.reset_called()
         result = self.rktr.call_in_thread(self._func2, ('hello', 'world'),
                                           self._callback)
 
     def test_call_scheduled(self):
         """Run scheduled in main thread"""
-        print 'Calling function in main thread in 3 seconds'
+        print('Calling function in main thread in 3 seconds')
         self.reset_called()
         result = self.rktr.call_later(3, self._func2, ('hello', 'world'))
 
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-        print 'Calling function in main thread with callback in 3 seconds'
+        print('Calling function in main thread with callback in 3 seconds')
         self.reset_called()
         result = self.rktr.call_later(3, self._func2, ('hello', 'world'),
                                       self._callback)
 
     def test_call_in_thread_scheduled(self):
         """Run in other thread"""
-        print 'Calling function in other thread in 3 seconds'
+        print('Calling function in other thread in 3 seconds')
         self.reset_called()
         result = self.rktr.call_in_thread_later(3, self._func2,
                                                 ('hello', 'world'))
         assert self.called, 'called should be True'
         assert ret == 'hello-world'
 
-        print 'Calling function in other thread with callback in 3 seconds'
+        print('Calling function in other thread with callback in 3 seconds')
         self.reset_called()
         result = self.rktr.call_in_thread_later(
             3, self._func2, ('hello', 'world'), self._callback)
     def test_cancel_scheduled(self):
         """Cancel scheduled call"""
         self.reset_called()
-        print 'Calling function in 3 seconds'
+        print('Calling function in 3 seconds')
         result = self.rktr.call_later(3, self._func2, ('hello', 'world'))
         assert not result.has_result(), 'should not have result yet'
 
 
 
     def test_exception_in_call(self):
-        print 'Calling function in main thread that raises exception'
+        print('Calling function in main thread that raises exception')
         self.reset_called()
         result = self.rktr.call(self._func_error)
         # Check for expected excption
         ret = None
         with pytest.raises(Exception) as ex:
             ret = result.get_result()
-            assert str(ex) == 'some big problem'
+        assert str(ex).endswith('some big problem')
         assert self.called, 'called should be True'
         assert ret is None, 'return should be None'
         assert len(result.get_traceback()) > 0, 'should be traceback info'
 
-        print 'Calling function in other thread that raises exception'
+        print('Calling function in other thread that raises exception')
         self.reset_called()
         result = self.rktr.call_in_thread(self._func_error)
 
         ret = None
         with pytest.raises(Exception) as ex:
             ret = result.get_result()
-            assert str(ex) == 'some big problem'
+        assert str(ex).endswith('some big problem')
         assert self.called, 'called should be True'
         assert ret is None, 'return should be None'
         assert len(result.get_traceback()) > 0, 'should be traceback info'
 
 
     def test_exception_in_callback(self):
-        print 'Calling function in main thread that raises exception'
+        print('Calling function in main thread callback that raises exception')
         self.reset_called()
         result = self.rktr.call(self._func1, (), self._callback_error)
-        assert self.cb_event.wait(3), 'cb_event should be set'
         with pytest.raises(Exception) as ex:
             result.get_result()
-            assert str(ex) == 'problem in callback'
+        assert len(result.get_traceback()) > 0, 'should be traceback info'
+        assert str(ex).endswith('problem in callback')
+        assert self.called, 'called should be True'
+
+        print('Calling function in other thread callback that raises ',
+              'exception')
+        self.reset_called()
+        result = self.rktr.call_in_thread(self._func1,(),self._callback_error)
+        with pytest.raises(Exception) as ex:
+            result.get_result()
+        assert str(ex).endswith('problem in callback')
         assert self.called, 'called should be True'
         assert len(result.get_traceback()) > 0, 'should be traceback info'
 
 
     def test_defer_call(self):
-        print 'Calling func on reactor thread that defers processing to thread'
+        print('Calling func on reactor thread that defers processing to thread')
         self.reset_called()
         result = self.rktr.call(self._defer_func2, ('hello', 'world'))
         ret = result.get_result()
 
 
     def test_defer_call_with_new_cb(self):
-        print 'Calling func on reactor thread that defers processing to '\
-              'thread and sets new callback'
+        print('Calling func on reactor thread that defers processing to '\
+              'thread and sets new callback')
         self.reset_called()
         result = self.rktr.call(self._defer2_func2, ('hello', 'world'))
         ret = result.get_result()
 
 
     def test_result_queue(self):
-        print 'Calling 3 functions in pooled thread, with delay of 1, 2, and '\
+        print('Calling 3 functions in pooled thread, with delay of 1, 2, and '\
               '3 seconds for each thread, and waiting for all results on '\
-              'same result queue.'
+              'same result queue.')
         self.reset_called()
         rq = queue.Queue()
+        # Enqueue out of order and make sure they are called in correct order.
+        self.rktr.call_in_thread_later(3, self._func2, ('delay', 'three'),
+                                       None, rq)
         self.rktr.call_in_thread_later(1, self._func2, ('delay', 'one'), None,
                                        rq)
         self.rktr.call_in_thread_later(2, self._func2, ('delay', 'two'), None,
                                        rq)
-        self.rktr.call_in_thread_later(3, self._func2, ('delay', 'three'),
-                                       None, rq)
         count = 0
         while count < 3:
             r = rq.get(True, 5)
             count += 1
             ret = r.get_result()
-            print 'got result:', ret
+            print('got result:', ret)
             if count == 1:
                 assert ret == 'delay-one'
             elif count == 2:
 
 
     def test_shutdown(self):
-        print 'Shutting down reactor'
+        print('Shutting down reactor')
         self.rktr.shutdown()
         assert not self.rktr.is_alive(), 'reactor should not be alive'
 

test/test_threadpool.py

 # Sample task 1: given a start and end value, shuffle integers,
 # then sort them
 def sort_task(first, last):
-    print "SortTask starting for ", (first, last)
-    numbers = range(first, last)
+    print("SortTask starting for ", (first, last))
+    numbers = list(range(first, last))
     for a in numbers:
         rnd = randrange(0, len(numbers) - 1)
         a, numbers[rnd] = numbers[rnd], a
-    print "SortTask sorting for ", (first, last)
+    print("SortTask sorting for ", (first, last))
     numbers.sort()
-    print "SortTask done for ", (first, last)
+    print("SortTask done for ", (first, last))
     return ("Sorter ", (first, last))
 
 # Sample task 2: just sleep for a number of seconds.
 def wait_task(data):
-    print "WaitTask starting for ", data
-    print "WaitTask sleeping for %d seconds" % data
+    print("WaitTask starting for ", data)
+    print("WaitTask sleeping for %d seconds" % data)
     time.sleep(data)
     return "Waiter", data
 
 # Both tasks use the same callback
 def task_callback(data):
-    print "Callback called for", data
+    print("Callback called for", data)
 
 
 class TestThreadPool(object):
 
     @classmethod
     def setup_class(cls):
-        gc.set_debug(
-            gc.DEBUG_UNCOLLECTABLE|gc.DEBUG_INSTANCES|gc.DEBUG_OBJECTS)
+        gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
         cls.pool = ThreadPool(20, 256)
 
         self.pool.queue_task(wait_task, 15, task_callback)
         self.pool.queue_task(wait_task, 15, task_callback)
         # Insert tasks into the queue and let them run
-        print 'start'
+        print('start')
         self.pool.queue_task(sort_task, (1000, 100000), task_callback)
         self.pool.queue_task(wait_task, 5, task_callback)
         self.pool.queue_task(sort_task, (200, 200000), task_callback)
         self.pool.queue_task(sort_task, (205, 200005), task_callback)
 
         for _ in range(3):
-            print '---> queued items:', self.pool.get_queue_size()
+            print('---> queued items:', self.pool.get_queue_size())
             time.sleep(2)
 
         self.pool.resize_pool(80)
         assert self.pool.get_pool_size() == 80
 
         while self.pool.get_queue_size():
-            print '---> queued items:', self.pool.get_queue_size()
+            print('---> queued items:', self.pool.get_queue_size())
             time.sleep(5)
 
-        print 'finish'
+        print('finish')
 
 
     def test_shutdown(self):
         # When all tasks are finished, allow the threads to terminate
-        print 'Shutting down thread pool'
+        print('Shutting down thread pool')
         self.pool.shutdown(False, False)
 
-        print '---> queued items:', self.pool.get_queue_size()
+        print('---> queued items:', self.pool.get_queue_size())
         while self.pool.get_queue_size():
-            print '---> queued items:', self.pool.get_queue_size()
+            print('---> queued items:', self.pool.get_queue_size())
             time.sleep(5)