Source

txwsgi / txwsgi / greenlet.py

"""I{Green threads} support for asynchronous
U{Web Server Gateway Interface<http://www.python.org/dev/peps/pep-0333/>}
implementations.

Code is experimental.

THIS SOFTWARE IS UNDER MIT LICENSE.
Copyright (c) 2010 Manlio Perillo (manlio.perillo@gmail.com)

Read the LICENSE file for more information.
"""

from __future__ import absolute_import

from greenlet import getcurrent, greenlet


__all__ = [ 'write_adapter', 'green_adapter',
            'GTimedout',
            'gstate', 'gsuspend', 'gresume', 'gcontinue', 'gthrow' ]


class GState(tuple):
    """Message sent by a request greenlet to its parent.

    It is a plain 2-tuple C{(data, action)}, where C{action} is one of
    the C{GCONTINUE} / C{GSUSPEND} constants and C{data} is the payload
    associated with that action.  Using a dedicated type lets the
    parent tell these messages apart from any other value returned by
    a greenlet switch.
    """

    def __new__(cls, data, action):
        pair = (data, action)
        return super(GState, cls).__new__(cls, pair)


# Action codes carried by GState messages:
#   GCONTINUE - hand the attached buffer to the gateway, then keep
#               running the request greenlet
#   GSUSPEND  - suspend the request greenlet (the attached data is the
#               timeout)
GCONTINUE, GSUSPEND = 0, 1


#
# Per green thread state dictionary
# TODO: keys are greenlet memory address, this may not be safe
#

_gthread_state = {}


#
# API to control greenlet scheduling
#

class GTimedout(RuntimeError):
    """Raised inside a suspended WSGI request thread when the timeout
    given to L{gsuspend} expires before the thread is resumed.
    """


def gstate():
    """Return the state dictionary of the current WSGI request thread.

    The lookup is keyed on the currently running greenlet, so a
    C{KeyError} is raised when the caller is not running inside a
    greenlet that has an entry in the per-thread state table.
    """

    return _gthread_state[getcurrent()]

def gsuspend(timeout=None):
    """Suspend execution of the current WSGI request thread.

    Control is handed to the parent greenlet together with a suspend
    request carrying C{timeout}.  The value returned here is whatever
    the parent later switches back with, that is the data specified
    by the L{gresume} function.

    If C{timeout} is specified, and expired, a L{GTimedout} exception
    is raised in the current WSGI request thread.
    """

    request = GState(timeout, GSUSPEND)
    return getcurrent().parent.switch(request)

def gresume(state, data=None, exc_info=None):
    """Resume execution of the WSGI request thread owning C{state}.

    This function should usually be called in a callback function, and
    not in the normal request thread.

    C{state} is the request state dictionary; its C{'resume'} entry is
    called first, and a false result means the thread was not resumed:
    nothing is stored and C{False} is returned.

    Otherwise the outcome is recorded in C{state} and C{True} is
    returned: C{exc_info}, when not None, is stored so that
    L{gsuspend} raises it; if it is None, C{data} is stored and
    becomes the return value of L{gsuspend}.
    """

    # TODO: in case of error, it will not be logged
    if not state['resume']():
        # Ignore
        # TODO: add logging?
        return False

    if exc_info is None:
        state['data'] = data
    else:
        state['exc_info'] = exc_info

    return True

def gcontinue(buf):
    """Continue current WSGI execution thread, yielding the passed
    buffer to the WSGI gateway.

    The buffer is handed to the parent greenlet wrapped in a continue
    request; this function returns (with no value) once the parent
    switches back.

    B{WARNING}:
        this function may cause data to be yielded to the WSGI
        gateway *before* the WSGI application returns.
        This is not an issue with WSGI 1.0, where it is used to
        implement the I{write} callable, but can not be supported in
        WSGI 2.0.

    XXX: this should not be exposed to WSGI applications
    """

    request = GState(buf, GCONTINUE)
    getcurrent().parent.switch(request)

def gthrow(exc):
    """Throw C{exc} exception to the current WSGI request thread.

    The exception is thrown into whichever greenlet is running when
    this function is called.

    XXX: this should not be exposed to WSGI applications
    """

    getcurrent().throw(exc)


class green_adapter(object):
    """An adapter used as a glue between an asynchronous I{WSGI}
    implementation and a I{WSGI} application using greenlets.

    The application runs inside its own greenlet.  Whenever it calls
    L{gsuspend} or the I{write} callable, control comes back to
    C{__call__} as a L{GState} message, which is translated into
    something the gateway understands: a call to the
    C{x-wsgiorg.suspend} callable or a yielded buffer.

    All per-request bookkeeping is local to each C{__call__}
    invocation, so a single adapter instance can serve interleaved
    requests.

    B{BUG}:
        this adapter does not support WSGI applications that return a
        generator: the returned iterable is consumed by C{__call__}
        itself, outside the request greenlet.
    """

    # Only read by the legacy write method below.  Per-request tracking
    # is done inside __call__ and never touches the instance.
    in_app_iter = False

    def __init__(self, application):
        self.app = application

    def __call__(self, environ, start_response):
        """Run the wrapped application and yield its output.

        This is a generator.  It requires C{wsgi.errors},
        C{x-wsgiorg.suspend} and C{x-wsgiorg.suspend_status} in
        C{environ}.
        """

        # Whether the application already returned its iterable.  This
        # must be per request: keeping it on the adapter instance would
        # let one request flip the flag for every other request still
        # running through the same adapter.  A dict is used as a
        # mutable cell, since the closure below must update it.
        request = {'in_app_iter': False}

        def write(buf):
            # WSGI "safe" write callable, bound to this request only
            if request['in_app_iter']:
                gthrow(ValueError(
                    'write callable called from application iterator'))
            else:
                gcontinue(buf)

        def _start_response(status, headers, exc_info=None):
            # exc_info is the optional third argument of start_response
            # (PEP 333); forward it only when it was supplied
            if exc_info is None:
                start_response(status, headers)
            else:
                start_response(status, headers, exc_info)

            # Since write callable can not be implemented in an
            # asynchronous WSGI gateway
            return write

        # Initialize greenlet and state
        gp = greenlet(self.app)

        state = {'environ': environ}
        _gthread_state[gp] = state

        log = environ['wsgi.errors']
        suspend = environ['x-wsgiorg.suspend']
        suspend_status = environ['x-wsgiorg.suspend_status']

        try:
            r = gp.switch(environ, _start_response)
            while isinstance(r, GState):
                data, action = r
                log.write('action: %s' % action)

                if action == GSUSPEND:
                    # data is the timeout passed to gsuspend
                    state['resume'] = suspend(data)

                    yield ''

                    # Check suspend status, and return control to the
                    # gsuspend function
                    status = suspend_status()
                    log.write('suspend status: %s' % status)
                    assert status

                    if status == -1:
                        r = gp.throw(GTimedout)
                    else:
                        exc_info = state.pop('exc_info', None)
                        if exc_info is not None:
                            r = gp.throw(*exc_info)
                        else:
                            r = gp.switch(state.pop('data', None))
                elif action == GCONTINUE:
                    # data is the buffer passed to the write callable
                    yield data
                    r = gp.switch()
                else:
                    raise ValueError('unknown action %d' % action)

            # The application returned the app_iter object
            log.write('in app iter')
            request['in_app_iter'] = True
            try:
                for buf in r:
                    yield buf
            finally:
                # PEP 333: the close method of the application
                # iterable, if any, must be called when the request
                # completes, even if it was aborted
                close = getattr(r, 'close', None)
                if close is not None:
                    close()
        finally:
            # Drop the per-request state, otherwise the table would
            # keep every greenlet (and its state) alive forever
            _gthread_state.pop(gp, None)

    def write(self, buf):
        """WSGI "safe" write callable.

        Kept for backward compatibility only: the callable returned
        by I{start_response} is now a per-request function created in
        C{__call__}.
        """

        if self.in_app_iter:
            exc = ValueError(
                'write callable called from application iterator')
            gthrow(exc)
        else:
            gcontinue(buf)


class write_adapter(object):
    """A WSGI adapter (middleware) that uses I{greenlet} to provide
    I{write} callable support for asynchronous WSGI implementations.

    WSGI I{write} callable B{can not} be implemented in a pure (that
    is, threads are not used) asynchronous server.

    The application runs inside its own greenlet; every call to the
    I{write} callable switches back to C{__call__}, which yields the
    buffer to the gateway and then lets the application continue.

    All per-request bookkeeping is local to each C{__call__}
    invocation, so a single adapter instance can serve interleaved
    requests.
    """

    # Only read by the legacy write method below.  Per-request tracking
    # is done inside __call__ and never touches the instance.
    in_app_iter = False

    def __init__(self, application):
        self.app = application

    def __call__(self, environ, start_response):
        """Run the wrapped application and yield its output.

        This is a generator: buffers passed to the I{write} callable
        are yielded first, followed by the items of the iterable
        returned by the application.
        """

        # Whether the application already returned its iterable.  This
        # must be per request: keeping it on the adapter instance would
        # let one request flip the flag for every other request still
        # running through the same adapter.  A dict is used as a
        # mutable cell, since the closure below must update it.
        request = {'in_app_iter': False}

        def write(buf):
            # write callable bound to this request only
            gp = getcurrent()

            if request['in_app_iter']:
                gp.throw(ValueError(
                    'write callable called from application iterator'))
            else:
                gp.parent.switch(buf)

        def _start_response(status, headers, exc_info=None):
            # exc_info is the optional third argument of start_response
            # (PEP 333); forward it only when it was supplied
            if exc_info is None:
                start_response(status, headers)
            else:
                start_response(status, headers, exc_info)
            return write

        # Initialize greenlet
        gp = greenlet(self.app)

        r = gp.switch(environ, _start_response)
        while isinstance(r, str):
            # Got data from the write callable
            yield r

            r = gp.switch()

        # The application returned the app_iter object
        # From now on, it can no more call the write callable
        # XXX check me
        request['in_app_iter'] = True
        try:
            for buf in r:
                yield buf
        finally:
            # PEP 333: the close method of the application iterable,
            # if any, must be called when the request completes, even
            # if it was aborted
            close = getattr(r, 'close', None)
            if close is not None:
                close()

    def write(self, buf):
        """WSGI I{write} callable.

        Kept for backward compatibility only: the callable returned
        by I{start_response} is now a per-request function created in
        C{__call__}.
        """

        gp = getcurrent()

        if self.in_app_iter:
            exc = ValueError(
                'write callable called from application iterator')
            gp.throw(exc)
        else:
            gp.parent.switch(buf)