Commits

Robert Brewer committed 5b94c4a

Fixed a couple tests and cleaned up _compat.py

  • Participants
  • Parent commits 03c5d26

Comments (0)

Files changed (4)

File cheroot/_compat.py

 provides new 'bytestr', 'unicodestr', and 'nativestr' attributes, as well as
 two functions: 'ntob', which translates native strings (of type 'str') into
 byte strings regardless of Python version, and 'ntou', which translates native
-strings to unicode strings. This also provides a 'BytesIO' name for dealing
-specifically with bytes, and a 'StringIO' name for dealing with native strings.
-It also provides a 'base64_decode' function with native strings as input and
-output.
+strings to unicode strings.
 """
 import os
 import re
 except NameError:
     from sets import Set as set
 
-try:
-    # Python 3.1+
-    from base64 import decodebytes as _base64_decodebytes
-except ImportError:
-    # Python 3.0-
-    # since Cheroot claims compability with Python 2.3, we must use
-    # the legacy API of base64
-    from base64 import decodestring as _base64_decodebytes
-
-def base64_decode(n, encoding='ISO-8859-1'):
-    """Return the native string base64-decoded (as a native string)."""
-    if isinstance(n, unicodestr):
-        b = n.encode(encoding)
-    else:
-        b = n
-    b = _base64_decodebytes(b)
-    if nativestr is unicodestr:
-        return b.decode(encoding)
-    else:
-        return b
-
-try:
-    # Python 2.5+
-    from hashlib import md5
-except ImportError:
-    from md5 import new as md5
-
-try:
-    # Python 2.5+
-    from hashlib import sha1 as sha
-except ImportError:
-    from sha import new as sha
-
-try:
-    sorted = sorted
-except NameError:
-    def sorted(i):
-        i = i[:]
-        i.sort()
-        return i
-
-try:
-    reversed = reversed
-except NameError:
-    def reversed(x):
-        i = len(x)
-        while i > 0:
-            i -= 1
-            yield x[i]
-
-try:
-    # Python 3
-    from urllib.parse import urljoin, urlencode
-    from urllib.parse import quote, quote_plus
-    from urllib.request import unquote, urlopen
-    from urllib.request import parse_http_list, parse_keqv_list
-except ImportError:
-    # Python 2
-    from urlparse import urljoin
-    from urllib import urlencode, urlopen
-    from urllib import quote, quote_plus
+if py3k:
+    from urllib.request import urlopen
+    PERCENT = ntob('%')
+    EMPTY = ntob('')
+    def unquote(path):
+        """takes quoted byte string and unquotes % encoded values""" 
+        res = path.split(PERCENT)
+        for i in range(1, len(res)):
+            item = res[i]
+            res[i] = bytes([int(item[:2], 16)]) + item[2:]
+        return EMPTY.join(res)
+else:
+    from urllib import urlopen
     from urllib import unquote
-    from urllib2 import parse_http_list, parse_keqv_list
-
-try:
-    dict.iteritems
-    # Python 2
-    iteritems = lambda d: d.iteritems()
-    copyitems = lambda d: d.items()
-except AttributeError:
-    # Python 3
-    iteritems = lambda d: d.items()
-    copyitems = lambda d: list(d.items())
-
-try:
-    dict.iterkeys
-    # Python 2
-    iterkeys = lambda d: d.iterkeys()
-    copykeys = lambda d: d.keys()
-except AttributeError:
-    # Python 3
-    iterkeys = lambda d: d.keys()
-    copykeys = lambda d: list(d.keys())
-
-try:
-    dict.itervalues
-    # Python 2
-    itervalues = lambda d: d.itervalues()
-    copyvalues = lambda d: d.values()
-except AttributeError:
-    # Python 3
-    itervalues = lambda d: d.values()
-    copyvalues = lambda d: list(d.values())
-
-try:
-    # Python 3
-    import builtins
-except ImportError:
-    # Python 2
-    import __builtin__ as builtins
 
 try:
     # Python 2.
-    from httplib import BadStatusLine, HTTPConnection, HTTPSConnection, IncompleteRead, NotConnected
-    from BaseHTTPServer import BaseHTTPRequestHandler
+    from httplib import BadStatusLine, HTTPConnection, IncompleteRead, NotConnected
 except ImportError:
     # Python 3
-    from http.client import BadStatusLine, HTTPConnection, HTTPSConnection, IncompleteRead, NotConnected
-    from http.server import BaseHTTPRequestHandler
+    from http.client import BadStatusLine, HTTPConnection, IncompleteRead, NotConnected
 
 try:
     # Python 2.
         HTTPSConnection = None
 
 try:
-    # Python 2
-    xrange = xrange
-except NameError:
-    # Python 3
-    xrange = range
-
-import threading
-if hasattr(threading.Thread, "daemon"):
-    # Python 2.6+
-    def get_daemon(t):
-        return t.daemon
-    def set_daemon(t, val):
-        t.daemon = val
-else:
-    def get_daemon(t):
-        return t.isDaemon()
-    def set_daemon(t, val):
-        t.setDaemon(val)
-
-try:
     from email.utils import formatdate
     def HTTPDate(timeval=None):
         return formatdate(timeval, usegmt=True)
     from rfc822 import formatdate as HTTPDate
 
 try:
-    # Python 3
-    from urllib.parse import unquote as parse_unquote
-    def unquote_qs(atom, encoding, errors='strict'):
-        return parse_unquote(atom.replace('+', ' '), encoding=encoding, errors=errors)
+    # Python 2.4+
+    from traceback import format_exc
 except ImportError:
-    # Python 2
-    from urllib import unquote as parse_unquote
-    def unquote_qs(atom, encoding, errors='strict'):
-        return parse_unquote(atom.replace('+', ' ')).decode(encoding, errors)
-
-try:
-    # Prefer simplejson, which is usually more advanced than the builtin module.
-    import simplejson as json
-    json_decode = json.JSONDecoder().decode
-    json_encode = json.JSONEncoder().iterencode
-except ImportError:
-    if py3k:
-        # Python 3.0: json is part of the standard library,
-        # but outputs unicode. We need bytes.
-        import json
-        json_decode = json.JSONDecoder().decode
-        _json_encode = json.JSONEncoder().iterencode
-        def json_encode(value):
-            for chunk in _json_encode(value):
-                yield chunk.encode('utf8')
-    elif sys.version_info >= (2, 6):
-        # Python 2.6: json is part of the standard library
-        import json
-        json_decode = json.JSONDecoder().decode
-        json_encode = json.JSONEncoder().iterencode
-    else:
-        json = None
-        def json_decode(s):
-            raise ValueError('No JSON library is available')
-        def json_encode(s):
-            raise ValueError('No JSON library is available')
-
-try:
-    import cPickle as pickle
-except ImportError:
-    # In Python 2, pickle is a Python version.
-    # In Python 3, pickle is the sped-up C version.
-    import pickle
-
-try:
-    os.urandom(20)
-    import binascii
-    def random20():
-        return binascii.hexlify(os.urandom(20)).decode('ascii')
-except (AttributeError, NotImplementedError):
-    import random
-    # os.urandom not available until Python 2.4. Fall back to random.random.
-    def random20():
-        return sha('%s' % random.random()).hexdigest()
-
-try:
-    from _thread import get_ident as get_thread_ident
-except ImportError:
-    from thread import get_ident as get_thread_ident
+    def format_exc(limit=None):
+        """Like print_exc() but return a string. Backport for Python 2.3."""
+        try:
+            etype, value, tb = sys.exc_info()
+            return ''.join(traceback.format_exception(etype, value, tb, limit))
+        finally:
+            etype = value = tb = None
 
 try:
     # Python 3
-    next = next
-except NameError:
+    import email.utils
+    def formatdate():
+        return email.utils.formatdate(usegmt=True).encode('ISO-8859-1')
+except ImportError:
     # Python 2
-    def next(i):
-        return i.next()
+    from rfc822 import formatdate
+

File cheroot/server.py

 
 __all__ = ['HTTPRequest', 'HTTPConnection', 'HTTPServer',
            'SizeCheckWrapper', 'KnownLengthRFile', 'ChunkedRFile',
-           'WorkerThread', 'ThreadPool', 'Gateway',]
+           'WorkerThread', 'ThreadPool', 'Gateway']
 
 from cheroot._compat import bytestr, unicodestr, basestring, ntob, py3k
+from cheroot._compat import formatdate, format_exc, unquote
 
 LF = ntob('\n')
 CRLF = ntob('\r\n')
 COLON = ntob(':')
 SEMICOLON = ntob(';')
 COMMA = ntob(',')
-PERCENT = ntob('%')
 EMPTY = ntob('')
 NUMBER_SIGN = ntob('#')
 QUESTION_MARK = ntob('?')
 except:
     import Queue as queue
 import re
-if py3k:
-    import email.utils
-    def formatdate():
-        return email.utils.formatdate(usegmt=True).encode('ISO-8859-1')
-else:
-    from rfc822 import formatdate
 import socket
 import sys
 if 'win' in sys.platform and not hasattr(socket, 'IPPROTO_IPV6'):
 
 import threading
 import time
-try:
-    from traceback import format_exc
-except ImportError:
-    def format_exc(limit=None):
-        """Like print_exc() but return a string. Backport for Python 2.3."""
-        try:
-            etype, value, tb = sys.exc_info()
-            return ''.join(traceback.format_exception(etype, value, tb, limit))
-        finally:
-            etype = value = tb = None
-
-if py3k:
-    from urllib.parse import urlparse
-    def unquote(path):
-        """takes quoted string and unquotes % encoded values""" 
-        res = path.split(PERCENT)
-        for i in range(1, len(res)):
-            item = res[i]
-            res[i] = bytes([int(item[:2], 16)]) + item[2:]
-        return EMPTY.join(res)
-else:
-    from urllib import unquote
-    from urlparse import urlparse
 import warnings
 
 from cheroot import errors

File cheroot/test/helper.py

 import sys
 import time
 import threading
+import traceback
 
 import cheroot
-from cheroot._compat import basestring, copyitems, HTTPSConnection, ntob
+from cheroot._compat import basestring, format_exc, HTTPSConnection, ntob
 from cheroot import server, wsgi
 from cheroot.test import webtest
 
 class Controller(object):
 
     def __call__(self, environ, start_response):
-        req, resp = Request(environ), Response()
         try:
-            handler = getattr(self, environ["PATH_INFO"].lstrip("/").replace("/", "_"))
-        except AttributeError:
-            resp.status = '404 Not Found'
-        else:
-            output = handler(req, resp)
-            if output is not None:
-                resp.body = output
-                if isinstance(output, basestring):
-                    cl = len(output)
-                elif isinstance(output, (tuple, list)):
-                    cl = sum([len(a) for a in output])
+            req, resp = Request(environ), Response()
+            try:
+                handler = getattr(self, environ["PATH_INFO"].lstrip("/").replace("/", "_"))
+            except AttributeError:
+                resp.status = '404 Not Found'
+            else:
+                output = handler(req, resp)
+                if output is not None:
+                    resp.body = output
+                    if isinstance(output, basestring):
+                        cl = len(output)
+                    elif isinstance(output, (tuple, list)):
+                        cl = sum([len(a) for a in output])
+                    else:
+                        cl = None
+                    if cl is not None:
+                        resp.headers.setdefault('Content-Length', str(cl))
+            h = []
+            for k, v in resp.headers.items():
+                if isinstance(v, (tuple, list)):
+                    for atom in v:
+                        h.append((k, atom))
                 else:
-                    cl = None
-                if cl is not None:
-                    resp.headers.setdefault('Content-Length', str(cl))
-        start_response(resp.status, resp.headers.items())
-        return resp.output()
-
+                    h.append((k, v))
+            start_response(resp.status, h)
+            return resp.output()
+        except:
+            status = "500 Server Error"
+            response_headers = [("Content-Type", "text/plain")]
+            start_response(status, response_headers, sys.exc_info())
+            return format_exc()
 
 # --------------------------- Spawning helpers --------------------------- #
 

File cheroot/test/test_core.py

                 return "bad news"
 
             def header_list(self, req, resp):
-                resp.headers['WWW-Authenticate'] = 'Negotiate'
-                resp.headers['www-authenticate'] = 'Basic realm="foo"'
+                # helper.Controller.__call__ will transform this into
+                # multiple headers with the same name, which is what
+                # we're trying to test
+                resp.headers['WWW-Authenticate'] = [
+                    'Negotiate','Basic realm="foo"']
             
             def commas(self, req, resp):
                 resp.headers['WWW-Authenticate'] = 'Negotiate,Basic realm="foo"'
 
     def test_multiple_headers(self):
         self.getPage('/header_list')
-        print(repr(self.headers))
         self.assertEqual([(k, v) for k, v in self.headers if k == 'WWW-Authenticate'],
                          [('WWW-Authenticate', 'Negotiate'),
                           ('WWW-Authenticate', 'Basic realm="foo"'),
     def test_start_response_error(self):
         self.getPage("/start_response_error")
         self.assertStatus(500)
-        self.assertInBody("TypeError: response.header_list key 2 is not a byte string.")
+        self.assertInBody("TypeError: WSGI response header key 2 is not of type str.")