This document is broken up into four sections. The first describes the public part of the API, which is visible to and used by all developers. The second section describes the private API, which is visible to all developers but never needs to be used by most (the exceptions are listed in that section). The third section discusses implementation aspects that are only of interest to developers at the lowest levels. Finally, the last section suggests some possible extensions to this API that need some discussion before being included.
- Asynchronous operation: An operation that runs independently of the main thread, which typically requires blocking until a result is available.
- Context: Unrelated to context managers, the context is the thread or stack where code is running. Functions like `gevent.getcurrent()` are used to find the current context.
And note that this proposal is very (very!) similar to Twisted's Deferred and inlineCallbacks classes. I didn't know this at the time I was working on it, not being a Twisted user, but if you feel a sense of déjà vu as you read then that may be why.
The public API consists entirely of a class `Future`, a function decorator `@async`, an exception `CancelledError` and a class `CancellationSource`. In some situations, a subclass of `Scheduler` may need to be manually instantiated and started. Users should never expect to create or use instances of `Scheduler` directly, since subclasses or compatible objects are more likely (that is, `type(obj) is Future` and `isinstance(obj, Future)` will often be `False`, but the defined methods will be present).
```python
class Future:
    # Only public members are shown here
    def done(self)
    def result(self)
    def exception(self)

class Scheduler:
    # Only public members are shown here
    def __init__(self)
    def run(self, start_with, *args, **kwargs)

class CancellationSource:
    def __init__(self)
    def __bool__(self)
    def cancel(self)
    def cancel_after(self, seconds)
```
`Future`s represent the result of an asynchronous operation that may not have completed. (These are compatible with the existing `concurrent.futures.Future` class, but also allow for other implementations.) A `Future` can be polled for completion by calling `done()`, or can be waited for by calling `result()`. They are never created directly, but are obtained from functions that perform asynchronous operations.
```python
def my_func():
    # blocks and waits for the operation to complete
    result = some_other_operation().result()
    modified_result = make_modification(result)
    return modified_result
```
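For context, this blocking pattern works today with the standard `concurrent.futures` module; the stand-in operation below (a pool submitting a trivial lambda) is an assumption purely for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=1)

def some_other_operation():
    # Stand-in: starts work on another thread and returns its Future.
    return _pool.submit(lambda: 21)

future = some_other_operation()
result = future.result()   # blocks the calling thread until complete
print(result)              # 21
```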
Callbacks for when `Future`s complete are implemented using the `@async` decorator. Applying `@async` to a function(/generator) means that:

- When called, the wrapped function(/generator) will return a `Future`.
- That `Future` will eventually be set to the inner function(/generator)'s return value.
- Any `Future` that is yielded from the generator will complete before the next part of the function is run.
The above example now looks like this:
```python
@async
def my_func_async():
    result = yield some_other_operation()  # does not block
    modified_result = make_modification(result)
    return modified_result
```
In the second case, two `Future`s are created: one to represent `some_other_operation()` and one for `my_func_async()`. In both examples, lines 4 and 5 do not execute until line 3 is complete, but in `my_func_async()` other tasks are able to run while it is waiting. Importantly, lines 4 and 5 are executed in the same context in both examples - it does not change because of the `yield`.
(Slight digression: because `my_func_async` is just a generator, it can defer to other generators using `yield from`, provided they are only yielding `Future`s. This provides a performance improvement by allocating fewer `Future`s, but results in a segmented API surface ("you call this async function like this, but this async function like this"). In other words, it is an advanced optimisation whose effects should be localised by always using `@async` at API boundaries.)
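The delegation mechanics here are plain generator behaviour, independent of any scheduler: `yield from` forwards the inner generator's yields to whoever is driving the outer generator, and the inner generator's return value becomes the value of the `yield from` expression. A minimal standalone illustration (the yielded string stands in for a `Future`):

```python
def inner():
    result = yield "pretend-future"
    return result + 1

def outer():
    # inner()'s yields surface directly to the driver; its return
    # value becomes the value of the yield from expression.
    value = yield from inner()
    yield value * 2

g = outer()
print(next(g))     # "pretend-future" - came from inner()
print(g.send(10))  # 22 - inner() returned 11, outer() yielded 11 * 2
```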
Three important (and perhaps unexpected) properties of `@async` are that:

- Calling an `@async` function runs the first step immediately.
- If the `@async` function returns within the first step, the caller will continue immediately.
- Not every `@async` function has to be a generator.
This both simplifies user code, by not requiring an explicit 'start' function, and allows `@async` functions to efficiently perform value caching:
```python
_cached = None

@async
def get_value_async():
    global _cached
    if _cached:
        return _cached
    _cached = yield get_uncached_value_async()
    return _cached
```
If `_cached` has been set then `x = yield get_value_async()` will return and the caller will continue immediately - the scheduler does not get involved and no other tasks have the opportunity to run. The intent is to only use the scheduler for blocking operations, and so if the function does not yield a `Future`, it is not going to block (the same applies if a yielded `Future` has already completed). A blank `yield` (with no value) can be used to unconditionally defer to the scheduler if desired.
Cancellation is implemented cooperatively by passing a reference to a `CancellationSource` object. Only operations supporting cancellation will offer a parameter for this object, and those not supporting it must complete regardless of whether the result will be used or not. The `CancellationSource` object implements `__bool__`, allowing it to be used directly in conditional statements. By convention, `CancelledError` is raised, optionally providing a partial result:
```python
@async
def some_op(cancel=None):
    x = None
    try:
        x = yield some_other_op()
        if cancel:
            raise CancelledError()  # from concurrent.futures
        y = yield yet_another_op(x)
        if cancel:
            # Since we have the final result, return it anyway.
            raise CancelledError(y)
        return y
    finally:
        if x:
            # Cancellations cannot interfere with cleanup
            yield x.cleanup()
```
This allows code to handle cancellation nicely and perform any necessary cleanup, as well as allowing the function to still return a meaningful value (if desired). Because it is an explicit parameter, cancellation is never unexpected, so developers need not be concerned with exceptions for non-failure states. Also, a single `CancellationSource` can be used for many asynchronous operations, greatly simplifying the process of aborting numerous parallel tasks.
Because supporting cancellation is optional, the onus is on library developers to provide support.
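A minimal sketch of a `CancellationSource` satisfying the contract described above (`__bool__`, `cancel()`, `cancel_after()`); the choice of `threading.Event` as the underlying flag is an implementation assumption, not part of the proposal:

```python
import threading

class CancellationSource:
    '''Sketch only: a thread-safe "has cancellation been requested?" flag.'''
    def __init__(self):
        self._event = threading.Event()

    def __bool__(self):
        # Allows `if cancel:` checks in cooperative operations.
        return self._event.is_set()

    def cancel(self):
        self._event.set()

    def cancel_after(self, seconds):
        # Request cancellation once the timeout elapses.
        threading.Timer(seconds, self.cancel).start()

cancel = CancellationSource()
print(bool(cancel))   # False
cancel.cancel()
print(bool(cancel))   # True
```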
The `Scheduler` class should only need to be used by programs that require a non-default scheduler that is not 'automatically' provided (say, as part of a GUI framework). The extent of interaction with a scheduler here is instantiation and calling `run()`:
```python
from library import ASuperSpecialScheduler

ASuperSpecialScheduler().run(main)
```
The callable provided to `run()` is necessary to ensure that its code runs after the scheduler starts and before it ends. `run()` does not return until the `Future` returned by the callable completes. (Since some asynchronous operations may not be known to the scheduler, it cannot determine for itself when everything is done.) This is easily achieved by decorating the called function (in this example, `main`) with `@async`.
The "private" API is not necessary for most developers. It is required for implementing event loops and the functions directly responsible for starting asynchronous operations. All methods described in this section should be documented as being unnecessary and unsafe for normal use and potentially should be renamed to include leading underscores.
```python
class Scheduler:
    @staticmethod
    def get_current()
    @staticmethod
    def set_current(getter)
    def submit(self, callable, *args, **kwargs)
    def get_future_for(self, operation, *args, cancel_source=None, **kwargs)

class Future:
    def __init__(self)
    def add_done_callback(self, callback)
    def set_result(self, value)
    def set_exception(self, exc_value)

class CancellationSource:
    def add_cancel_callback(self, callable, *args, **kwargs)
```
Implementations of low-level operations need to be able to communicate directly with the `Scheduler`, bypassing the 'yield' tree, as well as being able to obtain new `Future`s and complete them.
The active scheduler must make itself available by providing a callable to `set_current()`. This callable should return a reference to a scheduler suitable for the context in which it is called - the intent is to find a scheduler to get back to where the `get_current()` call was made, not to find somewhere else to queue items. Multiple and/or thread-sensitive schedulers can be supported by returning different values from the callable (for example, based on `threading.get_ident()`), though since only one callable can be set for the process it is not possible to simultaneously have incompatible schedulers active.
Once the active scheduler has been obtained, typically at the start of the operation (which always runs in the context the user calls it from), its `submit()` function can be used to queue subsequent tasks with that scheduler. This is important for OS-level callbacks, which are not always guaranteed to occur in the same context the operation was started from.
```python
# Note - not @async, though it could be and then it would be simpler.
def run_on_thread_pool(callable, on_complete):
    scheduler = Scheduler.get_current()
    def thread_proc():
        r = callable()
        scheduler.submit(on_complete, r)
    thread_pool.submit(thread_proc)
```
Using `Future`s significantly reduces the need for this pattern: `set_result()` and `set_exception()` are intended to be called from anywhere, and callers using `@async` will not use that context for user code. Callbacks added using the `add_done_callback()` function, however, will run in the context that `set_result()` or `set_exception()` is called from, and so should normally consist entirely of a call to `submit()` using a closure over the target scheduler. (Note that `@async` uses `add_done_callback()` internally, and so another callback is not guaranteed to execute before the user has seen the result.)
```python
@async  # now with the decorator, but the behaviour is unchanged
def run_on_thread_pool(callable, on_complete):
    r = yield thread_pool.submit(callable)
    on_complete(r)
```
The `get_future_for()` function allows schedulers to optimise for certain operations without requiring full prior specification. (This provides roughly equivalent support to Twisted's interfaces, though with quite a different approach.) The idea is that a call to `get_future_for()` includes an unbound reference to a blocking function (such as `threading.Condition.wait`) and the arguments it would be called with. The implementation of `get_future_for()` will check the provided operation against those it supports. If the operation is supported, the scheduler will return a `Future` that will be completed when the operation completes; otherwise, it returns `None` and the caller has to achieve non-blocking behaviour in another manner.
For example, quite a large number of objects can be waited on for different purposes using `select()`, and far more efficiently than using a thread for each individual object. So a scheduler may offer to pool together all pending waits into one (or a couple of) calls to `select()`. The implementations may look like this:
```python
@async
def socket_read(sock, n):
    future = Scheduler.get_current().get_future_for(select.select, [sock], None, None)
    if not future:
        future = Scheduler.get_current().new_future()
        def _threadproc(fd):
            try:
                future.set_result(select.select([fd], [], []))
            except BaseException as be:
                future.set_exception(be)
        Thread(target=_threadproc, args=(sock,)).start()
    rlist, _, _ = yield future  # rlist == [sock]
    return sock.recv(n)

...

class MyCleverScheduler:
    def get_future_for(self, operation, *args, **kwargs):
        if operation is select.select:
            rlist, wlist, xlist = args[:3]
            future = self.new_future()
            self.read_futures += [(fd, future) for fd in rlist or ()]
            self.write_futures += [(fd, future) for fd in wlist or ()]
            self.except_futures += [(fd, future) for fd in xlist or ()]
            return future
        return None
    # Also requires special handling in the main loop.
    # See UnthreadedSocketScheduler.py for a more complete example
```
Other operations that schedulers could implement more efficiently (compared to using a thread) include `threading.Condition.wait`. Thinking more wildly, some hardware modules may come with Python libraries, and this would allow them to provide hardware-accelerated operations that "switch on" when you use the associated scheduler. Because each operation has a fallback, schedulers can be interchanged for different platforms or performance/debugging features without completely breaking a user's program, and since the returned value is a `Future`, the operation itself can be implemented in any way that works.
`get_future_for()` may return `None` even if it has returned a `Future` for the same operation in the past, based on the state of the scheduler and the parameters provided. For example, if a file descriptor is provided twice for `select`, the second call may return `None` if the scheduler cannot wait on the same descriptor twice. A scheduler may also return `None` when `cancel_source` is not `None` and it cannot support cancellation, though a better approach would be to emulate cancellation using the provided `CancellationSource`.
The implementation of the main loop of the scheduler is not defined, and is subject to only one constraint:

- If a callable is sent to `scheduler_instance.submit()`, calls to `get_current()` within that callable must return `scheduler_instance`.
Other than that requirement, schedulers can do anything they like, including passing submitted callables to another scheduler (such as an event loop provided by a GUI framework).
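To make the constraint concrete, here is a toy single-threaded scheduler. The `run_until_empty()` entry point is an invention for this sketch, and `get_current()` is simplified to a class attribute rather than the `set_current(getter)` protocol described above:

```python
import collections

class SimpleScheduler:
    _current = None   # simplified: the real API installs a getter via set_current()

    def __init__(self):
        self._queue = collections.deque()

    @staticmethod
    def get_current():
        return SimpleScheduler._current

    def submit(self, callable, *args, **kwargs):
        self._queue.append((callable, args, kwargs))

    def run_until_empty(self):
        # While a submitted callable runs, get_current() must return this
        # scheduler - that is the one constraint the proposal imposes.
        previous, SimpleScheduler._current = SimpleScheduler._current, self
        try:
            while self._queue:
                fn, args, kwargs = self._queue.popleft()
                fn(*args, **kwargs)
        finally:
            SimpleScheduler._current = previous

scheduler = SimpleScheduler()
scheduler.submit(print, "runs inside the loop")
scheduler.run_until_empty()
```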
These are the complete interfaces of the new or revised types required by this proposal. Full sample implementations are available in the repository browser.
```python
def async(fn)

class Scheduler:
    @staticmethod
    def get_current()
    @staticmethod
    def set_current(getter)
    def __init__(self)
    def run(self, start_with, *args, **kwargs)
    def submit(self, callable, *args, **kwargs)
    def new_future(self)
    def get_future_for(self, operation, *args, cancel_source=None, **kwargs)

class Future:
    def __init__(self)
    def done(self)
    def result(self)
    def exception(self)
    def add_done_callback(self, callback)
    def set_result(self, value)
    def set_exception(self, exc_value)

class CancellationSource:
    def __init__(self)
    def __bool__(self)
    def cancel(self)
    def cancel_after(self, seconds)
```
The `_Awaiter` class has not been discussed because it is purely an implementation detail and should not need to be touched in order to add a new scheduler or operation. However, because it implements most of the 'smarts' of the system, it is worth a closer look. The full code is here, but simplified it looks like this:
```python
class _Awaiter:
    '''Implements the callback behavior of functions wrapped with `async`.'''
    def __init__(self, generator, final_future):
        self.generator = generator
        self.final_future = final_future
        self.target = Scheduler.get_current()

    def first_step(self):
        '''Runs the first step of the task.'''
        next_future = next(self.generator)
        next_future.add_done_callback(self._dispatch_step)

    def _dispatch_step(self, prev_future):
        '''Dispatches the next step of the task to the correct scheduler.

        This should only be called as a result of being queued to another
        scheduler.
        '''
        self.target.submit(self._step, prev_future)

    def _step(self, prev_future):
        '''Performs the next step of the task.

        This should only be called as a result of being queued to a
        scheduler.
        '''
        try:
            if prev_future.exception():
                next_future = self.generator.throw(prev_future.exception())
            else:
                next_future = self.generator.send(prev_future.result())
            next_future.add_done_callback(self._dispatch_step)
        except StopIteration:
            self.final_future.set_result(...)
        except:
            self.final_future.set_exception(...)

def async(fn):
    @functools.wraps(fn)
    def _Async(*args, **kwargs):
        final_future = Future()
        awaiter = _Awaiter(fn(*args, **kwargs), final_future)
        try:
            awaiter.first_step()
        except StopIteration:
            final_future.set_result(...)
        except:
            final_future.set_exception(...)
        return final_future
    return _Async
```
(Some obvious refactorings in this abbreviated listing are not actually there in the full code, though there is certainly scope for improving the design.)
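The `send()`/`throw()` calls in `_step()` are plain generator machinery. A standalone illustration of how a driver pushes results and exceptions into a paused generator (the yielded strings stand in for `Future`s):

```python
def worker():
    try:
        result = yield "step-1"      # the driver sends the "future result" back in
        yield result * 2
    except ValueError as e:
        yield f"recovered: {e}"      # the driver can also inject exceptions

g = worker()
print(next(g))       # "step-1"
print(g.send(5))     # 10 - like _step() calling generator.send(...)

g2 = worker()
next(g2)
print(g2.throw(ValueError("boom")))  # "recovered: boom" - like generator.throw(...)
```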
One modification that I have not implemented but quite like is changing `_Awaiter` to fulfil the `Future` contract, which would avoid allocating a separate future in most cases. There are also legitimate reasons to change the function names, but since they are not part of the proposed API this is really irrelevant.
Shared Thread Pool
Because the `get_future_for()` function on schedulers may return `None`, every operation has to provide a fallback that does not block. It is expected that in most cases this will involve a thread or a thread pool. If the scheduler also makes a thread pool available, the number of active threads could be reduced by sharing them between all operations. This would also allow schedulers to collect and report statistics on thread usage, helping developers decide whether they should switch to another scheduler.

The user can avoid use of the thread pool by selecting a scheduler that supports their main operations. For example, a socket-centric application would use a scheduler that supports socket operations, so the thread pool would only be used for comparatively rare operations.
Wrapping an API then looks like this:
```python
@async
def socket_read(sock, n):
    scheduler = Scheduler.get_current()
    future = scheduler.get_future_for(select.select, [sock], None, None)
    if not future:
        future = scheduler.get_thread_pool().submit(select.select, [sock], [], [])
    rlist, _, _ = yield future  # rlist == [sock]
    return sock.recv(n)
```
`get_future_for()` could be made to automatically use the thread pool and always return a `Future`. However, since some operations may have better alternatives (such as querying another operation), this is not such a good option.
Another benefit is that user code can easily be sent to the thread pool. A `@task` decorator could be added that queues the entire function to the thread pool and returns a `Future`, making it compatible with `@async` and simplifying thread parallelism. It seems very likely that users will want to use `@async` with their own threaded code, so it may be worth encouraging them to do so in a consistent manner - ideally without requiring them to directly access the current scheduler.
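A sketch of such a `@task` decorator (a hypothetical addition, not part of the proposed API), built here on the standard `concurrent.futures` pool rather than the scheduler-owned pool the proposal envisages:

```python
import functools
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor()   # stand-in for the scheduler's shared pool

def task(fn):
    '''Hypothetical decorator: run fn on the pool and return its Future.'''
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return _pool.submit(fn, *args, **kwargs)
    return wrapper

@task
def compute(x):
    return x * x   # free to make blocking calls: it runs on a pool thread

print(compute(6).result())   # 36
```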
Some of the major differences between the `@async` and `@task` decorators include:

| `@async` | `@task` |
| --- | --- |
| Can yield | Cannot yield |
| Should not make blocking calls | Can make blocking calls |
| Starts running immediately | Does not start running immediately |
| Can quickly return cached results | Cannot quickly return cached results |
| Always runs in current context | May run in another context |