PEP:               3148
Title:             futures - execute computations asynchronously
Version:           $Revision$
Last-Modified:     $Date$
Author:            Brian Quinlan <brian@sweetapp.com>
Status:            Final
Type:              Standards Track
Content-Type:      text/x-rst
Created:           16-Oct-2009
Python-Version:    3.2
Post-History:

========
Abstract
========

This PEP proposes a design for a package that facilitates the
evaluation of callables using threads and processes.

==========
Motivation
==========

Python currently has powerful primitives to construct multi-threaded
and multi-process applications, but parallelizing simple operations
requires a lot of work, i.e. explicitly launching processes/threads,
constructing a work/results queue, and waiting for completion or some
other termination condition (e.g. failure, timeout).  It is also
difficult to design an application with a global process/thread limit
when each component invents its own parallel execution strategy.

=============
Specification
=============

Naming
------

The proposed package would be called "futures" and would live in a new
"concurrent" top-level package.  The rationale for placing the futures
library in a "concurrent" namespace has multiple components.  The
first and simplest is to prevent any confusion with the existing
"from __future__ import x" idiom, which has long been in use within
Python.  Additionally, the "concurrent" prefix fully denotes what the
library relates to, namely concurrency; this should clear up any
additional ambiguity, as not everyone in the community is familiar
with Java Futures, or with the term "futures" except as it relates to
the US stock market.

Finally, we are carving out a new namespace for the standard library,
named "concurrent".  We hope to either add, or move existing,
concurrency-related libraries to it in the future.  A prime example
is the multiprocessing.Pool work, as well as other "addons" included
in that module, which work across thread and process boundaries.

Interface
---------

The proposed package provides two core classes: `Executor` and
`Future`. An `Executor` receives asynchronous work requests (in terms
of a callable and its arguments) and returns a `Future` to represent
the execution of that work request.

Executor
''''''''

`Executor` is an abstract class that provides methods to execute calls
asynchronously.

``submit(fn, *args, **kwargs)``

    Schedules the callable to be executed as ``fn(*args, **kwargs)``
    and returns a `Future` instance representing the execution of the
    callable.

    This is an abstract method and must be implemented by Executor
    subclasses.
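
    For example, a call might be submitted and its result retrieved as
    follows (a minimal sketch; ``ThreadPoolExecutor`` is described
    below)::

        from concurrent import futures

        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            # Schedules pow(2, 10) for execution and returns a Future.
            future = executor.submit(pow, 2, 10)
            # result() blocks until the call completes.
            print(future.result())  # prints 1024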

``map(func, *iterables, timeout=None)``

    Equivalent to ``map(func, *iterables)`` but func is executed 
    asynchronously and several calls to func may be made concurrently.
    The returned iterator raises a `TimeoutError` if `__next__()` is
    called and the result isn't available after *timeout* seconds from
    the original call to `map()`.  If *timeout* is not specified or
    `None` then there is no limit to the wait time.  If a call raises
    an exception then that exception will be raised when its value is
    retrieved from the iterator.

``shutdown(wait=True)``

    Signal the executor that it should free any resources that it is
    using when the currently pending futures are done executing.
    Calls to `Executor.submit` and `Executor.map` made after shutdown
    will raise `RuntimeError`.
 
    If wait is `True` then this method will not return until all the
    pending futures are done executing and the resources associated
    with the executor have been freed. If wait is `False` then this
    method will return immediately and the resources associated with
    the executor will be freed when all pending futures are done
    executing. Regardless of the value of wait, the entire Python
    program will not exit until all pending futures are done
    executing.

| ``__enter__()``
| ``__exit__(exc_type, exc_val, exc_tb)``

    When using an executor as a context manager, `__exit__` will call
    ``Executor.shutdown(wait=True)``.
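
    For example (a minimal sketch), leaving the ``with`` block below
    invokes ``shutdown(wait=True)``, so the submitted call is complete
    once the block ends::

        from concurrent import futures

        with futures.ThreadPoolExecutor(max_workers=2) as executor:
            future = executor.submit(pow, 3, 4)
        # shutdown(wait=True) has already been called at this point.
        print(future.result())  # prints 81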


ProcessPoolExecutor
'''''''''''''''''''

The `ProcessPoolExecutor` class is an `Executor` subclass that uses a
pool of processes to execute calls asynchronously.  The callable
objects and arguments passed to `ProcessPoolExecutor.submit` must be
pickleable according to the same limitations as the multiprocessing
module.

Calling `Executor` or `Future` methods from within a callable
submitted to a `ProcessPoolExecutor` will result in deadlock.

``__init__(max_workers)``

    Executes calls asynchronously using a pool of at most *max_workers*
    processes.  If *max_workers* is ``None`` or not given then as many
    worker processes will be created as the machine has processors.

ThreadPoolExecutor
''''''''''''''''''

The `ThreadPoolExecutor` class is an `Executor` subclass that uses a
pool of threads to execute calls asynchronously.

Deadlock can occur when the callable associated with a `Future` waits
on the results of another `Future`.  For example::

    import time
    def wait_on_b():
        time.sleep(5)
        print(b.result())  # b will never complete because it is waiting on a.
        return 5

    def wait_on_a():
        time.sleep(5)
        print(a.result())  # a will never complete because it is waiting on b.
        return 6


    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)

And::

    def wait_on_future():
        f = executor.submit(pow, 5, 2)
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        print(f.result())
    
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)

``__init__(max_workers)``

    Executes calls asynchronously using a pool of at most
    *max_workers* threads.

Future Objects
''''''''''''''

The `Future` class encapsulates the asynchronous execution of a
callable. `Future` instances are returned by `Executor.submit`.

``cancel()``

    Attempt to cancel the call.  If the call is currently being
    executed then it cannot be cancelled and the method will return
    `False`, otherwise the call will be cancelled and the method will
    return `True`.

``cancelled()``

    Return `True` if the call was successfully cancelled.

``running()``

    Return `True` if the call is currently being executed and cannot
    be cancelled.

``done()``

    Return `True` if the call was successfully cancelled or finished
    running.

``result(timeout=None)``

    Return the value returned by the call.  If the call hasn't yet
    completed then this method will wait up to *timeout* seconds.  If
    the call hasn't completed in *timeout* seconds then a
    `TimeoutError` will be raised.  If *timeout* is not specified or
    `None` then there is no limit to the wait time.
     
    If the future is cancelled before completing then `CancelledError`
    will be raised.
     
    If the call raised an exception then this method will raise the
    same exception.
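
    For example, each outcome can be handled explicitly (a minimal
    sketch, assuming *future* was returned by an earlier ``submit``
    call)::

        from concurrent import futures

        # 'future' is assumed to come from an earlier executor.submit() call.
        try:
            value = future.result(timeout=5)
        except futures.CancelledError:
            print('the call was cancelled')
        except futures.TimeoutError:
            print('no result within 5 seconds')
        except Exception as e:
            print('the call raised: %r' % (e,))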

``exception(timeout=None)``

    Return the exception raised by the call.  If the call hasn't yet
    completed then this method will wait up to *timeout* seconds.  If
    the call hasn't completed in *timeout* seconds then a
    `TimeoutError` will be raised.  If *timeout* is not specified or
    ``None`` then there is no limit to the wait time.
     
    If the future is cancelled before completing then `CancelledError`
    will be raised.
     
    If the call completed without raising then `None` is returned.

``add_done_callback(fn)``

    Attaches a callable *fn* to the future that will be called when
    the future is cancelled or finishes running.  *fn* will be called
    with the future as its only argument.
     
    Added callables are called in the order that they were added and
    are always called in a thread belonging to the process that added
    them.  If the callable raises an `Exception` then it will be
    logged and ignored.  If the callable raises another
    `BaseException` then behavior is not defined.
     
    If the future has already completed or been cancelled then *fn*
    will be called immediately.
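
    For example (a minimal sketch, assuming an ``executor`` created
    earlier)::

        def report(future):
            # Called with the finished future as its only argument.
            print('call finished, result: %r' % (future.result(),))

        # 'executor' is assumed to be an existing Executor instance.
        future = executor.submit(pow, 2, 8)
        future.add_done_callback(report)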

Internal Future Methods
^^^^^^^^^^^^^^^^^^^^^^^

The following `Future` methods are meant for use in unit tests and
`Executor` implementations.

``set_running_or_notify_cancel()``

    Should be called by `Executor` implementations before executing
    the work associated with the `Future`.
     
    If the method returns `False` then the `Future` was cancelled,
    i.e.  `Future.cancel` was called and returned `True`.  Any threads
    waiting on the `Future` completing (i.e. through `as_completed()`
    or `wait()`) will be woken up.
     
    If the method returns `True` then the `Future` was not cancelled
    and has been put in the running state, i.e. calls to
    `Future.running()` will return `True`.
     
    This method can only be called once and cannot be called after
    `Future.set_result()` or `Future.set_exception()` have been
    called.

``set_result(result)``

    Sets the result of the work associated with the `Future`.

``set_exception(exception)``

    Sets the result of the work associated with the `Future` to the
    given `Exception`.
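
Taken together, an `Executor` implementation might drive a `Future`
roughly as follows (an illustrative sketch of the intended call
sequence, not the reference implementation; ``_run_call`` is a
hypothetical worker helper)::

    def _run_call(future, fn, args, kwargs):
        # Returns False if the future was cancelled before it could run.
        if not future.set_running_or_notify_cancel():
            return
        try:
            result = fn(*args, **kwargs)
        except Exception as e:
            # Make the exception available via Future.exception()/result().
            future.set_exception(e)
        else:
            # Make the return value available via Future.result().
            future.set_result(result)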
   
Module Functions
''''''''''''''''

``wait(fs, timeout=None, return_when=ALL_COMPLETED)``

    Wait for the `Future` instances (possibly created by different
    `Executor` instances) given by *fs* to complete.  Returns a named
    2-tuple of sets.  The first set, named "done", contains the
    futures that completed (finished or were cancelled) before the
    wait completed.  The second set, named "not_done", contains
    uncompleted futures.
     
    *timeout* can be used to control the maximum number of seconds to
    wait before returning.  If timeout is not specified or None then
    there is no limit to the wait time.
     
    *return_when* indicates when the method should return.  It must be
    one of the following constants:
     
    ============================= ==================================================
     Constant                      Description
    ============================= ==================================================
    `FIRST_COMPLETED`             The method will return when any future finishes or
                                  is cancelled.
    `FIRST_EXCEPTION`             The method will return when any future finishes by
                                  raising an exception. If no future raises an
                                  exception then it is equivalent to ALL_COMPLETED.
    `ALL_COMPLETED`               The method will return when all calls finish.
    ============================= ==================================================
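
    For example (a minimal sketch)::

        from concurrent import futures

        with futures.ThreadPoolExecutor(max_workers=2) as executor:
            fs = [executor.submit(pow, 2, n) for n in range(8)]
            done, not_done = futures.wait(
                fs, timeout=1, return_when=futures.FIRST_COMPLETED)
            # 'done' holds at least one finished future (unless the
            # timeout expired first); 'not_done' holds the rest.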

``as_completed(fs, timeout=None)``

    Returns an iterator over the `Future` instances given by *fs* that
    yields futures as they complete (finished or were cancelled).  Any
    futures that completed before `as_completed()` was called will be
    yielded first.  The returned iterator raises a `TimeoutError` if
    `__next__()` is called and the result isn't available after
    *timeout* seconds from the original call to `as_completed()`.  If
    *timeout* is not specified or `None` then there is no limit to the
    wait time.

    The `Future` instances can have been created by different
    `Executor` instances.

Check Prime Example
-------------------

::

    from concurrent import futures
    import math

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    def is_prime(n):
        if n % 2 == 0:
            return False

        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True

    def main():
        with futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime,
                                                          PRIMES)):
                print('%d is prime: %s' % (number, prime))

    if __name__ == '__main__':
        main()

Web Crawl Example
-----------------

::

    from concurrent import futures
    import urllib.request

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    def load_url(url, timeout):
        return urllib.request.urlopen(url, timeout=timeout).read()

    def main():
        with futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_url = dict(
                (executor.submit(load_url, url, 60), url)
                 for url in URLS)
    
            for future in futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    print('%r page is %d bytes' % (
                              url, len(future.result())))
                except Exception as e:
                    print('%r generated an exception: %s' % (
                              url, e))

    if __name__ == '__main__':
        main()

=========
Rationale
=========

The proposed design of this module was heavily influenced by the
Java java.util.concurrent package [1]_.  The conceptual basis of the
module, as in Java, is the Future class, which represents the progress
and result of an asynchronous computation.  The Future class makes
little commitment to the evaluation mode being used, e.g. it can be
used to represent lazy or eager evaluation, for evaluation using
threads, processes or remote procedure calls.

Futures are created by concrete implementations of the Executor class
(called ExecutorService in Java).  The reference implementation
provides classes that use either a process or a thread pool to eagerly
evaluate computations.

Futures have already been seen in Python as part of a popular Python
cookbook recipe [2]_ and have been discussed on the Python-3000
mailing list [3]_.

The proposed design is explicit, i.e. it requires that clients be
aware that they are consuming Futures.  It would be possible to design
a module that would return proxy objects (in the style of `weakref`)
that could be used transparently.  It is possible to build a proxy
implementation on top of the proposed explicit mechanism.

The proposed design does not introduce any changes to Python language
syntax or semantics.  Special syntax could be introduced [4]_ to mark
function and method calls as asynchronous.  A proxy result would be
returned while the operation is eagerly evaluated asynchronously, and
execution would only block if the proxy object were used before the
operation completed.

Anh Hai Trinh proposed a simpler but more limited API concept [5]_ and
the API has been discussed in some detail on stdlib-sig [6]_.

The proposed design was discussed on the Python-Dev mailing list [7]_.
Following those discussions, the following changes were made:

* The `Executor` class was made into an abstract base class
* The `Future.remove_done_callback` method was removed due to a lack
  of convincing use cases
* The `Future.add_done_callback` method was modified to allow the
  same callable to be added many times
* The `Future` class's mutation methods were better documented to
  indicate that they are private to the `Executor` that created them

========================
Reference Implementation
========================

The reference implementation [8]_ contains a complete implementation
of the proposed design.  It has been tested on Linux and Mac OS X.

==========
References
==========

.. [1]
   `java.util.concurrent` package documentation
   http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html

.. [2]
   Python Cookbook recipe 84317, "Easy threading with Futures"
   http://code.activestate.com/recipes/84317/

.. [3]
   `Python-3000` thread, "mechanism for handling asynchronous concurrency"
   http://mail.python.org/pipermail/python-3000/2006-April/000960.html

.. [4]
   `Python 3000` thread, "Futures in Python 3000 (was Re: mechanism for handling asynchronous concurrency)"
   http://mail.python.org/pipermail/python-3000/2006-April/000970.html

.. [5]
   A discussion of `stream`, a similar concept proposed by Anh Hai Trinh
   http://www.mail-archive.com/stdlib-sig@python.org/msg00480.html

.. [6]
   A discussion of the proposed API on stdlib-sig
   http://mail.python.org/pipermail/stdlib-sig/2009-November/000731.html

.. [7]
   A discussion of the PEP on python-dev
   http://mail.python.org/pipermail/python-dev/2010-March/098169.html

.. [8]
   Reference `futures` implementation
   http://code.google.com/p/pythonfutures/source/browse/#svn/branches/feedback

=========
Copyright
=========

This document has been placed in the public domain.



..
   Local Variables:
   mode: indented-text
   indent-tabs-mode: nil
   sentence-end-double-space: t
   fill-column: 70
   coding: utf-8
   End: