features/pep-420 / Lib /

The default branch has multiple heads

Full commit
"""Utilities for with-statement contexts.  See PEP 343."""

import sys
from collections import deque
from functools import wraps

# Public names exported by a star-import of this module.
__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack"]

class ContextDecorator(object):
    "A base class or mixin that enables context managers to work as decorators."

    def _recreate_cm(self):
        """Return a recreated instance of self.

        Allows an otherwise one-shot context manager like
        _GeneratorContextManager to support use as
        a decorator via implicit recreation.

        This is a private interface just for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        """Decorate *func* so that every call runs inside this context manager.

        The returned wrapper enters ``self._recreate_cm()`` around each
        invocation of *func* and returns whatever *func* returns.
        """
        # wraps() copies __name__, __doc__, etc. from func onto the wrapper
        # so the decorated function stays introspectable.
        @wraps(func)
        def inner(*args, **kwds):
            with self._recreate_cm():
                return func(*args, **kwds)
        return inner

class _GeneratorContextManager(ContextDecorator):
    """Helper for @contextmanager decorator."""

    def __init__(self, func, *args, **kwds):
        # Create the generator now, and remember how it was built so that
        # _recreate_cm() can produce a fresh one later.
        self.gen = func(*args, **kwds)
        self.func, self.args, self.kwds = func, args, kwds

    def _recreate_cm(self):
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, *self.args, **self.kwds)

    def __enter__(self):
        """Advance the generator to its first yield and return that value.

        Raises RuntimeError if the generator finishes without yielding.
        """
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield")

    def __exit__(self, type, value, traceback):
        """Resume the generator so that its cleanup code runs.

        Without an exception the generator is simply advanced and must
        finish.  With an exception, it is thrown into the generator; a true
        value is returned (suppressing the exception) only when the
        generator handled it and then finished.

        Raises RuntimeError if the generator yields a second time.
        """
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                self.gen.throw(type, value, traceback)
                raise RuntimeError("generator didn't stop after throw()")
            except StopIteration as exc:
                # Suppress the exception *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed
                return exc is not value
            except BaseException:
                # Deliberately catches everything (same reach as a bare
                # "except:"), then
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                if sys.exc_info()[1] is not value:
                    raise
                # Falling through returns None: the original exception is
                # not suppressed and the with statement re-raises it.

def contextmanager(func):
    """@contextmanager decorator.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    This makes this:

        with some_generator(<arguments>) as <variable>:
            <body>

    equivalent to this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    """
    # wraps() keeps the generator function's name and docstring on the
    # factory that replaces it.
    @wraps(func)
    def helper(*args, **kwds):
        return _GeneratorContextManager(func, *args, **kwds)
    return helper

class closing(object):
    """Context to automatically close something at the end of a block.

    Code like this:

        with closing(<module>.open(<arguments>)) as f:
            <block>

    is equivalent to this:

        f = <module>.open(<arguments>)
        try:
            <block>
        finally:
            f.close()

    """
    def __init__(self, thing):
        self.thing = thing
    def __enter__(self):
        return self.thing
    def __exit__(self, *exc_info):
        # Always close; returning None lets any exception propagate.
        self.thing.close()

# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(object):
    """Context manager for dynamic management of a stack of exit callbacks

    For example:

        with ExitStack() as stack:
            files = [stack.enter_context(open(fname)) for fname in filenames]
            # All opened files will automatically be closed at the end of
            # the with statement, even if attempts to open files later
            # in the list throw an exception

    """
    def __init__(self):
        # Callbacks in registration order; __exit__ runs them in reverse.
        self._exit_callbacks = deque()

    def pop_all(self):
        """Preserve the context stack by transferring it to a new instance"""
        new_stack = type(self)()
        new_stack._exit_callbacks = self._exit_callbacks
        self._exit_callbacks = deque()
        return new_stack

    def _push_cm_exit(self, cm, cm_exit):
        """Helper to correctly register callbacks to __exit__ methods"""
        def _exit_wrapper(*exc_details):
            return cm_exit(cm, *exc_details)
        # Record which context manager the wrapper belongs to.
        _exit_wrapper.__self__ = cm
        self.push(_exit_wrapper)

    def push(self, exit):
        """Registers a callback with the standard __exit__ method signature

        Can suppress exceptions the same way __exit__ methods can.

        Also accepts any object with an __exit__ method (registering a call
        to the method instead of the object itself)

        Returns *exit* unchanged so this method can be used as a decorator.
        """
        # We use an unbound method rather than a bound method to follow
        # the standard lookup behaviour for special methods
        _cb_type = type(exit)
        try:
            exit_method = _cb_type.__exit__
        except AttributeError:
            # Not a context manager, so assume its a callable
            self._exit_callbacks.append(exit)
        else:
            self._push_cm_exit(exit, exit_method)
        return exit # Allow use as a decorator

    def callback(self, callback, *args, **kwds):
        """Registers an arbitrary callback and arguments.

        Cannot suppress exceptions.

        Returns *callback* unchanged so this method can be used as a
        decorator.
        """
        def _exit_wrapper(exc_type, exc, tb):
            # The return value is discarded, so the exception details
            # are never suppressed by this kind of callback.
            callback(*args, **kwds)
        # We changed the signature, so using @wraps is not appropriate, but
        # setting __wrapped__ may still help with introspection
        _exit_wrapper.__wrapped__ = callback
        self.push(_exit_wrapper)
        return callback # Allow use as a decorator

    def enter_context(self, cm):
        """Enters the supplied context manager

        If successful, also pushes its __exit__ method as a callback and
        returns the result of the __enter__ method.
        """
        # We look up the special methods on the type to match the with statement
        _cm_type = type(cm)
        _exit = _cm_type.__exit__
        result = _cm_type.__enter__(cm)
        # Only register __exit__ once __enter__ has succeeded.
        self._push_cm_exit(cm, _exit)
        return result

    def close(self):
        """Immediately unwind the context stack"""
        self.__exit__(None, None, None)

    def __enter__(self):
        return self

    def __exit__(self, *exc_details):
        """Invoke every registered callback, most recently added first.

        Returns a true value if the callbacks suppressed the exception
        described by *exc_details*.  The stack is empty afterwards.
        """
        if not self._exit_callbacks:
            return
        # This looks complicated, but it is really just
        # setting up a chain of try-except statements to ensure
        # that outer callbacks still get invoked even if an
        # inner one throws an exception
        def _invoke_next_callback(exc_details):
            # Callbacks are removed from the list in FIFO order
            # but the recursion means they're invoked in LIFO order
            cb = self._exit_callbacks.popleft()
            if not self._exit_callbacks:
                # Innermost callback is invoked directly
                return cb(*exc_details)
            # More callbacks left, so descend another level in the stack
            try:
                suppress_exc = _invoke_next_callback(exc_details)
            except BaseException:
                # Deliberately broad: this callback must see whatever an
                # inner callback raised, exactly as __exit__ would.
                suppress_exc = cb(*sys.exc_info())
                # Check if this cb suppressed the inner exception
                if not suppress_exc:
                    raise
            else:
                # Check if inner cb suppressed the original exception
                if suppress_exc:
                    exc_details = (None, None, None)
                suppress_exc = cb(*exc_details) or suppress_exc
            return suppress_exc
        # Kick off the recursive chain
        return _invoke_next_callback(exc_details)