Commits

Anonymous committed 3e74dcb

Some win/py2.3 fixes

  • Participants
  • Parent commits 251745c

Comments (0)

Files changed (3)

cheroot/_compat.py

-"""Compatibility code for using Cheroot with various versions of Python.
-
-Cheroot 3.3 is compatible with Python versions 2.3+. This module provides a
-useful abstraction over the differences between Python versions, sometimes by
-preferring a newer idiom, sometimes an older one, and sometimes a custom one.
-
-In particular, Python 2 uses str and '' for byte strings, while Python 3
-uses str and '' for unicode strings. We will call each of these the 'native
-string' type for each version. Because of this major difference, this module
-provides new 'bytestr', 'unicodestr', and 'nativestr' attributes, as well as
-two functions: 'ntob', which translates native strings (of type 'str') into
-byte strings regardless of Python version, and 'ntou', which translates native
-strings to unicode strings.
-"""
-import os
-import re
-import sys
-
-if sys.version_info >= (3, 0):
-    py3k = True
-    bytestr = bytes
-    unicodestr = str
-    nativestr = unicodestr
-    basestring = (bytes, str)
-    def ntob(n, encoding='ISO-8859-1'):
-        """Return the given native string as a byte string in the given encoding."""
-        # In Python 3, the native string type is unicode
-        return n.encode(encoding)
-    def ntou(n, encoding='ISO-8859-1'):
-        """Return the given native string as a unicode string with the given encoding."""
-        # In Python 3, the native string type is unicode
-        return n
-    def tonative(n, encoding='ISO-8859-1'):
-        """Return the given string as a native string in the given encoding."""
-        # In Python 3, the native string type is unicode
-        if isinstance(n, bytes):
-            return n.decode(encoding)
-        return n
-    # type("")
-    from io import StringIO
-    # bytes:
-    from io import BytesIO as BytesIO
-else:
-    # Python 2
-    py3k = False
-    bytestr = str
-    unicodestr = unicode
-    nativestr = bytestr
-    basestring = basestring
-    def ntob(n, encoding='ISO-8859-1'):
-        """Return the given native string as a byte string in the given encoding."""
-        # In Python 2, the native string type is bytes. Assume it's already
-        # in the given encoding, which for ISO-8859-1 is almost always what
-        # was intended.
-        return n
-    def ntou(n, encoding='ISO-8859-1'):
-        """Return the given native string as a unicode string with the given encoding."""
-        # In Python 2, the native string type is bytes.
-        # First, check for the special encoding 'escape'. The test suite uses this
-        # to signal that it wants to pass a string with embedded \uXXXX escapes,
-        # but without having to prefix it with u'' for Python 2, but no prefix
-        # for Python 3.
-        if encoding == 'escape':
-            return unicode(
-                re.sub(r'\\u([0-9a-zA-Z]{4})',
-                       lambda m: unichr(int(m.group(1), 16)),
-                       n.decode('ISO-8859-1')))
-        # Assume it's already in the given encoding, which for ISO-8859-1 is almost
-        # always what was intended.
-        return n.decode(encoding)
-    def tonative(n, encoding='ISO-8859-1'):
-        """Return the given string as a native string in the given encoding."""
-        # In Python 2, the native string type is bytes.
-        if isinstance(n, unicode):
-            return n.encode(encoding)
-        return n
-    try:
-        # type("")
-        from cStringIO import StringIO
-    except ImportError:
-        # type("")
-        from StringIO import StringIO
-    # bytes:
-    BytesIO = StringIO
-
-try:
-    set = set
-except NameError:
-    from sets import Set as set
-
-if py3k:
-    from urllib.request import urlopen
-    PERCENT = ntob('%')
-    EMPTY = ntob('')
-    def unquote(path):
-        """takes quoted byte string and unquotes % encoded values""" 
-        res = path.split(PERCENT)
-        for i in range(1, len(res)):
-            item = res[i]
-            res[i] = bytes([int(item[:2], 16)]) + item[2:]
-        return EMPTY.join(res)
-else:
-    from urllib import urlopen
-    from urllib import unquote
-
-try:
-    # Python 2.
-    from httplib import BadStatusLine, HTTPConnection, IncompleteRead, NotConnected
-    from BaseHTTPServer import BaseHTTPRequestHandler
-except ImportError:
-    # Python 3
-    from http.client import BadStatusLine, HTTPConnection, IncompleteRead, NotConnected
-    from http.server import BaseHTTPRequestHandler
-
-try:
-    # Python 2.
-    from httplib import HTTPSConnection
-except ImportError:
-    try:
-        # Python 3
-        from http.client import HTTPSConnection
-    except ImportError:
-        # Some platforms which don't have SSL don't expose HTTPSConnection
-        HTTPSConnection = None
-
-try:
-    from email.utils import formatdate
-    def HTTPDate(timeval=None):
-        return formatdate(timeval, usegmt=True).encode('ISO-8859-1')
-except ImportError:
-    from rfc822 import formatdate as HTTPDate
-
-try:
-    # Python 2.4+
-    from traceback import format_exc
-except ImportError:
-    def format_exc(limit=None):
-        """Like print_exc() but return a string. Backport for Python 2.3."""
-        try:
-            etype, value, tb = sys.exc_info()
-            return ''.join(traceback.format_exception(etype, value, tb, limit))
-        finally:
-            etype = value = tb = None
-
-
+"""Compatibility code for using Cheroot with various versions of Python.
+
+Cheroot 3.3 is compatible with Python versions 2.3+. This module provides a
+useful abstraction over the differences between Python versions, sometimes by
+preferring a newer idiom, sometimes an older one, and sometimes a custom one.
+
+In particular, Python 2 uses str and '' for byte strings, while Python 3
+uses str and '' for unicode strings. We will call each of these the 'native
+string' type for each version. Because of this major difference, this module
+provides new 'bytestr', 'unicodestr', and 'nativestr' attributes, as well as
+two functions: 'ntob', which translates native strings (of type 'str') into
+byte strings regardless of Python version, and 'ntou', which translates native
+strings to unicode strings.
+"""
+import os
+import re
+import sys
+
+if sys.version_info >= (3, 0):
+    py3k = True
+    bytestr = bytes
+    unicodestr = str
+    nativestr = unicodestr
+    basestring = (bytes, str)
+    def ntob(n, encoding='ISO-8859-1'):
+        """Return the given native string as a byte string in the given encoding."""
+        # In Python 3, the native string type is unicode
+        return n.encode(encoding)
+    def ntou(n, encoding='ISO-8859-1'):
+        """Return the given native string as a unicode string with the given encoding."""
+        # In Python 3, the native string type is unicode
+        return n
+    def tonative(n, encoding='ISO-8859-1'):
+        """Return the given string as a native string in the given encoding."""
+        # In Python 3, the native string type is unicode
+        if isinstance(n, bytes):
+            return n.decode(encoding)
+        return n
+    # type("")
+    from io import StringIO
+    # bytes:
+    from io import BytesIO as BytesIO
+else:
+    # Python 2
+    py3k = False
+    bytestr = str
+    unicodestr = unicode
+    nativestr = bytestr
+    basestring = basestring
+    def ntob(n, encoding='ISO-8859-1'):
+        """Return the given native string as a byte string in the given encoding."""
+        # In Python 2, the native string type is bytes. Assume it's already
+        # in the given encoding, which for ISO-8859-1 is almost always what
+        # was intended.
+        return n
+    def ntou(n, encoding='ISO-8859-1'):
+        """Return the given native string as a unicode string with the given encoding."""
+        # In Python 2, the native string type is bytes.
+        # First, check for the special encoding 'escape'. The test suite uses this
+        # to signal that it wants to pass a string with embedded \uXXXX escapes,
+        # but without having to prefix it with u'' for Python 2, but no prefix
+        # for Python 3.
+        if encoding == 'escape':
+            return unicode(
+                re.sub(r'\\u([0-9a-zA-Z]{4})',
+                       lambda m: unichr(int(m.group(1), 16)),
+                       n.decode('ISO-8859-1')))
+        # Assume it's already in the given encoding, which for ISO-8859-1 is almost
+        # always what was intended.
+        return n.decode(encoding)
+    def tonative(n, encoding='ISO-8859-1'):
+        """Return the given string as a native string in the given encoding."""
+        # In Python 2, the native string type is bytes.
+        if isinstance(n, unicode):
+            return n.encode(encoding)
+        return n
+    try:
+        # type("")
+        from cStringIO import StringIO
+    except ImportError:
+        # type("")
+        from StringIO import StringIO
+    # bytes:
+    BytesIO = StringIO
+
+try:
+    set = set
+except NameError:
+    from sets import Set as set
+
+if py3k:
+    from urllib.request import urlopen
+    PERCENT = ntob('%')
+    EMPTY = ntob('')
+    def unquote(path):
+        """takes quoted byte string and unquotes % encoded values""" 
+        res = path.split(PERCENT)
+        for i in range(1, len(res)):
+            item = res[i]
+            res[i] = bytes([int(item[:2], 16)]) + item[2:]
+        return EMPTY.join(res)
+else:
+    from urllib import urlopen
+    from urllib import unquote
+
+try:
+    # Python 2.
+    from httplib import BadStatusLine, HTTPConnection, IncompleteRead, NotConnected
+    from BaseHTTPServer import BaseHTTPRequestHandler
+except ImportError:
+    # Python 3
+    from http.client import BadStatusLine, HTTPConnection, IncompleteRead, NotConnected
+    from http.server import BaseHTTPRequestHandler
+
+try:
+    # Python 2.
+    from httplib import HTTPSConnection
+except ImportError:
+    try:
+        # Python 3
+        from http.client import HTTPSConnection
+    except ImportError:
+        # Some platforms which don't have SSL don't expose HTTPSConnection
+        HTTPSConnection = None
+
+try:
+    from email.utils import formatdate
+    def HTTPDate(timeval=None):
+        return formatdate(timeval, usegmt=True).encode('ISO-8859-1')
+except ImportError:
+    from rfc822 import formatdate as HTTPDate
+
+try:
+    # Python 2.4+
+    from traceback import format_exc
+except ImportError:
+    import traceback
+    def format_exc(limit=None):
+        """Like print_exc() but return a string. Backport for Python 2.3."""
+        try:
+            etype, value, tb = sys.exc_info()
+            return ''.join(traceback.format_exception(etype, value, tb, limit))
+        finally:
+            etype = value = tb = None
+
+

cheroot/test/test_core.py

-"""Basic tests for the Cheroot server: request handling."""
-
-import socket
-import time
-
-from cheroot._compat import HTTPConnection, HTTPSConnection, ntob, tonative
-from cheroot.test import helper, webtest
-
-
-class CoreRequestHandlingTest(helper.CherootWebCase):
-
-    def setup_server(cls):
-        class Root(helper.Controller):
-
-            def hello(self, req, resp):
-                return "hello"
-
-            def echo(self, req, resp):
-                output = req.environ['wsgi.input'].read()
-                return output.decode("ISO-8859-1")
-
-            def echo_lines(self, req, resp):
-                f = req.environ['wsgi.input']
-
-                output = []
-                while True:
-                    line = f.readline().decode("ISO-8859-1")
-                    if not line:
-                        break
-                    output.append(line)
-
-                if hasattr(f, 'read_trailer_lines'):
-                    for line in f.read_trailer_lines():
-                        k, v = line.split(ntob(":"), 1)
-                        k = tonative(k.strip())
-                        v = tonative(v.strip())
-                        resp.headers[k] = v
-
-                return output
-
-            def normal(self, req, resp):
-                return "normal"
-            
-            def blank(self, req, resp):
-                resp.status = ""
-                return ""
-            
-            # According to RFC 2616, new status codes are OK as long as they
-            # are between 100 and 599.
-            
-            # Here is an illegal code...
-            def illegal(self, req, resp):
-                resp.status = '781'
-                return "oops"
-            
-            # ...and here is an unknown but legal code.
-            def unknown(self, req, resp):
-                resp.status = "431 My custom error"
-                return "funky"
-            
-            # Non-numeric code
-            def bad(self, req, resp):
-                resp.status = "error"
-                return "bad news"
-
-            def header_list(self, req, resp):
-                # helper.Controller.__call__ will transform this into
-                # multiple headers with the same name, which is what
-                # we're trying to test
-                resp.headers['WWW-Authenticate'] = [
-                    'Negotiate','Basic realm="foo"']
-                return ""
-            
-            def commas(self, req, resp):
-                resp.headers['WWW-Authenticate'] = 'Negotiate,Basic realm="foo"'
-                return ""
-
-            def start_response_error(self, req, resp):
-                resp.headers[2] = 3
-                return "salud!"
-
-        cls.httpserver.wsgi_app = Root()
-        cls.httpserver.max_request_body_size = 1000
-    setup_server = classmethod(setup_server)
-
-    def test_status_normal(self):
-        self.getPage("/normal")
-        self.assertBody('normal')
-        self.assertStatus(200)
-
-    def test_status_blank(self):
-        self.getPage("/blank")
-        self.assertStatus(200)
-
-    def test_status_illegal(self):
-        self.getPage("/illegal")
-        self.assertStatus(500)
-        self.assertInBody(
-            "Illegal response status from server (781 is out of range).")
-
-    def test_status_unknown(self):
-        self.getPage("/unknown")
-        self.assertBody('funky')
-        self.assertStatus(431)
-
-    def test_status_syntax_error(self):
-        self.getPage("/bad")
-        self.assertStatus(500)
-        self.assertStatus(500)
-        self.assertInBody(
-            "Illegal response status from server (%s is non-numeric)." %
-            (repr(ntob('error'))))
-
-    def test_multiple_headers(self):
-        self.getPage('/header_list')
-        self.assertEqual([(k, v) for k, v in self.headers if k == 'WWW-Authenticate'],
-                         [('WWW-Authenticate', 'Negotiate'),
-                          ('WWW-Authenticate', 'Basic realm="foo"'),
-                          ])
-        self.getPage('/commas')
-        self.assertHeader('WWW-Authenticate', 'Negotiate,Basic realm="foo"')
-
-    def test_start_response_error(self):
-        self.getPage("/start_response_error")
-        self.assertStatus(500)
-        self.assertInBody("TypeError: WSGI response header key 2 is not of type str.")
-
-    def test_max_body(self):
-        if self.scheme == "https":
-            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
-        else:
-            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("POST", "/echo")
-        body = ntob("x" * 1001)
-        c.putheader("Content-Length", len(body))
-        c.endheaders()
-        c.send(body)
-        response = c.getresponse()
-        self.status, self.headers, self.body = webtest.shb(response)
-        c.close()
-        self.assertStatus(413)
-        self.assertBody(
-            "The entity sent with the request exceeds "
-            "the maximum allowed bytes.")
-
-    def test_request_payload(self):
-        if self.scheme == "https":
-            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
-        else:
-            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("POST", "/echo")
-        body = ntob("I am a request body")
-        c.putheader("Content-Length", len(body))
-        c.endheaders()
-        c.send(body)
-        response = c.getresponse()
-        self.status, self.headers, self.body = webtest.shb(response)
-        c.close()
-        self.assertStatus(200)
-        self.assertBody(body)
-
-    def test_request_payload_readline(self):
-        if self.scheme == "https":
-            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
-        else:
-            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("POST", "/echo_lines")
-        body = ntob("I am a\nrequest body")
-        c.putheader("Content-Length", len(body))
-        c.endheaders()
-        c.send(body)
-        response = c.getresponse()
-        self.status, self.headers, self.body = webtest.shb(response)
-        c.close()
-        self.assertStatus(200)
-        self.assertBody(body)
-
-    def test_chunked_request_payload(self):
-        if self.scheme == "https":
-            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
-        else:
-            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("POST", "/echo")
-        c.putheader("Transfer-Encoding", "chunked")
-        c.endheaders()
-        c.send(ntob("13\r\nI am a request body\r\n0\r\n\r\n"))
-        response = c.getresponse()
-        self.status, self.headers, self.body = webtest.shb(response)
-        c.close()
-        self.assertStatus(200)
-        self.assertBody("I am a request body")
-
-    def test_chunked_request_payload_readline(self):
-        if self.scheme == "https":
-            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
-        else:
-            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("POST", "/echo_lines")
-        c.putheader("Transfer-Encoding", "chunked")
-        c.endheaders()
-        c.send(ntob("13\r\nI am a\nrequest body\r\n0\r\n\r\n"))
-        response = c.getresponse()
-        self.status, self.headers, self.body = webtest.shb(response)
-        c.close()
-        self.assertStatus(200)
-        self.assertBody("I am a\nrequest body")
-
-    def test_chunked_request_payload_trailer(self):
-        if self.scheme == "https":
-            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
-        else:
-            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("POST", "/echo_lines")
-        c.putheader("Transfer-Encoding", "chunked")
-        c.endheaders()
-        c.send(ntob("13\r\nI am a\nrequest body\r\n0\r\n"
-                    "Content-Type: application/json\r\n\r\n"))
-        response = c.getresponse()
-        self.status, self.headers, self.body = webtest.shb(response)
-        c.close()
-        self.assertStatus(200)
-        self.assertBody("I am a\nrequest body")
-        self.assertHeader("Content-Type", "application/json")
-
-
-class ServerInterruptTest(helper.CherootWebCase):
-
-    trap_kbint = True
-
-    def setup_server(cls):
-        class Root(helper.Controller):
-
-            def hello(self, req, resp):
-                return "hello"
-
-            def kbint(self, req, resp):
-                cls.httpserver.interrupt = KeyboardInterrupt()
-                return "hello"
-
-        cls.httpserver.wsgi_app = Root()
-    setup_server = classmethod(setup_server)
-
-    def test_kbint(self):
-        self.getPage("/kbint")
-        # Note that our request thread will complete normally even though
-        # the server is shutting down, which is *usually* a nice thing
-        # but not always.
-        self.assertStatus(200)
-        self.assertBody("hello")
-        # Give the server accept() thread time to shut down
-        time.sleep(1)
-        self.assertInLog("Keyboard Interrupt: shutting down")
-
-
-class UnixDomainSocketTest(helper.CherootWebCase):
-
-    config = {"bind_addr": "/tmp/cheroot_test"}
-
-    def setup_server(cls):
-        class Root(helper.Controller):
-
-            def hello(self, req, resp):
-                return "hello"
-
-        cls.httpserver.wsgi_app = Root()
-    setup_server = classmethod(setup_server)
-
-    def test_normal(self):
-        self.getPage("/hello")
-        self.assertBody('hello')
-        self.assertStatus(200)
-
-
-class SSLTest(helper.CherootWebCase):
-
-    def setup_server(cls):
-        class Root(helper.Controller):
-
-            def hello(self, req, resp):
-                return "hello"
-
-        cls.httpserver.wsgi_app = Root()
-        cls.httpserver.ssl_adapter = helper.get_default_ssl_adapter()
-        cls.HTTP_CONN = HTTPSConnection
-        cls.scheme = 'https'
-
-    setup_server = classmethod(setup_server)
-
-    def test_normal(self):
-        self.getPage("/hello")
-        self.assertBody('hello')
-        self.assertStatus(200)
-
-    def test_http_to_https(self):
-        # Test what happens when a client tries to speak HTTP to an HTTPS server
-        msg = ("The client sent a plain HTTP request, but this "
-               "server only speaks HTTPS on this port.")
-
-        c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c.putrequest("GET", "/hello")
-        c.endheaders()
-        try:
-            response = c.getresponse()
-        except socket.error:
-            pass
-        else:
-            self.status, self.headers, self.body = webtest.shb(response)
-            c.close()
-            self.assertStatus(400)
-            self.assertBody(msg)
-        self.assertInLog(msg)
-
+"""Basic tests for the Cheroot server: request handling."""
+
+import socket
+import time
+
+from cheroot._compat import HTTPConnection, HTTPSConnection, ntob, tonative
+from cheroot.test import helper, webtest
+
+
+class CoreRequestHandlingTest(helper.CherootWebCase):
+
+    def setup_server(cls):
+        class Root(helper.Controller):
+
+            def hello(self, req, resp):
+                return "hello"
+
+            def echo(self, req, resp):
+                output = req.environ['wsgi.input'].read()
+                return output.decode("ISO-8859-1")
+
+            def echo_lines(self, req, resp):
+                f = req.environ['wsgi.input']
+
+                output = []
+                while True:
+                    line = f.readline().decode("ISO-8859-1")
+                    if not line:
+                        break
+                    output.append(line)
+
+                if hasattr(f, 'read_trailer_lines'):
+                    for line in f.read_trailer_lines():
+                        k, v = line.split(ntob(":"), 1)
+                        k = tonative(k.strip())
+                        v = tonative(v.strip())
+                        resp.headers[k] = v
+
+                return output
+
+            def normal(self, req, resp):
+                return "normal"
+            
+            def blank(self, req, resp):
+                resp.status = ""
+                return ""
+            
+            # According to RFC 2616, new status codes are OK as long as they
+            # are between 100 and 599.
+            
+            # Here is an illegal code...
+            def illegal(self, req, resp):
+                resp.status = '781'
+                return "oops"
+            
+            # ...and here is an unknown but legal code.
+            def unknown(self, req, resp):
+                resp.status = "431 My custom error"
+                return "funky"
+            
+            # Non-numeric code
+            def bad(self, req, resp):
+                resp.status = "error"
+                return "bad news"
+
+            def header_list(self, req, resp):
+                # helper.Controller.__call__ will transform this into
+                # multiple headers with the same name, which is what
+                # we're trying to test
+                resp.headers['WWW-Authenticate'] = [
+                    'Negotiate','Basic realm="foo"']
+                return ""
+            
+            def commas(self, req, resp):
+                resp.headers['WWW-Authenticate'] = 'Negotiate,Basic realm="foo"'
+                return ""
+
+            def start_response_error(self, req, resp):
+                resp.headers[2] = 3
+                return "salud!"
+
+        cls.httpserver.wsgi_app = Root()
+        cls.httpserver.max_request_body_size = 1000
+    setup_server = classmethod(setup_server)
+
+    def test_status_normal(self):
+        self.getPage("/normal")
+        self.assertBody('normal')
+        self.assertStatus(200)
+
+    def test_status_blank(self):
+        self.getPage("/blank")
+        self.assertStatus(200)
+
+    def test_status_illegal(self):
+        self.getPage("/illegal")
+        self.assertStatus(500)
+        self.assertInBody(
+            "Illegal response status from server (781 is out of range).")
+
+    def test_status_unknown(self):
+        self.getPage("/unknown")
+        self.assertBody('funky')
+        self.assertStatus(431)
+
+    def test_status_syntax_error(self):
+        self.getPage("/bad")
+        self.assertStatus(500)
+        self.assertStatus(500)
+        self.assertInBody(
+            "Illegal response status from server (%s is non-numeric)." %
+            (repr(ntob('error'))))
+
+    def test_multiple_headers(self):
+        self.getPage('/header_list')
+        self.assertEqual([(k, v) for k, v in self.headers if k == 'WWW-Authenticate'],
+                         [('WWW-Authenticate', 'Negotiate'),
+                          ('WWW-Authenticate', 'Basic realm="foo"'),
+                          ])
+        self.getPage('/commas')
+        self.assertHeader('WWW-Authenticate', 'Negotiate,Basic realm="foo"')
+
+    def test_start_response_error(self):
+        self.getPage("/start_response_error")
+        self.assertStatus(500)
+        self.assertInBody("TypeError: WSGI response header key 2 is not of type str.")
+
+    def test_max_body(self):
+        if self.scheme == "https":
+            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
+        else:
+            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("POST", "/echo")
+        body = ntob("x" * 1001)
+        c.putheader("Content-Length", len(body))
+        c.endheaders()
+        c.send(body)
+        response = c.getresponse()
+        self.status, self.headers, self.body = webtest.shb(response)
+        c.close()
+        self.assertStatus(413)
+        self.assertBody(
+            "The entity sent with the request exceeds "
+            "the maximum allowed bytes.")
+
+    def test_request_payload(self):
+        if self.scheme == "https":
+            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
+        else:
+            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("POST", "/echo")
+        body = ntob("I am a request body")
+        c.putheader("Content-Length", len(body))
+        c.endheaders()
+        c.send(body)
+        response = c.getresponse()
+        self.status, self.headers, self.body = webtest.shb(response)
+        c.close()
+        self.assertStatus(200)
+        self.assertBody(body)
+
+    def test_request_payload_readline(self):
+        if self.scheme == "https":
+            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
+        else:
+            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("POST", "/echo_lines")
+        body = ntob("I am a\nrequest body")
+        c.putheader("Content-Length", len(body))
+        c.endheaders()
+        c.send(body)
+        response = c.getresponse()
+        self.status, self.headers, self.body = webtest.shb(response)
+        c.close()
+        self.assertStatus(200)
+        self.assertBody(body)
+
+    def test_chunked_request_payload(self):
+        if self.scheme == "https":
+            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
+        else:
+            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("POST", "/echo")
+        c.putheader("Transfer-Encoding", "chunked")
+        c.endheaders()
+        c.send(ntob("13\r\nI am a request body\r\n0\r\n\r\n"))
+        response = c.getresponse()
+        self.status, self.headers, self.body = webtest.shb(response)
+        c.close()
+        self.assertStatus(200)
+        self.assertBody("I am a request body")
+
+    def test_chunked_request_payload_readline(self):
+        if self.scheme == "https":
+            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
+        else:
+            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("POST", "/echo_lines")
+        c.putheader("Transfer-Encoding", "chunked")
+        c.endheaders()
+        c.send(ntob("13\r\nI am a\nrequest body\r\n0\r\n\r\n"))
+        response = c.getresponse()
+        self.status, self.headers, self.body = webtest.shb(response)
+        c.close()
+        self.assertStatus(200)
+        self.assertBody("I am a\nrequest body")
+
+    def test_chunked_request_payload_trailer(self):
+        if self.scheme == "https":
+            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
+        else:
+            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("POST", "/echo_lines")
+        c.putheader("Transfer-Encoding", "chunked")
+        c.endheaders()
+        c.send(ntob("13\r\nI am a\nrequest body\r\n0\r\n"
+                    "Content-Type: application/json\r\n\r\n"))
+        response = c.getresponse()
+        self.status, self.headers, self.body = webtest.shb(response)
+        c.close()
+        self.assertStatus(200)
+        self.assertBody("I am a\nrequest body")
+        self.assertHeader("Content-Type", "application/json")
+
+
+class ServerInterruptTest(helper.CherootWebCase):
+
+    trap_kbint = True
+
+    def setup_server(cls):
+        class Root(helper.Controller):
+
+            def hello(self, req, resp):
+                return "hello"
+
+            def kbint(self, req, resp):
+                cls.httpserver.interrupt = KeyboardInterrupt()
+                return "hello"
+
+        cls.httpserver.wsgi_app = Root()
+    setup_server = classmethod(setup_server)
+
+    def test_kbint(self):
+        self.getPage("/kbint")
+        # Note that our request thread will complete normally even though
+        # the server is shutting down, which is *usually* a nice thing
+        # but not always.
+        self.assertStatus(200)
+        self.assertBody("hello")
+        # Give the server accept() thread time to shut down
+        time.sleep(1)
+        self.assertInLog("Keyboard Interrupt: shutting down")
+
+
+if hasattr(socket, "AF_UNIX"):
+    class UnixDomainSocketTest(helper.CherootWebCase):
+
+        config = {"bind_addr": "/tmp/cheroot_test"}
+
+        def setup_server(cls):
+            class Root(helper.Controller):
+
+                def hello(self, req, resp):
+                    return "hello"
+
+            cls.httpserver.wsgi_app = Root()
+        setup_server = classmethod(setup_server)
+
+        def test_normal(self):
+            self.getPage("/hello")
+            self.assertBody('hello')
+            self.assertStatus(200)
+
+
+class SSLTest(helper.CherootWebCase):
+
+    def setup_server(cls):
+        class Root(helper.Controller):
+
+            def hello(self, req, resp):
+                return "hello"
+
+        cls.httpserver.wsgi_app = Root()
+        cls.httpserver.ssl_adapter = helper.get_default_ssl_adapter()
+        cls.HTTP_CONN = HTTPSConnection
+        cls.scheme = 'https'
+
+    setup_server = classmethod(setup_server)
+
+    def test_normal(self):
+        self.getPage("/hello")
+        self.assertBody('hello')
+        self.assertStatus(200)
+
+    def test_http_to_https(self):
+        # Test what happens when a client tries to speak HTTP to an HTTPS server
+        msg = ("The client sent a plain HTTP request, but this "
+               "server only speaks HTTPS on this port.")
+
+        c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
+        c.putrequest("GET", "/hello")
+        c.endheaders()
+        try:
+            response = c.getresponse()
+        except socket.error:
+            pass
+        else:
+            self.status, self.headers, self.body = webtest.shb(response)
+            c.close()
+            self.assertStatus(400)
+            self.assertBody(msg)
+        self.assertInLog(msg)
+
-"""WSGI gateways for the Cheroot HTTP server."""
-
-import sys
-
-from cheroot.server import HTTPServer, Gateway
-from cheroot._compat import basestring, ntob, ntou, tonative, py3k, unicodestr
-
-
-class WSGIServer(HTTPServer):
-    """A subclass of HTTPServer which calls a WSGI application."""
-    
-    def __init__(self, bind_addr, gateway=None, **kwargs):
-        self.wsgi_app = kwargs.pop("wsgi_app", None)
-        if gateway is None:
-            gateway = WSGIGateway_10
-        HTTPServer.__init__(self, bind_addr, gateway=gateway, **kwargs)
-
-
-class WSGIGateway(Gateway):
-    """A base class to interface HTTPServer with WSGI."""
-    
-    def __init__(self, req):
-        self.req = req
-        self.started_response = False
-        self.env = self.get_environ()
-        self.remaining_bytes_out = None
-    
-    def get_environ(self):
-        """Return a new environ dict targeting the given wsgi.version"""
-        raise NotImplemented
-
-    def respond(self):
-        """Process the current request."""
-        response = self.req.server.wsgi_app(self.env, self.start_response)
-        try:
-            for chunk in response:
-                # "The start_response callable must not actually transmit
-                # the response headers. Instead, it must store them for the
-                # server or gateway to transmit only after the first
-                # iteration of the application return value that yields
-                # a NON-EMPTY string, or upon the application's first
-                # invocation of the write() callable." (PEP 333)
-                if chunk:
-                    if isinstance(chunk, unicodestr):
-                        chunk = chunk.encode('ISO-8859-1')
-                    self.write(chunk)
-        finally:
-            if hasattr(response, "close"):
-                response.close()
-
-    def start_response(self, status, headers, exc_info=None):
-        """WSGI callable to begin the HTTP response."""
-        # "The application may call start_response more than once,
-        # if and only if the exc_info argument is provided."
-        if self.started_response and not exc_info:
-            raise AssertionError("WSGI start_response called a second "
-                                 "time with no exc_info.")
-        self.started_response = True
-        
-        # "if exc_info is provided, and the HTTP headers have already been
-        # sent, start_response must raise an error, and should raise the
-        # exc_info tuple."
-        if (exc_info is not None) and self.req.sent_headers:
-            try:
-                if py3k:
-                    raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
-                else:
-                    raise (exc_info[0], exc_info[1], exc_info[2])
-            finally:
-                exc_info = None
-
-        # According to PEP 3333, when using Python 3, the response status
-        # and headers must be bytes masquerading as unicode; that is, they
-        # must be of type "str" but are restricted to code points in the
-        # "latin-1" set.
-        if not isinstance(status, str):
-            raise TypeError("WSGI response status is not of type str.")
-        self.req.status = ntob(status)
-
-        for k, v in headers:
-            if not isinstance(k, str):
-                raise TypeError(
-                    "WSGI response header key %s is not of type str." %
-                    repr(k))
-            if not isinstance(v, str):
-                raise TypeError(
-                    "WSGI response header value %s is not of type str." %
-                    repr(v))
-            if k.lower() == 'content-length':
-                self.remaining_bytes_out = int(v)
-            self.req.outheaders.append((ntob(k), ntob(v)))
-        
-        return self.write
-
-    def write(self, chunk):
-        """WSGI callable to write unbuffered data to the client.
-        
-        This method is also used internally by start_response (to write
-        data from the iterable returned by the WSGI application).
-        """
-        if not self.started_response:
-            raise AssertionError("WSGI write called before start_response.")
-        
-        chunklen = len(chunk)
-        rbo = self.remaining_bytes_out
-        if rbo is not None and chunklen > rbo:
-            if not self.req.sent_headers:
-                # Whew. We can send a 500 to the client.
-                self.req.simple_response("500 Internal Server Error",
-                    "The requested resource returned more bytes than the "
-                    "declared Content-Length.")
-            else:
-                # Dang. We have probably already sent data. Truncate the chunk
-                # to fit (so the client doesn't hang) and raise an error later.
-                chunk = chunk[:rbo]
-        
-        if not self.req.sent_headers:
-            self.req.sent_headers = True
-            self.req.send_headers()
-        
-        if self.req.allow_message_body:
-            self.req.write(chunk)
-        
-        if rbo is not None:
-            rbo -= chunklen
-            if rbo < 0:
-                raise ValueError(
-                    "Response body exceeds the declared Content-Length.")
-
-
-class WSGIGateway_10(WSGIGateway):
-    """A Gateway class to interface HTTPServer with WSGI 1.0.x."""
-    
-    def get_environ(self):
-        """Return a new environ dict targeting the given wsgi.version"""
-        req = self.req
-        env = {
-            # set a non-standard environ entry so the WSGI app can know what
-            # the *real* server protocol is (and what features to support).
-            # See http://www.faqs.org/rfcs/rfc2145.html.
-            'ACTUAL_SERVER_PROTOCOL': req.server.protocol,
-            'PATH_INFO': tonative(req.path),
-            'QUERY_STRING': tonative(req.qs),
-            'REMOTE_ADDR': req.conn.remote_addr or '',
-            'REMOTE_PORT': str(req.conn.remote_port or ''),
-            'REQUEST_METHOD': tonative(req.method),
-            'REQUEST_URI': req.uri,
-            'SCRIPT_NAME': '',
-            'SERVER_NAME': req.server.server_name,
-            # Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
-            'SERVER_PROTOCOL': tonative(req.request_protocol),
-            'SERVER_SOFTWARE': req.server.software,
-            'wsgi.errors': sys.stderr,
-            'wsgi.input': req.rfile,
-            'wsgi.multiprocess': False,
-            'wsgi.multithread': True,
-            'wsgi.run_once': False,
-            'wsgi.url_scheme': tonative(req.scheme),
-            'wsgi.version': (1, 0),
-            }
-        
-        if isinstance(req.server.bind_addr, basestring):
-            # AF_UNIX. This isn't really allowed by WSGI, which doesn't
-            # address unix domain sockets. But it's better than nothing.
-            env["SERVER_PORT"] = ""
-        else:
-            env["SERVER_PORT"] = str(req.server.bind_addr[1])
-        
-        # Request headers
-        for k, v in req.inheaders.items():
-            k = tonative(k).upper().replace("-", "_")
-            env["HTTP_" + k] = tonative(v)
-        
-        # CONTENT_TYPE/CONTENT_LENGTH
-        ct = env.pop("HTTP_CONTENT_TYPE", None)
-        if ct is not None:
-            env["CONTENT_TYPE"] = ct
-        cl = env.pop("HTTP_CONTENT_LENGTH", None)
-        if cl is not None:
-            env["CONTENT_LENGTH"] = cl
-        
-        if req.conn.ssl_env:
-            env.update(req.conn.ssl_env)
-        
-        return env
-
-
-class WSGIGateway_u0(WSGIGateway_10):
-    """A Gateway class to interface HTTPServer with WSGI u.0.
-    
-    WSGI u.0 is an experimental protocol, which uses unicode for keys and values
-    in both Python 2 and Python 3.
-    """
-    
-    def get_environ(self):
-        """Return a new environ dict targeting the given wsgi.version"""
-        req = self.req
-        env_10 = WSGIGateway_10.get_environ(self)
-        if py3k:
-            env = env_10.copy()
-        else:
-            env = dict([(k.decode('ISO-8859-1'), v) for k, v in env_10.iteritems()])
-        env[ntou('wsgi.version')] = ('u', 0)
-        
-        # Request-URI
-        env.setdefault(ntou('wsgi.url_encoding'), ntou('utf-8'))
-        try:
-            if py3k:
-                for key in ["PATH_INFO", "SCRIPT_NAME", "QUERY_STRING"]:
-                    # Re-encode since our "decoded" string is just
-                    # bytes masquerading as unicode via Latin-1
-                    val = env_10[key].encode('ISO-8859-1')
-                    # ...now decode according to the configured encoding
-                    env[key] = val.decode(env['wsgi.url_encoding'])
-            else:
-                # SCRIPT_NAME is the empty string, who cares what encoding it is?
-                env["PATH_INFO"] = req.path.decode(env['wsgi.url_encoding'])
-                env["QUERY_STRING"] = req.qs.decode(env['wsgi.url_encoding'])
-        except UnicodeDecodeError:
-            # Fall back to latin 1 so apps can transcode if needed.
-            env[ntou('wsgi.url_encoding')] = ntou('ISO-8859-1')
-            for key in [ntou("PATH_INFO"), ntou("SCRIPT_NAME"), ntou("QUERY_STRING")]:
-                if py3k:
-                    env[key] = env_10[key]
-                else:
-                    env[key] = env_10[str(key)].decode(env['wsgi.url_encoding'])
-
-        if not py3k:
-            for k, v in sorted(env.items()):
-                if isinstance(v, str) and k not in ('REQUEST_URI', 'wsgi.input'):
-                    env[k] = ntou(v)
-        
-        return env
-
-
-class WSGIPathInfoDispatcher(object):
-    """A WSGI dispatcher for dispatch based on the PATH_INFO.
-    
-    apps: a dict or list of (path_prefix, app) pairs.
-    """
-    
-    def __init__(self, apps):
-        try:
-            apps = list(apps.items())
-        except AttributeError:
-            pass
-        
-        # Sort the apps by len(path), descending
-        if py3k:
-            apps.sort()
-        else:
-            apps.sort(cmp=lambda x,y: cmp(len(x[0]), len(y[0])))
-        apps.reverse()
-        
-        # The path_prefix strings must start, but not end, with a slash.
-        # Use "" instead of "/".
-        self.apps = [(p.rstrip("/"), a) for p, a in apps]
-    
-    def __call__(self, environ, start_response):
-        path = environ["PATH_INFO"] or "/"
-        for p, app in self.apps:
-            # The apps list should be sorted by length, descending.
-            if path.startswith(p + "/") or path == p:
-                environ = environ.copy()
-                environ["SCRIPT_NAME"] = environ["SCRIPT_NAME"] + p
-                environ["PATH_INFO"] = path[len(p):]
-                return app(environ, start_response)
-        
-        start_response('404 Not Found', [('Content-Type', 'text/plain'),
-                                         ('Content-Length', '0')])
-        return ['']
-
+"""WSGI gateways for the Cheroot HTTP server."""
+
+import sys
+
+from cheroot.server import HTTPServer, Gateway
+from cheroot._compat import basestring, ntob, ntou, tonative, py3k, unicodestr
+
+
+class WSGIServer(HTTPServer):
+    """A subclass of HTTPServer which calls a WSGI application."""
+    
+    def __init__(self, bind_addr, gateway=None, **kwargs):
+        self.wsgi_app = kwargs.pop("wsgi_app", None)
+        if gateway is None:
+            gateway = WSGIGateway_10
+        HTTPServer.__init__(self, bind_addr, gateway=gateway, **kwargs)
+
+
+class WSGIGateway(Gateway):
+    """A base class to interface HTTPServer with WSGI."""
+    
+    def __init__(self, req):
+        self.req = req
+        self.started_response = False
+        self.env = self.get_environ()
+        self.remaining_bytes_out = None
+    
+    def get_environ(self):
+        """Return a new environ dict targeting the given wsgi.version"""
+        raise NotImplemented
+
+    def respond(self):
+        """Process the current request."""
+        response = self.req.server.wsgi_app(self.env, self.start_response)
+        try:
+            for chunk in response:
+                # "The start_response callable must not actually transmit
+                # the response headers. Instead, it must store them for the
+                # server or gateway to transmit only after the first
+                # iteration of the application return value that yields
+                # a NON-EMPTY string, or upon the application's first
+                # invocation of the write() callable." (PEP 333)
+                if chunk:
+                    if isinstance(chunk, unicodestr):
+                        chunk = chunk.encode('ISO-8859-1')
+                    self.write(chunk)
+        finally:
+            if hasattr(response, "close"):
+                response.close()
+
+    def start_response(self, status, headers, exc_info=None):
+        """WSGI callable to begin the HTTP response."""
+        # "The application may call start_response more than once,
+        # if and only if the exc_info argument is provided."
+        if self.started_response and not exc_info:
+            raise AssertionError("WSGI start_response called a second "
+                                 "time with no exc_info.")
+        self.started_response = True
+        
+        # "if exc_info is provided, and the HTTP headers have already been
+        # sent, start_response must raise an error, and should raise the
+        # exc_info tuple."
+        if (exc_info is not None) and self.req.sent_headers:
+            try:
+                if py3k:
+                    raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
+                else:
+                    raise (exc_info[0], exc_info[1], exc_info[2])
+            finally:
+                exc_info = None
+
+        # According to PEP 3333, when using Python 3, the response status
+        # and headers must be bytes masquerading as unicode; that is, they
+        # must be of type "str" but are restricted to code points in the
+        # "latin-1" set.
+        if not isinstance(status, str):
+            raise TypeError("WSGI response status is not of type str.")
+        self.req.status = ntob(status)
+
+        for k, v in headers:
+            if not isinstance(k, str):
+                raise TypeError(
+                    "WSGI response header key %s is not of type str." %
+                    repr(k))
+            if not isinstance(v, str):
+                raise TypeError(
+                    "WSGI response header value %s is not of type str." %
+                    repr(v))
+            if k.lower() == 'content-length':
+                self.remaining_bytes_out = int(v)
+            self.req.outheaders.append((ntob(k), ntob(v)))
+        
+        return self.write
+
+    def write(self, chunk):
+        """WSGI callable to write unbuffered data to the client.
+        
+        This method is also used internally by start_response (to write
+        data from the iterable returned by the WSGI application).
+        """
+        if not self.started_response:
+            raise AssertionError("WSGI write called before start_response.")
+        
+        chunklen = len(chunk)
+        rbo = self.remaining_bytes_out
+        if rbo is not None and chunklen > rbo:
+            if not self.req.sent_headers:
+                # Whew. We can send a 500 to the client.
+                self.req.simple_response("500 Internal Server Error",
+                    "The requested resource returned more bytes than the "
+                    "declared Content-Length.")
+            else:
+                # Dang. We have probably already sent data. Truncate the chunk
+                # to fit (so the client doesn't hang) and raise an error later.
+                chunk = chunk[:rbo]
+        
+        if not self.req.sent_headers:
+            self.req.sent_headers = True
+            self.req.send_headers()
+        
+        if self.req.allow_message_body:
+            self.req.write(chunk)
+        
+        if rbo is not None:
+            rbo -= chunklen
+            if rbo < 0:
+                raise ValueError(
+                    "Response body exceeds the declared Content-Length.")
+
+
+class WSGIGateway_10(WSGIGateway):
+    """A Gateway class to interface HTTPServer with WSGI 1.0.x."""
+    
+    def get_environ(self):
+        """Return a new environ dict targeting the given wsgi.version"""
+        req = self.req
+        env = {
+            # set a non-standard environ entry so the WSGI app can know what
+            # the *real* server protocol is (and what features to support).
+            # See http://www.faqs.org/rfcs/rfc2145.html.
+            'ACTUAL_SERVER_PROTOCOL': req.server.protocol,
+            'PATH_INFO': tonative(req.path),
+            'QUERY_STRING': tonative(req.qs),
+            'REMOTE_ADDR': req.conn.remote_addr or '',
+            'REMOTE_PORT': str(req.conn.remote_port or ''),
+            'REQUEST_METHOD': tonative(req.method),
+            'REQUEST_URI': req.uri,
+            'SCRIPT_NAME': '',
+            'SERVER_NAME': req.server.server_name,
+            # Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
+            'SERVER_PROTOCOL': tonative(req.request_protocol),
+            'SERVER_SOFTWARE': req.server.software,
+            'wsgi.errors': sys.stderr,
+            'wsgi.input': req.rfile,
+            'wsgi.multiprocess': False,
+            'wsgi.multithread': True,
+            'wsgi.run_once': False,
+            'wsgi.url_scheme': tonative(req.scheme),
+            'wsgi.version': (1, 0),
+            }
+        
+        if isinstance(req.server.bind_addr, basestring):
+            # AF_UNIX. This isn't really allowed by WSGI, which doesn't
+            # address unix domain sockets. But it's better than nothing.
+            env["SERVER_PORT"] = ""
+        else:
+            env["SERVER_PORT"] = str(req.server.bind_addr[1])
+        
+        # Request headers
+        for k, v in req.inheaders.items():
+            k = tonative(k).upper().replace("-", "_")
+            env["HTTP_" + k] = tonative(v)
+        
+        # CONTENT_TYPE/CONTENT_LENGTH
+        ct = env.pop("HTTP_CONTENT_TYPE", None)
+        if ct is not None:
+            env["CONTENT_TYPE"] = ct
+        cl = env.pop("HTTP_CONTENT_LENGTH", None)
+        if cl is not None:
+            env["CONTENT_LENGTH"] = cl
+        
+        if req.conn.ssl_env:
+            env.update(req.conn.ssl_env)
+        
+        return env
+
+
+class WSGIGateway_u0(WSGIGateway_10):
+    """A Gateway class to interface HTTPServer with WSGI u.0.
+    
+    WSGI u.0 is an experimental protocol, which uses unicode for keys and values
+    in both Python 2 and Python 3.
+    """
+    
+    def get_environ(self):
+        """Return a new environ dict targeting the given wsgi.version"""
+        req = self.req
+        env_10 = WSGIGateway_10.get_environ(self)
+        if py3k:
+            env = env_10.copy()
+        else:
+            env = dict([(k.decode('ISO-8859-1'), v) for k, v in env_10.iteritems()])
+        env[ntou('wsgi.version')] = ('u', 0)
+        
+        # Request-URI
+        env.setdefault(ntou('wsgi.url_encoding'), ntou('utf-8'))
+        try:
+            if py3k:
+                for key in ["PATH_INFO", "SCRIPT_NAME", "QUERY_STRING"]:
+                    # Re-encode since our "decoded" string is just
+                    # bytes masquerading as unicode via Latin-1
+                    val = env_10[key].encode('ISO-8859-1')
+                    # ...now decode according to the configured encoding
+                    env[key] = val.decode(env['wsgi.url_encoding'])
+            else:
+                # SCRIPT_NAME is the empty string, who cares what encoding it is?
+                env["PATH_INFO"] = req.path.decode(env['wsgi.url_encoding'])
+                env["QUERY_STRING"] = req.qs.decode(env['wsgi.url_encoding'])
+        except UnicodeDecodeError:
+            # Fall back to latin 1 so apps can transcode if needed.
+            env[ntou('wsgi.url_encoding')] = ntou('ISO-8859-1')
+            for key in [ntou("PATH_INFO"), ntou("SCRIPT_NAME"), ntou("QUERY_STRING")]:
+                if py3k:
+                    env[key] = env_10[key]
+                else:
+                    env[key] = env_10[str(key)].decode(env['wsgi.url_encoding'])
+
+        if not py3k:
+            for k, v in sorted(env.items()):
+                if isinstance(v, str) and k not in ('REQUEST_URI', 'wsgi.input'):
+                    env[k] = ntou(v)
+        
+        return env
+
+
+class WSGIPathInfoDispatcher(object):
+    """A WSGI dispatcher for dispatch based on the PATH_INFO.
+    
+    apps: a dict or list of (path_prefix, app) pairs.
+    """
+    
+    def __init__(self, apps):
+        try:
+            apps = list(apps.items())
+        except AttributeError:
+            pass
+        
+        # Sort the apps by len(path), descending
+        if py3k:
+            apps.sort()
+        else:
+            apps.sort(lambda x,y: cmp(len(x[0]), len(y[0])))
+        apps.reverse()
+        
+        # The path_prefix strings must start, but not end, with a slash.
+        # Use "" instead of "/".
+        self.apps = [(p.rstrip("/"), a) for p, a in apps]
+    
+    def __call__(self, environ, start_response):
+        path = environ["PATH_INFO"] or "/"
+        for p, app in self.apps:
+            # The apps list should be sorted by length, descending.
+            if path.startswith(p + "/") or path == p:
+                environ = environ.copy()
+                environ["SCRIPT_NAME"] = environ["SCRIPT_NAME"] + p
+                environ["PATH_INFO"] = path[len(p):]
+                return app(environ, start_response)
+        
+        start_response('404 Not Found', [('Content-Type', 'text/plain'),
+                                         ('Content-Length', '0')])
+        return ['']
+