wattle / Proposal

This document is broken up into four sections. The first describes the public part of the API, which is visible to and used by all developers. The second section describes the private API, which is visible to all developers but never needs to be used by most (the exceptions are listed in that section). The third section discusses implementation aspects that are only of interest to developers at the lowest levels. Finally, the last section suggests some possible extensions to this API that need some discussion before being included.

Some definitions:

  • Asynchronous operation
    • An operation that runs independently of the main thread; obtaining its result typically requires blocking until the operation completes.
  • Context
    • Unrelated to context managers, the context is the thread or stack where code is running. Functions like threading.get_ident() or gevent.getcurrent() are used to find the current context.

And note that this proposal is very (very!) similar to Twisted's Deferred and inlineCallback classes. I didn't know this at the time I was working on it, not being a Twisted user, but if you feel a sense of deja vu as you read then that may be why.

Public API

The public API consists entirely of a class Future, a function decorator @async, an exception CancelledError and a class CancellationSource. In some situations, a subclass of Scheduler may need to be manually instantiated and started. Users should not expect to create or type-check instances of Future or Scheduler directly, since subclasses or compatible objects are more likely to be in use (that is, type(obj) is Future and isinstance(obj, Future) will often be False, but the defined methods will always be present).

class Future:
    # Only public members are shown here
    def done(self)
    def result(self)
    def exception(self)

class Scheduler:
    # Only public members are shown here
    def __init__(self)
    def run(self, start_with, *args, **kwargs)

class CancellationSource:
    def __init__(self)
    def __bool__(self)
    def cancel(self)
    def cancel_after(self, seconds)

Futures represent the result of an asynchronous operation that may not have completed. (These are compatible with the existing concurrent.futures.Future class, but also allow for other implementations.) A Future can be polled for completion by calling done() or can be waited for by calling result(). They are never created directly, but are obtained from functions that perform asynchronous operations.

def my_func():
    # blocks and waits for the operation to complete
    result = some_other_operation().result()    
    modified_result = make_modification(result)
    return modified_result
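Since these Futures are compatible with the existing concurrent.futures.Future class, the blocking pattern above can be exercised today with the stdlib alone. In this sketch, some_other_operation_stdlib is a hypothetical stand-in for an asynchronous operation, not part of the proposal:

```python
from concurrent.futures import ThreadPoolExecutor

def some_other_operation_stdlib(pool):
    # stand-in for an asynchronous operation: returns a Future immediately
    return pool.submit(lambda: 21)

with ThreadPoolExecutor(max_workers=1) as pool:
    future = some_other_operation_stdlib(pool)
    result = future.result()        # blocks and waits, as in my_func() above
    modified_result = result * 2

# the three public methods from the interface above all behave as described
assert future.done() and future.exception() is None
```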

Callbacks for when Futures complete are implemented using the @async decorator. Applying @async to a function(/generator) means that:

  1. When called, the wrapped function(/generator) will return a Future
  2. That Future will eventually be set to the inner function(/generator)'s return value
  3. Any Future that is yielded from the generator will complete before the next part of the function is run.

The above example now looks like this:

@async
def my_func_async():
    result = yield some_other_operation()       # does not block
    modified_result = make_modification(result)
    return modified_result

In the second case, two Futures are created: one to represent some_other_operation() and one for my_func_async(). In both examples, lines 4 and 5 do not execute until line 3 is complete, but in my_func_async() other tasks are able to run while it is waiting. Importantly, lines 4 and 5 are executed in the same context in both examples - it does not change because of the yield.

(Slight digression: because my_func_async is just a generator, it can defer to other generators using yield from, provided they only ever yield Futures. This improves performance by allocating fewer Futures, but results in a segmented API surface ("you call this async function like this, but that async function like that"). In other words, it is an advanced optimisation whose effects should be localised by always using @async at API boundaries.)

Three important (and perhaps unexpected) properties of @async are that:

  • Calling an @async function runs the first step immediately.
  • If the @async function returns within the first step, the caller will continue immediately.
  • Not every @async function has to be a generator.

This both simplifies user code, by not requiring an explicit 'start' function, and allows @async functions to efficiently perform value caching:

@async
def get_value_async():
    global _cached
    if _cached:
        return _cached
    _cached = yield get_uncached_value_async()
    return _cached

If _cached has been set then x = yield get_value_async() will return and the caller will continue immediately - the scheduler does not get involved and no other tasks have the opportunity to run. The intent is to only use the scheduler for blocking operations, and so if the function does not yield a Future, it is not going to block (the same applies if a yielded Future has already completed). A blank yield (with no value) can be used to unconditionally defer to the scheduler if desired.
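The reason a plain return works inside these generators is PEP 380 (Python 3.3+): a return statement in a generator raises StopIteration carrying the value, which is exactly what the @async machinery catches. A minimal demonstration with an ordinary generator, outside the framework:

```python
def cached_or_fetch(cached):
    # mirrors the shape of get_value_async() above, without the framework
    if cached is not None:
        return cached                # raises StopIteration(cached)
    value = yield "a Future would be yielded here"
    return value

gen = cached_or_fetch(123)
try:
    next(gen)
    returned = None                  # only reached if the generator yielded
except StopIteration as stop:
    returned = stop.value            # the "cached" fast path: no yield occurred
```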

Cancellation is implemented cooperatively by passing a reference to a CancellationSource object. Only operations supporting cancellation will offer a parameter for this object, and those not supporting it must complete regardless of whether the result will be used or not. The CancellationSource object implements __bool__, allowing it to be used directly in conditional statements. By convention, CancelledError is raised, optionally providing a partial result:

@async
def some_op(cancel=None):
    x = None
    try:
        x = yield some_other_op()
        if cancel:
            raise CancelledError() # from concurrent.futures

        y = yield yet_another_op(x)
        if cancel:
            # Since we have the final result, return it anyway.
            raise CancelledError(y)
        return y
    finally:
        if x is not None:
            # Cancellations cannot interfere with cleanup
            yield x.cleanup()

This allows code to handle cancellation nicely and perform any cleanup necessary, as well as allowing the function to still return a meaningful value (if desired). Because it is an explicit parameter, cancellation is never unexpected, so developers need not be concerned with exceptions for non-failure states. Also, a single CancellationSource can be used for many asynchronous operations, greatly simplifying the process of aborting numerous parallel tasks.

Because supporting cancellation is optional, the onus is on library developers to provide support.
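A minimal sketch of what a CancellationSource might look like. This is a hypothetical implementation matching the interface described above (including the private add_cancel_callback from the next section); a real one would need to be thread-safe about its callback list and integrate with the scheduler:

```python
import threading

class CancellationSource:
    """Cooperative cancellation token (sketch, not the proposal's code)."""
    def __init__(self):
        self._event = threading.Event()
        self._callbacks = []

    def __bool__(self):
        # True once cancellation has been requested
        return self._event.is_set()

    def cancel(self):
        self._event.set()
        for fn, args, kwargs in self._callbacks:
            fn(*args, **kwargs)

    def cancel_after(self, seconds):
        threading.Timer(seconds, self.cancel).start()

    def add_cancel_callback(self, fn, *args, **kwargs):
        self._callbacks.append((fn, args, kwargs))

cancel = CancellationSource()
fired = []
cancel.add_cancel_callback(fired.append, "cancelled")
assert not cancel        # usable directly in conditionals, per __bool__
cancel.cancel()
```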

The Scheduler class should only need to be used by programs that require a non-default scheduler that is not 'automatically' provided (say, as part of a GUI framework). The extent of interaction with a scheduler here is instantiation and calling run():

from library import ASuperSpecialScheduler
ASuperSpecialScheduler().run(main)

The callable provided to run() is necessary to ensure that its code runs after the scheduler starts and before it ends. run() does not return until the Future returned by the callable completes. (Since some asynchronous operations may not be known to the scheduler, it cannot determine for itself when everything is done.) This is easily achieved by decorating the called function (in this example, main) with @async.

Private API

The "private" API is not necessary for most developers. It is required for implementing event loops and the functions directly responsible for starting asynchronous operations. All methods described in this section should be documented as being unnecessary and unsafe for normal use and potentially should be renamed to include leading underscores.

class Scheduler:
    @staticmethod
    def get_current()
    @staticmethod
    def set_current(getter)

    def submit(self, callable, *args, **kwargs)
    def get_future_for(self, operation, *args, cancel_source=None, **kwargs)

class Future:
    def __init__(self)
    def add_done_callback(self, callback)
    def set_result(self, value)
    def set_exception(self, exc_value)

class CancellationSource:
    def add_cancel_callback(self, callable, *args, **kwargs)

Implementations of low-level operations need to be able to directly communicate with the Scheduler, bypassing the 'yield' tree, as well as being able to obtain new Futures and complete them.

The active scheduler must make itself available by providing a callable to set_current(). This callable should return a reference to a scheduler suitable for the context in which it is called - the intent is to find a scheduler to get back to where the get_current() call was made, not to find somewhere else to queue items. Multiple and/or thread-sensitive schedulers can be supported by returning different values from the callable (for example, based on threading.get_ident()), though since only one callable can be set for the process it is not possible to simultaneously have incompatible schedulers active.
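The getter mechanism can be sketched with module-level state. The names here are illustrative only, not the proposal's implementation:

```python
import threading

_getter = None  # the single process-wide callable installed via set_current

def set_current(getter):
    global _getter
    _getter = getter

def get_current():
    return _getter() if _getter is not None else None

# A thread-sensitive getter: it picks a scheduler suitable for whichever
# context (here, thread) calls get_current().
_per_thread = {}
set_current(lambda: _per_thread.get(threading.get_ident()))

_per_thread[threading.get_ident()] = "scheduler-for-main-thread"
current = get_current()
```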

Once the active scheduler has been obtained, typically at the start of the operation (which always runs in the context the user calls it from), its submit() function can be used to queue subsequent tasks with that scheduler. This is important for OS-level callbacks, which are not always guaranteed to occur on the same context the operation is started from.

# Note - not @async, though it could be and then it would be simpler.
def run_on_thread_pool(callable, on_complete):
    scheduler = Scheduler.get_current()
    
    def thread_proc():
        r = callable()
        scheduler.submit(on_complete, r)
    
    thread_pool.submit(thread_proc)

However, using Futures significantly reduces the need for submit(). Both set_result() and set_exception() are intended to be called from anywhere, and callers using result() or @async will not use that context for user code. Callbacks added using the add_done_callback() function, however, will run in the context that set_result() or set_exception() is called from, and so should normally consist entirely of a call to submit() using a closure over the target scheduler. (Note that @async uses add_done_callback() internally, and so another callback is not guaranteed to execute before the user has seen the result.)
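This context behaviour can be observed today with the stdlib Future: a done callback runs on whichever thread calls set_result(), not the thread that registered it.

```python
import threading
from concurrent.futures import Future

fut = Future()
seen = {}

def on_done(f):
    # runs in the context that completes the future, not the one that added it
    seen["callback_thread"] = threading.get_ident()

fut.add_done_callback(on_done)

def completer():
    seen["worker_thread"] = threading.get_ident()
    fut.set_result(42)

worker = threading.Thread(target=completer)
worker.start()
worker.join()
# seen["callback_thread"] == seen["worker_thread"], not the main thread's ident
```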

@async  # now with the decorator, but the behaviour is unchanged
def run_on_thread_pool(callable, on_complete):
    r = yield thread_pool.submit(callable)
    on_complete(r)

The get_future_for() function allows schedulers to optimise for certain operations without requiring full prior specification. (This provides roughly the same capability as Twisted's interfaces, though with quite a different approach.) The idea is that a call to get_future_for() includes an unbound reference to a blocking function (such as select.select or threading.Condition.wait) and the arguments it would be called with. The implementation of get_future_for() checks the provided operation against those it supports. If the operation is supported, the scheduler returns a Future that will be completed when the operation completes; otherwise, it returns None and the caller has to achieve non-blocking behaviour in another manner.

For example, quite a large number of objects can be waited on for different purposes using select(), and far more efficiently than using a thread for each individual object. So a scheduler may offer to pool together all pending waits into one (or a couple of) calls to select(). The implementations may look like this:

@async
def socket_read(sock, n):
    future = Scheduler.get_current().get_future_for(select.select, [sock], [], [])

    if not future:
        future = Scheduler.get_current().new_future()
        def _threadproc(fd):
            try:
                future.set_result(select.select([fd], [], []))
            except BaseException as be:
                future.set_exception(be)
        Thread(target=_threadproc, args=(sock,)).start()

    rlist, _, _ = yield future
    # rlist[0] == sock
    return sock.recv(n)

...

class MyCleverScheduler:
    def get_future_for(self, operation, *args, **kwargs):
        if operation is select.select:
            future = self.new_future()
            self.read_futures += [(fd, future) for fd in args[0]]
            self.write_futures += [(fd, future) for fd in args[1]]
            self.except_futures += [(fd, future) for fd in args[2]]
            return future
        return None
    
    # Also requires special handling in the main loop.
    # See UnthreadedSocketScheduler.py for a more complete example

Other operations that could be more optimally implemented (compared to using a thread) by schedulers include time.sleep and threading.Condition.wait. Thinking more wildly, some hardware modules may come with Python libraries, and this would allow them to provide hardware accelerated operations that "switch on" when you use the associated scheduler. Because each operation has a fallback, schedulers can be interchanged for different platforms or performance/debugging features without completely breaking a user's program, and since the returned value is a Future, the operation itself can be implemented in any way that works.

get_future_for() may return None even if it has returned a Future for the same operation in the past, based on the state of the scheduler and the parameters provided. For example, if a file descriptor is provided twice for select, the second time may return None if the scheduler cannot wait on the same descriptor twice. A scheduler may also return None if cancel_source is not None when it cannot support cancellation, though a better approach would be to emulate cancellation using add_cancel_callback.

The implementation of the main loop of the scheduler is not defined, and is subject only to one constraint:

  • If a callable is sent to scheduler_instance.submit(), calls to get_current() within that callable must return scheduler_instance.

Other than that requirement, schedulers can do anything they like, including passing submitted callables to another scheduler (such as an event loop provided by a GUI framework).
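A toy scheduler satisfying that single constraint might look like this. The run_pending() loop and class-level _current are simplifications standing in for the set_current(getter) machinery; the names are illustrative:

```python
import queue

class ToyScheduler:
    _current = None   # simplification of the set_current(getter) mechanism

    @staticmethod
    def get_current():
        return ToyScheduler._current

    def __init__(self):
        self._tasks = queue.Queue()

    def submit(self, fn, *args, **kwargs):
        self._tasks.put((fn, args, kwargs))

    def run_pending(self):
        # While a submitted callable runs, get_current() must return us.
        while not self._tasks.empty():
            fn, args, kwargs = self._tasks.get()
            previous, ToyScheduler._current = ToyScheduler._current, self
            try:
                fn(*args, **kwargs)
            finally:
                ToyScheduler._current = previous

sched = ToyScheduler()
observed = []
sched.submit(lambda: observed.append(ToyScheduler.get_current() is sched))
sched.run_pending()
```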

Implementation

These are the complete interfaces of the new or revised types required by this proposal. Full sample implementations are available in the repository browser.

def async(fn)

class Scheduler:
    @staticmethod
    def get_current()
    @staticmethod
    def set_current(getter)

    def __init__(self)
    def run(self, start_with, *args, **kwargs)
    def submit(self, callable, *args, **kwargs)
    def new_future(self)
    def get_future_for(self, operation, *args, cancel_source=None, **kwargs)

class Future:
    def __init__(self)
    def done(self)
    def result(self)
    def exception(self)
    def add_done_callback(self, callback)
    def set_result(self, value)
    def set_exception(self, exc_value)

class CancellationSource:
    def __init__(self)
    def __bool__(self)
    def cancel(self)
    def cancel_after(self, seconds)

The _Awaiter class has not been discussed because it is purely implementation detail and should not need to be touched in order to add a new scheduler or operation. However, because it implements most of the 'smarts' of the system, it is worth a closer look. The full code is here, but simplified it looks like this:

class _Awaiter:
    '''Implements the callback behavior of functions wrapped with `async`.'''
    def __init__(self, generator, final_future):
        self.generator = generator
        self.final_future = final_future
        self.target = Scheduler.get_current()
    
    def first_step(self):
        '''Runs the first step of the task.'''
        next_future = next(self.generator)
        next_future.add_done_callback(self._dispatch_step)
    
    def _dispatch_step(self, prev_future):
        '''Dispatches the next step of the task to the correct scheduler. This
        should only be called as a result of being queued to another scheduler.
        '''
        self.target.submit(self._step, prev_future)
    
    def _step(self, prev_future):
        '''Performs the next step of the task. This should only be called as a
        result of being queued to a scheduler.'''
        try:
            if prev_future.exception():
                next_future = self.generator.throw(prev_future.exception())
            else:
                next_future = self.generator.send(prev_future.result())
            
            next_future.add_done_callback(self._dispatch_step)
        except StopIteration:
            self.final_future.set_result(...)
        except:
            self.final_future.set_exception(...)

def async(fn):
    @functools.wraps(fn)
    def _Async(*args, **kwargs):
        final_future = Future()
        awaiter = _Awaiter(fn(*args, **kwargs), final_future)
        try:
            awaiter.first_step()
        except StopIteration:
            final_future.set_result(...)
        except:
            final_future.set_exception(...)
        return final_future
    return _Async

(Some obvious refactorings in this abbreviated listing are not actually there in the full code, though there is certainly scope for improving the design.)

One modification that I have not implemented but quite like is changing _Awaiter to fulfil the Future contract, which would avoid allocating a separate future in most cases. There are also legitimate reasons to change the function names, but since they are not part of the proposed API this is really irrelevant.
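Tying the pieces together, here is a runnable simplification of the _Awaiter/async pair built on concurrent.futures.Future. It omits the Scheduler entirely (each step runs on whichever context completed the previous Future), so it demonstrates the mechanics rather than the full design; the decorator is spelled async_ because async is now a reserved word, and completed() is a helper invented for the example:

```python
import functools
from concurrent.futures import Future

class _Awaiter:
    """Drives an @async_-style generator by chaining done callbacks."""
    def __init__(self, generator, final_future):
        self.generator = generator
        self.final_future = final_future

    def first_step(self):
        self._advance(lambda: next(self.generator))

    def _step(self, prev_future):
        if prev_future.exception() is not None:
            self._advance(lambda: self.generator.throw(prev_future.exception()))
        else:
            self._advance(lambda: self.generator.send(prev_future.result()))

    def _advance(self, resume):
        try:
            next_future = resume()
        except StopIteration as stop:
            self.final_future.set_result(stop.value)   # generator returned
        except BaseException as exc:
            self.final_future.set_exception(exc)
        else:
            # If next_future is already done, the callback fires immediately,
            # giving the "caller continues immediately" behaviour noted above.
            next_future.add_done_callback(self._step)

def async_(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        final_future = Future()
        _Awaiter(fn(*args, **kwargs), final_future).first_step()
        return final_future
    return wrapper

def completed(value):
    # helper (not in the proposal): a Future that has already finished
    f = Future()
    f.set_result(value)
    return f

@async_
def compute():
    a = yield completed(2)
    b = yield completed(3)
    return a + b
```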

Other Ideas

Shared Thread Pool

Because the get_future_for() function on schedulers may return None, every operation has to provide a fallback that does not block. It is expected that in most cases this will involve a thread or a thread pool. If the scheduler also makes a thread pool available, the number of active threads could be reduced by sharing them between all operations. This would also allow schedulers to collect and report statistics on thread usage, helping developers decide whether they should switch to another scheduler.

Users can avoid the thread pool by selecting a scheduler that supports their main operations. For example, a socket-centric application would use a scheduler that supports socket operations, while the thread pool would only be used for comparatively rare operations.

Wrapping an API then looks like this:

@async
def socket_read(sock, n):
    scheduler = Scheduler.get_current()
    future = scheduler.get_future_for(select.select, [sock], None, None)
    if not future:
        future = scheduler.get_thread_pool().submit(select.select, [sock], [], [])
    rlist, _, _ = yield future
    # rlist[0] == sock
    return sock.recv(n)

Alternatively, get_future_for could be made to automatically use the thread pool and always return a Future. However, since some operations may have better alternatives (such as querying another operation), this is not such a good option.

Another benefit is that user code can be easily sent to the thread pool. A @task decorator could be added that will queue the entire function to the thread pool and return a Future, making it compatible with @async and simplifying thread parallelism. It seems very likely that users will want to use @async with their own threaded code, so it may be worth encouraging them to do so in a consistent manner - ideally without requiring them to directly access the current Scheduler instance.
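Such a @task decorator could be a thin wrapper over a pool. This sketch uses a stdlib executor directly; the real version would presumably obtain the pool from the current Scheduler:

```python
import functools
from concurrent.futures import ThreadPoolExecutor

# stand-in for the scheduler's shared pool (hypothetical)
_pool = ThreadPoolExecutor(max_workers=4)

def task(fn):
    """Hypothetical @task: run the whole function on the pool, return a Future."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return _pool.submit(fn, *args, **kwargs)
    return wrapper

@task
def blocking_work(n):
    # free to block; it never runs in the scheduler's context
    return n * 2

future = blocking_work(21)
```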

Some of the major differences between the @async and @task decorators include:

| @async | @task |
| --- | --- |
| Can yield futures | Cannot yield futures |
| Should not make blocking calls | Can make blocking calls |
| Starts running immediately | Does not start running immediately |
| Can quickly return cached results | Cannot quickly return cached results |
| Always runs in current context | May run in another context |
