cpython-withatomic / Lib / CGIHTTPServer.py

The branch 'legacy-trunk' does not exist.
"""CGI-savvy HTTP Server.

This module builds on SimpleHTTPServer by implementing GET and POST
requests to cgi-bin scripts.

"""


__version__ = "0.3"


import os
import sys
import time
import socket
import string
import urllib
import BaseHTTPServer
import SimpleHTTPServer


class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):

    """Complete HTTP server with GET, HEAD and POST commands.

    GET and HEAD also support running CGI scripts.

    The POST command is *only* implemented for CGI scripts.

    """

    def do_POST(self):
	"""Serve a POST request.

	This is only implemented for CGI scripts.

	"""

	if self.is_cgi():
	    self.run_cgi()
	else:
	    self.send_error(501, "Can only POST to CGI scripts")

    def send_head(self):
	"""Version of send_head that support CGI scripts"""
	if self.is_cgi():
	    return self.run_cgi()
	else:
	    return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

    def is_cgi(self):
	"""test whether PATH corresponds to a CGI script.

	Return a tuple (dir, rest) if PATH requires running a
	CGI script, None if not.  Note that rest begins with a
	slash if it is not empty.

	The default implementation tests whether the path
	begins with one of the strings in the list
	self.cgi_directories (and the next character is a '/'
	or the end of the string).

	"""

	path = self.path

	for x in self.cgi_directories:
	    i = len(x)
	    if path[:i] == x and (not path[i:] or path[i] == '/'):
		self.cgi_info = path[:i], path[i+1:]
		return 1
	return 0

    cgi_directories = ['/cgi-bin', '/htbin']

    def run_cgi(self):
	"""Execute a CGI script."""
	dir, rest = self.cgi_info
	i = string.rfind(rest, '?')
	if i >= 0:
	    rest, query = rest[:i], rest[i+1:]
	else:
	    query = ''
	i = string.find(rest, '/')
	if i >= 0:
	    script, rest = rest[:i], rest[i:]
	else:
	    script, rest = rest, ''
	scriptname = dir + '/' + script
	scriptfile = self.translate_path(scriptname)
	if not os.path.exists(scriptfile):
	    self.send_error(404, "No such CGI script (%s)" % `scriptname`)
	    return
	if not os.path.isfile(scriptfile):
	    self.send_error(403, "CGI script is not a plain file (%s)" %
			    `scriptname`)
	    return
	if not executable(scriptfile):
	    self.send_error(403, "CGI script is not executable (%s)" %
			    `scriptname`)
	    return
	nobody = nobody_uid()
	self.send_response(200, "Script output follows")
	self.wfile.flush() # Always flush before forking
	pid = os.fork()
	if pid != 0:
	    # Parent
	    pid, sts = os.waitpid(pid, 0)
	    if sts:
		self.log_error("CGI script exit status x%x" % sts)
	    return
	# Child
	try:
	    # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
	    # XXX Much of the following could be prepared ahead of time!
	    env = {}
	    env['SERVER_SOFTWARE'] = self.version_string()
	    env['SERVER_NAME'] = self.server.server_name
	    env['GATEWAY_INTERFACE'] = 'CGI/1.1'
	    env['SERVER_PROTOCOL'] = self.protocol_version
	    env['SERVER_PORT'] = str(self.server.server_port)
	    env['REQUEST_METHOD'] = self.command
	    uqrest = urllib.unquote(rest)
	    env['PATH_INFO'] = uqrest
	    env['PATH_TRANSLATED'] = self.translate_path(uqrest)
	    env['SCRIPT_NAME'] = scriptname
	    if query:
		env['QUERY_STRING'] = query
	    host = self.address_string()
	    if host != self.client_address[0]:
		env['REMOTE_HOST'] = host
	    env['REMOTE_ADDR'] = self.client_address[0]
	    # AUTH_TYPE
	    # REMOTE_USER
	    # REMOTE_IDENT
	    env['CONTENT_TYPE'] = self.headers.type
	    length = self.headers.getheader('content-length')
	    if length:
		env['CONTENT_LENGTH'] = length
	    accept = []
	    for line in self.headers.getallmatchingheaders('accept'):
		if line[:1] in string.whitespace:
		    accept.append(string.strip(line))
		else:
		    accept = accept + string.split(line[7:])
	    env['HTTP_ACCEPT'] = string.joinfields(accept, ',')
	    ua = self.headers.getheader('user-agent')
	    if ua:
		env['HTTP_USER_AGENT'] = ua
	    # XXX Other HTTP_* headers
	    import regsub
	    decoded_query = regsub.gsub('+', ' ', query)
	    try:
		os.setuid(nobody)
	    except os.error:
		pass
	    os.dup2(self.rfile.fileno(), 0)
	    os.dup2(self.wfile.fileno(), 1)
	    print scriptfile, script, decoded_query
	    os.execve(scriptfile,
		      [script, decoded_query],
		      env)
	except:
	    self.server.handle_error(self.request, self.client_address)
	    os._exit(127)


nobody = None

def nobody_uid():
    """Internal routine to get nobody's uid"""
    global nobody
    if nobody:
	return nobody
    import pwd
    try:
	nobody = pwd.getpwnam('nobody')[2]
    except pwd.error:
	nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
    return nobody


def executable(path):
    """Test for executable file."""
    try:
	st = os.stat(path)
    except os.error:
	return 0
    return st[0] & 0111 != 0


def test(HandlerClass = CGIHTTPRequestHandler,
	 ServerClass = BaseHTTPServer.HTTPServer):
    import sys
    if sys.argv[1:2] == ['-r']:
	db = MyArchive()
	db.regenindices()
	return
    SimpleHTTPServer.test(HandlerClass, ServerClass)


if __name__ == '__main__':
    test()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.