Asynchronous Threads Utility Module
The asyncthreads module provides common threading design patterns and utilities needed for asynchronous and multithreaded programming. This includes a thread pool and a reactor, which together combine to create a highly reliable concurrent event processing system.
The ThreadPool maintains a pool of threads to be reused for processing work, to avoid the overhead of repeatedly creating new threads. The Reactor implements the reactor pattern and is used to serially process tasks in an asynchronous thread, dispatch tasks to concurrent threads, or a combination of both.
One Tool for Many Applications
In addition to providing serial and concurrent task processing, the reactor also provides scheduled tasks. Scheduled tasks are tasks that are executed at a specific time. A task that is initially processed serially can also be transferred to a concurrent thread.
Callers can wait for a single task to complete by waiting on a Result object to be ready. Callers can wait for multiple tasks to complete by waiting for Result objects to be placed in a specified result queue. Callers can optionally specify callback functions to be called after a task has been executed.
These features allow the same Reactor to provide:
- Serialized execution
- Concurrent execution
- Scheduled tasks (timer events)
- Periodic tasks
- Serial-to-concurrent transition
- Task callbacks
- Event-based, queue-based, and combined task completion notification.
No more need to implement separate code for handling these situations. The same reactor instance handles all of these.
Combining Reactor and ThreadPool
The Reactor is used to process events (queued function calls) serially. Events are processed in the order queued and no event is processed until the previous one completes. Callbacks may be registered to handle the completion of event processing.
Since the reactor blocks on any operation, it is necessary to ensure that the processing of events and their callbacks executes quickly and return control back to the reactor so that is can continue processing events. Threads can be used to accomplish this, but creating and starting a thread takes time. Using a thread pool avoids the overhead of thread creation, and executes additional processing and callbacks asynchronously without blocking the Reactor.
Combining a thread pool with a Reactor results in an elegant solution to schedule and process work, serially where protection to shared resources is required and concurrently where work can be done in parallel.
ThreadPool, implemented in threadpool.py, is a re-sizable thread pool.
ThreadPool maintains a pool of threads that execute tasks given to the
queue_task() method. The number of worker threads can be changed by calling the
When performing concurrent operations or interacting with a blocking API, using multiple threads is useful. However, there is overhead associated with the creation of new threads. If new threads are used frequently to do work, this overhead can become very expensive.
Using a managed thread pool allows the use of multiple threads while avoiding the overhead of creating new threads every time a worker thread is needed.
When initializing a ThreadPool, the number of threads and the maximum size of the task queue are given. It is legal to initialize the ThreadPool with zero threads, but no tasks will be executed until the pool is resized to have one or more threads.
Reactor, implemented in reactor.py, is the reactor pattern to call queued functions in a separate thread. The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers. Asynchronous serial processing allows for execution of code that would other require thread locks to protect shared resources.
Event handling functions can be queued to be called by the Reactor main thread, or by a thread in the Reactor's thread pool. Functions can be be queued to be called immediately (as soon as calling thread is available), or queued to be called at a specified later time.
The Reactor uses the ThreadPool class to implement its thread pool.
Thread-safe code execution
Using the Reactor's main loop to execute code can provide thread safety and efficiency.
Concurrent threads may need to perform some operations in a thread-safe manner, such as writing data to a transport. This can be done by acquiring and releasing a lock around every write, or by scheduling writes to be done in the reactor main thread. The second choice is often preferable as it can simplify code by eliminating the need to manage thread locks. It can also free up the concurrent threads to do more work, which would otherwise be waiting on the thread lock for another thread to finish writing.
To call functions from the main Reactor thread, use the
Running code in threads
The Reactor allows code to be run in separate concurrent threads. The methods
call_later_in_thread() queue functions to be run by the Reactor's thread pool.
The caller can wait on the response using the returned Result object. A thread from the Reactor's internal thread pool is used to avoid the overhead of creating a new thread.
The Result object
All of the Reactor's
call_() methods return a Result object. This means that that all functions executed by the Reactor have an associated Result. The Result object allows the caller to wait for and retrieve the result of a function called by the Reactor. This not only includes any return from the called function, but any exception as well.
Calling the Result object's
get_result() method will return whatever the called function returned, will raise an ``Exception`` if the called function raised an ``Exception``, or will wait until the scheduled function has been called.
A Result object is used to identify a scheduled function to cancel. It can also be used to check if a scheduled call has been canceled.
Waiting for Results
After telling the reactor to execute a function, the caller may need to wait for the function to complete. There are a two ways to do this:
- Call the Result object's
- Wait on a Queue object that was passed to the Reactor's
Calling a Result object's
get_result() method waits on that Result's internal Event object to be set before returning the result. This allows the caller to wait on a specific Result object that was returned from the Reactor's
If the caller wants to wait for any of number of reactor-executed functions to complete, then the caller passes the same Queue object to the Reactor's
call_() method invocations. The caller can then wait on the Queue. When a function executed by one of the Reactor's
call_() methods completes, the Reactor places the associated Result object onto the Queue, and then the caller retrieves it.
Canceling Scheduled Execution
Calling the Reactor methods
call_in_thread_later() schedules a function to be executed at a later time by the Reactor's main thread, or by one of the Reactor's thread pool threads respectively. If the code has not yet been executed, its scheduled execution can be canceled by calling the ``cancel_scheduled()`` method and passing it the Result object returned by
cancel_scheduled() method returns
True if the scheduled call was successfully canceled and
False if scheduled call was not found (already executed) or already canceled.
Defer additional execution from main thread to pooled thread
Since the reactor main thread blocks on any operation, it is necessary to ensure that callbacks execute quickly and return control back to the reactor to continue processing events. The Reactor's thread pool avoids the overhead of thread creation and executes additional processing and callbacks asynchronously without blocking the Reactor.
When using a thread to execute additional processing and callbacks, the results of the additional processing must be retrieved using the original Result object. This is the purpose of deferred processing.
To use deferred processing, the function executing in the main thread calls
defer_to_thread() , returning the object returned by
defer_to_thread() . The reactor recognizes the special return value, reuses the original Result object, and queues the given task to a pooled thread.
The caller's Result object is reused to receive the result of calling the specified deferred function.