Source

paste / paste / cgiapp.py

Full commit
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php

"""
Application that runs a CGI script.
"""
import os
import sys
import subprocess
import urllib
try:
    import select
except ImportError:
    select = None

from paste.util import converters

__all__ = ['CGIError', 'CGIApplication']

class CGIError(Exception):
    """
    Error signalling a problem with the CGI script itself.

    Used in this module when the script cannot be located on the
    search path, or when its output contains a line that is not a
    valid header.
    """

class CGIApplication(object):

    """
    This object acts as a proxy to a CGI application.  You pass in the
    script path (``script``), an optional path to search for the
    script (if the name isn't absolute) (``path``).  If you don't give
    a path, then ``$PATH`` will be used.

    ``global_conf`` must be empty/``None``; a non-empty value raises
    ``NotImplementedError``.

    ``path`` may be a list of directories or a single string of
    directories joined with ``os.pathsep`` (the same format as
    ``$PATH``).

    ``include_os_environ`` controls whether the CGI process starts
    from a copy of ``os.environ`` or from an empty environment.

    ``query_string``, if given, is appended to the request's
    ``QUERY_STRING``.  It may also be given as part of ``script``
    (``"script.cgi?a=b"``), but not both ways at once.

    Instances are WSGI applications.
    """

    def __init__(self,
                 global_conf,
                 script,
                 path=None,
                 include_os_environ=True,
                 query_string=None):
        if global_conf:
            # NotImplemented is a comparison sentinel, not an exception;
            # calling it would raise an unrelated TypeError.
            raise NotImplementedError(
                "global_conf is no longer supported for CGIApplication "
                "(use make_cgi_application); please pass None instead")
        self.script_filename = script
        if path is None:
            path = os.environ.get('PATH', '').split(os.pathsep)
        elif hasattr(path, 'split'):
            # A single string would otherwise be searched one
            # character at a time by the loop below.
            path = path.split(os.pathsep)
        self.path = path
        if '?' in script:
            assert query_string is None, (
                "You cannot have '?' in your script name (%r) and also "
                "give a query_string (%r)" % (script, query_string))
            script, query_string = script.split('?', 1)
        if os.path.abspath(script) != script:
            # relative path
            for path_dir in self.path:
                if os.path.exists(os.path.join(path_dir, script)):
                    self.script = os.path.join(path_dir, script)
                    break
            else:
                raise CGIError(
                    "Script %r not found in path %r"
                    % (script, self.path))
        else:
            self.script = script
        self.include_os_environ = include_os_environ
        self.query_string = query_string

    def __call__(self, environ, start_response):
        """
        WSGI entry point: run the script once for this request.

        The request body is fed to the script's stdin, the script's
        stdout is parsed by ``CGIWriter`` (which calls
        ``start_response``), and its stderr goes to
        ``environ['wsgi.errors']``.  Always returns an empty list; the
        body is sent through the writer returned by ``start_response``.

        Note that ``REQUEST_URI`` is added to ``environ`` in place when
        it is missing.
        """
        if 'REQUEST_URI' not in environ:
            environ['REQUEST_URI'] = (
                urllib.quote(environ.get('SCRIPT_NAME', ''))
                + urllib.quote(environ.get('PATH_INFO', '')))
        if self.include_os_environ:
            cgi_environ = os.environ.copy()
        else:
            cgi_environ = {}
        for name in environ:
            # Only all-upper-case keys with str values are handed to the
            # script; this leaves out keys such as 'wsgi.input'.
            # Should unicode values be encoded?
            if (name.upper() == name
                and isinstance(environ[name], str)):
                cgi_environ[name] = environ[name]
        if self.query_string is not None:
            old = cgi_environ.get('QUERY_STRING', '')
            if old:
                old += '&'
            cgi_environ['QUERY_STRING'] = old + self.query_string
        cgi_environ['SCRIPT_FILENAME'] = self.script
        proc = subprocess.Popen(
            [self.script],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=cgi_environ,
            # dirname is '' when the script was found through an empty
            # path entry; Popen cannot chdir to '', so fall back to
            # None (inherit the current directory).
            cwd=os.path.dirname(self.script) or None,
            )
        writer = CGIWriter(environ, start_response)
        if select and sys.platform != 'win32':
            proc_communicate(
                proc,
                stdin=StdinReader.from_environ(environ),
                stdout=writer,
                stderr=environ['wsgi.errors'])
        else:
            # No usable select(): buffer the whole exchange in memory.
            stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read())
            if stderr:
                environ['wsgi.errors'].write(stderr)
            writer.write(stdout)
        if not writer.headers_finished:
            # The script never emitted the blank line ending its headers.
            start_response(writer.status, writer.headers)
        return []

class CGIWriter(object):
    """
    File-like sink for the output of a CGI script.

    Data passed to ``write`` is buffered and parsed line by line as
    headers until an empty line is seen.  At that point
    ``start_response`` is called with the collected status and headers,
    and everything after the empty line (plus all later writes) goes to
    the writer that ``start_response`` returned.
    """

    def __init__(self, environ, start_response):
        self.start_response = start_response
        self.environ = environ
        # Response state collected while the headers are being parsed.
        self.status = '200 OK'
        self.headers = []
        self.buffer = ''
        # Set once the blank line ending the headers has been seen.
        self.headers_finished = False
        self.writer = None

    def _finish_headers(self):
        # Start the response and flush whatever body data is buffered.
        self.headers_finished = True
        self.writer = self.start_response(
            self.status, self.headers)
        self.writer(self.buffer)
        # The parsing state is no longer needed once the body starts.
        del self.buffer
        del self.headers
        del self.status

    def write(self, data):
        """
        Accept a chunk of script output.

        Raises ``CGIError`` when a header line contains no colon.
        """
        if self.headers_finished:
            self.writer(data)
            return
        self.buffer += data
        while '\n' in self.buffer:
            # Use CRLF as the terminator only when it ends the first line.
            crlf_at = self.buffer.find('\r\n')
            if -1 < crlf_at < self.buffer.find('\n'):
                newline = '\r\n'
            else:
                newline = '\n'
            line, self.buffer = self.buffer.split(newline, 1)
            if not line:
                self._finish_headers()
                break
            pieces = line.split(':', 1)
            if len(pieces) != 2:
                raise CGIError(
                    "Bad header line: %r" % line)
            name = pieces[0].strip()
            value = pieces[1].lstrip()
            if name.lower() != 'status':
                self.headers.append((name, value))
                continue
            if ' ' not in value:
                # WSGI requires this space, sometimes CGI scripts don't set it:
                value = '%s General' % value
            self.status = value

class StdinReader(object):
    """
    Wrapper around an input stream that never reads more than
    ``content_length`` in total.

    ``stdin`` is the underlying file-like object and
    ``content_length`` the amount of data that may still be read
    from it.
    """

    def __init__(self, stdin, content_length):
        self.stdin = stdin
        self.content_length = content_length

    @classmethod
    def from_environ(cls, environ):
        """
        Build a reader from a WSGI environ, using ``wsgi.input`` as the
        stream and ``CONTENT_LENGTH`` (0 when missing or empty) as the
        limit.
        """
        length = environ.get('CONTENT_LENGTH')
        if length:
            length = int(length)
        else:
            length = 0
        return cls(environ['wsgi.input'], length)

    def read(self, size=None):
        """
        Read up to ``size`` from the stream, never exceeding what is
        left of the content length.  With ``size`` omitted or negative,
        read everything that is left.  Returns ``''`` once the limit
        has been used up.
        """
        # A non-positive remaining length means there is nothing to
        # read; passing it on to the stream would mean "read to EOF".
        if not self.content_length or self.content_length < 0:
            return ''
        if size is None or size < 0:
            # A negative size must not reach the stream either: it
            # would read past the declared content length.
            wanted = self.content_length
        else:
            wanted = min(self.content_length, size)
        text = self.stdin.read(wanted)
        self.content_length -= len(text)
        return text

def proc_communicate(proc, stdin=None, stdout=None, stderr=None):
    """
    Run the given process, piping input/output/errors to the given
    file-like objects (which need not be actual file objects, unlike
    the arguments passed to Popen).  Wait for process to terminate.

    ``proc`` is a ``subprocess.Popen`` object.  ``stdin`` needs a
    ``read(size)`` method; ``stdout`` and ``stderr`` need a
    ``write(data)`` method.  If ``stdin`` is None the process's input
    pipe is closed right away; if ``stdout``/``stderr`` is None the
    data read from the matching pipe is discarded.

    Note: this is taken from the posix version of
    subprocess.Popen.communicate, but made more general through the
    use of file-like objects.
    """
    # Local import so this function does not depend on a module-level
    # import being present.
    import errno

    read_set = []
    write_set = []
    # Data read from ``stdin`` that a short write left unsent.
    pending = None
    trans_nl = proc.universal_newlines and hasattr(open, 'newlines')

    if proc.stdin:
        # Flush stdio buffer.  This might block, if the user has
        # been writing to .stdin in an uncontrolled fashion.
        proc.stdin.flush()
        if stdin is not None:
            write_set.append(proc.stdin)
        else:
            proc.stdin.close()
    else:
        assert stdin is None
    if proc.stdout:
        read_set.append(proc.stdout)
    else:
        assert stdout is None
    if proc.stderr:
        read_set.append(proc.stderr)
    else:
        assert stderr is None

    while read_set or write_set:
        rlist, wlist, xlist = select.select(read_set, write_set, [])

        if proc.stdin in wlist:
            # When select has indicated that the file is writable,
            # we can write up to PIPE_BUF bytes without risk
            # blocking.  POSIX defines PIPE_BUF >= 512
            if pending:
                chunk, pending = pending, None
            else:
                chunk = stdin.read(512)
            if not chunk:
                # Input exhausted: signal EOF to the process.
                proc.stdin.close()
                write_set.remove(proc.stdin)
            else:
                try:
                    written = os.write(proc.stdin.fileno(), chunk)
                except OSError:
                    # sys.exc_info() is used instead of binding the
                    # exception in the except clause so the same
                    # source is valid on every Python version.
                    if sys.exc_info()[1].errno != errno.EPIPE:
                        raise
                    # The process stopped reading its input; stop
                    # feeding it but keep collecting its output.
                    proc.stdin.close()
                    write_set.remove(proc.stdin)
                else:
                    if written < len(chunk):
                        pending = chunk[written:]

        for pipe, sink in ((proc.stdout, stdout), (proc.stderr, stderr)):
            if pipe is None or pipe not in rlist:
                continue
            data = os.read(pipe.fileno(), 1024)
            if not data:
                # An empty read means the process closed this pipe.
                pipe.close()
                read_set.remove(pipe)
                continue
            if trans_nl:
                data = proc._translate_newlines(data)
            if sink is not None:
                sink.write(data)

    try:
        proc.wait()
    except OSError:
        # ECHILD: the child has already been reaped elsewhere.
        if sys.exc_info()[1].errno != errno.ECHILD:
            raise

def make_cgi_application(global_conf, script, path=None, include_os_environ=None,
                         query_string=None):
    """
    Paste Deploy interface for :class:`CGIApplication`

    This object acts as a proxy to a CGI application.  You pass in the
    script path (``script``), an optional path to search for the
    script (if the name isn't absolute) (``path``).  If you don't give
    a path, then the ``path`` or ``PATH`` key of ``global_conf`` is
    used, and failing that ``$PATH``.

    A ``path`` given as a string is split on ``os.pathsep`` into a
    list of directories.

    ``include_os_environ`` is converted with ``converters.asbool``.
    NOTE(review): the default here is ``None`` while
    :class:`CGIApplication` itself defaults to ``True``; presumably
    ``asbool(None)`` is false, so the OS environment is left out unless
    requested -- confirm this is intended.
    """
    if path is None and global_conf:
        path = global_conf.get('path') or global_conf.get('PATH')
    if path and hasattr(path, 'split'):
        # Configuration values are strings; passing one through as-is
        # would make the script search iterate over its characters.
        path = path.split(os.pathsep)
    include_os_environ = converters.asbool(include_os_environ)
    return CGIApplication(
        None,
        script, path=path, include_os_environ=include_os_environ,
        query_string=query_string)