Asynchronous Threads Utility Module -- asyncthreads

Overview

The asyncthreads module provides common threading design patterns and utilities needed for asynchronous and multithreaded programming. This includes a thread pool and a reactor, which together form a reliable concurrent event processing system.

The ThreadPool maintains a pool of threads to be reused for processing work, to avoid the overhead of repeatedly creating new threads. The Reactor implements the reactor pattern and is used to serially process work in an asynchronous thread.

Combining Reactor and ThreadPool

The Reactor is used to process events (queued function calls) serially. Events are processed in the order queued, and no event is processed until the previous one completes. Callbacks may be registered to handle the completion of event processing.

Since the reactor blocks on any operation, it is necessary to ensure that the processing of events and their callbacks executes quickly and returns control to the reactor so that it can continue processing events. Threads can be used to accomplish this, but creating and starting a thread takes time. Using a thread pool avoids the overhead of thread creation, and executes additional processing and callbacks asynchronously without blocking the Reactor.

Combining a thread pool with a Reactor results in an elegant solution to schedule and process work, serially where protection of shared resources is required and concurrently where work can be done in parallel.

Install

python setup.py install

Dependencies

  • Python 2.7 or greater

Development Tools

  • mercurial
  • py.test
  • pychecker

ThreadPool

ThreadPool, implemented in threadpool.py, is a resizable thread pool.

ThreadPool maintains a pool of threads that execute tasks given to the queue_task() method. The number of worker threads can be changed by calling the resize_pool() method.

When performing concurrent operations or interacting with a blocking API, using multiple threads is useful. However, there is overhead associated with the creation of new threads. If new threads are used frequently to do work, this overhead can become very expensive.

Using a managed thread pool allows the use of multiple threads while avoiding the overhead of creating new threads every time a worker thread is needed.

Initialization

When initializing a ThreadPool, the number of threads and the maximum size of the task queue are given. It is legal to initialize the ThreadPool with zero threads, but no tasks will be executed until the pool is resized to have one or more threads.
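The behavior described above can be illustrated with a minimal sketch. This is not the code in threadpool.py: TinyPool and its constructor arguments are hypothetical stand-ins, and only the method names queue_task() and resize_pool() are borrowed from the text. It is written against the Python 3 standard library (on Python 2.7 the queue module is named Queue).

```python
import queue
import threading


class TinyPool(object):
    """Hypothetical resizable pool; a sketch, not the asyncthreads ThreadPool."""

    _STOP = object()  # sentinel that tells exactly one worker to exit

    def __init__(self, num_threads, max_queue_size=0):
        # max_queue_size=0 means "unbounded" for queue.Queue.
        self._tasks = queue.Queue(max_queue_size)
        self._size = 0
        self._lock = threading.Lock()
        self.resize_pool(num_threads)

    def queue_task(self, func, *args):
        # Tasks wait in the queue until some worker thread picks them up.
        self._tasks.put((func, args))

    def resize_pool(self, new_size):
        with self._lock:
            while self._size < new_size:
                worker = threading.Thread(target=self._work)
                worker.daemon = True
                worker.start()
                self._size += 1
            while self._size > new_size:
                # Each sentinel retires one worker once it is dequeued.
                self._tasks.put(self._STOP)
                self._size -= 1

    def _work(self):
        while True:
            task = self._tasks.get()
            if task is self._STOP:
                return
            func, args = task
            func(*args)


done = threading.Event()
pool = TinyPool(0)                       # legal, but nothing can run yet
pool.queue_task(done.set)
ran_with_zero_threads = done.wait(0.2)   # times out: no worker exists
pool.resize_pool(2)
ran_after_resize = done.wait(5)          # the queued task now runs
```

The task queued while the pool was empty is not lost; it simply waits until resize_pool() starts a worker.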

Reactor

Reactor, implemented in reactor.py, uses the reactor pattern to call queued functions in a separate thread. The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers. Asynchronous serial processing allows for execution of code that would otherwise require thread locks to protect shared resources.

Event handling functions can be queued to be called by the Reactor main thread, or by a thread in the Reactor's thread pool. Functions can be queued to be called immediately (as soon as the calling thread is available), or queued to be called at a specified later time.

The Reactor uses the ThreadPool class to implement its thread pool.

Thread-safe code execution

Using the Reactor's main loop to execute code can provide thread safety and efficiency.

Concurrent threads may need to perform some operations in a thread-safe manner, such as writing data to a transport. This can be done by acquiring and releasing a lock around every write, or by scheduling writes to be done in the reactor main thread. The second choice is often preferable as it can simplify code by eliminating the need to manage thread locks. It can also free up the concurrent threads to do more work, since they would otherwise be waiting on the thread lock for another thread to finish writing.

To call functions from the main Reactor thread, use the call() or call_later() methods.
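As a rough illustration of why serial execution can replace locking, the sketch below funnels writes from several threads through one loop thread. SerialLoop is a hypothetical stand-in and not the asyncthreads Reactor; its call() only mirrors the method name used above, and its argument form is an assumption. It uses the Python 3 standard library.

```python
import queue
import threading


class SerialLoop(object):
    """Hypothetical single-threaded event loop; not the asyncthreads Reactor."""

    def __init__(self):
        self._events = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def call(self, func, *args):
        # Safe to call from any thread; the function runs on the loop thread.
        self._events.put((func, args))

    def shutdown(self):
        # Stop after everything queued so far has been processed.
        self._events.put(None)
        self._thread.join()

    def _run(self):
        while True:
            event = self._events.get()
            if event is None:
                return
            func, args = event
            func(*args)


transport = []  # shared resource, only ever touched by the loop thread


def write(sender, count):
    # Two appends that must stay adjacent. No lock is needed because every
    # write() runs on the loop thread, one at a time.
    transport.append(("header", sender, count))
    transport.append(("body", sender, count))


def producer(loop, sender):
    for count in range(100):
        loop.call(write, sender, count)


loop = SerialLoop()
producers = [threading.Thread(target=producer, args=(loop, n)) for n in range(4)]
for p in producers:
    p.start()
for p in producers:
    p.join()
loop.shutdown()
```

Each header in transport is immediately followed by its matching body, even though four threads submitted writes concurrently and none of them held a lock.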

Running code in threads

The Reactor allows code to be run in separate concurrent threads. The methods call_in_thread() and call_in_thread_later() queue functions to be run by the Reactor's thread pool.

The caller can wait on the response using the returned Result object. A thread from the Reactor's internal thread pool is used to avoid the overhead of creating a new thread.

The Result object

All of the Reactor's call methods return a Result object. This means that all functions executed by the Reactor have an associated Result. The Result object allows the caller to wait for and retrieve the result of a function called by the Reactor. This includes not only any value returned by the called function, but also any exception it raised.

Calling the Result object's get_result() method waits until the scheduled function has been called. It then returns whatever the called function returned, or raises the exception that the called function raised.
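The wait, return, or raise behavior of get_result() can be sketched with a threading.Event. SketchResult and its set_from_call() helper are hypothetical names for illustration; the real Result class in asyncthreads may be structured differently.

```python
import threading


class SketchResult(object):
    """Hypothetical result holder; not the asyncthreads Result class."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None
        self._error = None

    def set_from_call(self, func, *args):
        # Run func and record either its return value or its exception.
        try:
            self._value = func(*args)
        except Exception as e:
            self._error = e
        finally:
            self._done.set()

    def get_result(self):
        # Block until the function has been called, then return or raise.
        self._done.wait()
        if self._error is not None:
            raise self._error
        return self._value


ok = SketchResult()
bad = SketchResult()
threading.Thread(target=ok.set_from_call, args=(divmod, 7, 2)).start()
threading.Thread(target=bad.set_from_call, args=(divmod, 7, 0)).start()

quotient_and_remainder = ok.get_result()   # value returned by divmod(7, 2)
try:
    bad.get_result()
    raised = None
except ZeroDivisionError as e:
    raised = e                             # exception raised by divmod(7, 0)
```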

A Result object is used to identify a scheduled function to cancel. It can also be used to check if a scheduled call has been canceled.

Canceling Scheduled Execution

Calling the Reactor methods call_later() and call_in_thread_later() schedules a function to be executed at a later time by the Reactor's main thread, or by one of the Reactor's thread pool threads respectively. If the code has not yet been executed, its scheduled execution can be canceled by calling the cancel_scheduled() method and passing it the Result object returned by call_later() or call_in_thread_later().

The cancel_scheduled() method returns True if the scheduled call was successfully canceled, and False if the scheduled call was not found (already executed) or was already canceled.
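The True and False cases above can be sketched with threading.Timer. SketchScheduler is hypothetical: it returns a plain handle object where the Reactor returns a Result, and the (delay, func, *args) argument order of its call_later() is an assumption rather than the documented signature.

```python
import threading


class SketchScheduler(object):
    """Hypothetical delayed-call scheduler; not the asyncthreads Reactor."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}  # handle -> Timer, for calls that have not started

    def call_later(self, delay, func, *args):
        handle = object()

        def run():
            with self._lock:
                # Claim the call; if cancel_scheduled() got there first, skip it.
                if self._pending.pop(handle, None) is None:
                    return
            func(*args)

        timer = threading.Timer(delay, run)
        timer.daemon = True
        with self._lock:
            self._pending[handle] = timer
        timer.start()
        return handle

    def cancel_scheduled(self, handle):
        with self._lock:
            timer = self._pending.pop(handle, None)
        if timer is None:
            return False  # already executed or already canceled
        timer.cancel()
        return True


sched = SketchScheduler()
fired = threading.Event()

slow = sched.call_later(30, fired.set)
first_cancel = sched.cancel_scheduled(slow)    # still pending, so canceled
second_cancel = sched.cancel_scheduled(slow)   # already canceled

fast = sched.call_later(0.01, fired.set)
fired.wait(5)
late_cancel = sched.cancel_scheduled(fast)     # already executed
```

Removing the entry from the pending table under the lock, both when running and when canceling, is what guarantees that a call is either executed or reported as canceled, never both.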

Defer additional execution from main thread to pooled thread

Since the reactor main thread blocks on any operation, it is necessary to ensure that callbacks execute quickly and return control back to the reactor to continue processing events. The Reactor's thread pool avoids the overhead of thread creation and executes additional processing and callbacks asynchronously without blocking the Reactor.

When using a thread to execute additional processing and callbacks, the results of the additional processing must be retrieved using the original Result object. This is the purpose of deferred processing.

To use deferred processing, the function executing in the main thread calls defer_to_thread() and returns the object that defer_to_thread() returned. The reactor recognizes this special return value, reuses the original Result object, and queues the given task to a pooled thread.

The caller's Result object is reused to receive the result of calling the specified deferred function.
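The deferral mechanism can be sketched as follows. Everything here is a hypothetical stand-in: Deferred plays the role of the object returned by defer_to_thread(), Outcome plays the role of Result, and DeferringLoop starts a fresh thread where the Reactor would use its thread pool. Error handling is omitted to keep the sketch short.

```python
import queue
import threading


class Deferred(object):
    """Marker return value meaning 'finish this work on another thread'."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args


class Outcome(object):
    """Hypothetical result holder; not the asyncthreads Result class."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._done.set()

    def get_result(self):
        self._done.wait()
        return self._value


def finish_deferred(outcome, deferred):
    # Runs off the loop thread and completes the caller's original Outcome.
    outcome.set(deferred.func(*deferred.args))


class DeferringLoop(object):
    def __init__(self):
        self._events = queue.Queue()
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.thread.start()

    def call(self, func, *args):
        outcome = Outcome()
        self._events.put((func, args, outcome))
        return outcome

    def _run(self):
        while True:
            func, args, outcome = self._events.get()
            returned = func(*args)
            if isinstance(returned, Deferred):
                # Special return value: hand off and keep processing events.
                threading.Thread(target=finish_deferred,
                                 args=(outcome, returned)).start()
            else:
                outcome.set(returned)


def slow_part(n):
    return (n * 2, threading.current_thread())


def handler(n):
    # Quick work on the loop thread, then defer the slow part.
    return Deferred(slow_part, n + 1)


loop = DeferringLoop()
result = loop.call(handler, 20)
value, ran_on = result.get_result()
```

The caller holds a single Outcome throughout: it was created by call(), and it is completed by slow_part() on a thread other than the loop thread.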