Commits

Gustavo Picon committed 363cb91

Turned a big chunk of ntob/ntou calls into native b’’/u’’

Comments (0)

Files changed (10)

cheroot/compat.py

 
 if py3k:
     from urllib.request import urlopen
-    PERCENT = ntob('%')
-    EMPTY = ntob('')
+    PERCENT = b'%'
+    EMPTY = b''
 
     def unquote(path):
         """takes quoted byte string and unquotes % encoded values"""

cheroot/server.py

                        'request due to a temporary overloading or '
                        'maintenance of the server.')
 
-LF = ntob('\n')
-CRLF = ntob('\r\n')
-TAB = ntob('\t')
-SPACE = ntob(' ')
-COLON = ntob(':')
-SEMICOLON = ntob(';')
-COMMA = ntob(',')
-EMPTY = ntob('')
-NUMBER_SIGN = ntob('#')
-QUESTION_MARK = ntob('?')
-ASTERISK = ntob('*')
-FORWARD_SLASH = ntob('/')
+LF = b'\n'
+CRLF = b'\r\n'
+TAB = b'\t'
+SPACE = b' '
+COLON = b':'
+SEMICOLON = b';'
+COMMA = b','
+EMPTY = b''
+NUMBER_SIGN = b'#'
+QUESTION_MARK = b'?'
+ASTERISK = b'*'
+FORWARD_SLASH = b'/'
 
 import os
 import re
     else:
         wfile.write(output)
 
-quoted_slash = re.compile(ntob("(?i)%2F"))
+quoted_slash = re.compile(b"(?i)%2F")
 
 comma_separated_headers = [
     ntob(h) for h in
         if k in comma_separated_headers:
             existing = hdict.get(hname)
             if existing:
-                v = ntob(", ").join((existing, v))
+                v = b", ".join((existing, v))
         hdict[hname] = v
 
     return hdict
 
         self.ready = False
         self.started_request = False
-        self.scheme = ntob("http")
+        self.scheme = b"http"
         if self.server.ssl_adapter is not None:
-            self.scheme = ntob("https")
+            self.scheme = b"https"
         # Use the lowest-common protocol in case read_request_line errors.
         self.response_protocol = 'HTTP/1.0'
         self.inheaders = {}
 
     def _set_status(self, value):
         if not value:
-            value = ntob("200")
+            value = b"200"
 
         if not isinstance(value, bytestr):
             value = ntob(str(value))
             ex = sys.exc_info()[1]
             self.simple_response("400 Bad Request", ex.args[0])
             return False
-        path = ntob("%2F").join(atoms)
+        path = b"%2F".join(atoms)
         self.path = path
 
         # Note that, like wsgiref and most other HTTP servers,
             return False
 
         mrbs = self.server.max_request_body_size
-        if mrbs and int(self.inheaders.get(ntob("Content-Length"), 0)) > mrbs:
+        if mrbs and int(self.inheaders.get(b"Content-Length", 0)) > mrbs:
             self.simple_response(
                 "413 Request Entity Too Large",
                 "The entity sent with the request exceeds the maximum "
         # Persistent connection support
         if self.response_protocol == "HTTP/1.1":
             # Both server and client are HTTP/1.1
-            if self.inheaders.get(ntob("Connection"), EMPTY) == ntob("close"):
+            if self.inheaders.get(b"Connection", EMPTY) == b"close":
                 self.close_connection = True
         else:
             # Either the server or client (or both) are HTTP/1.0
-            if (
-                    self.inheaders.get(ntob("Connection"), EMPTY) !=
-                    ntob("Keep-Alive")
-            ):
+            if self.inheaders.get(b"Connection", EMPTY) != b"Keep-Alive":
                 self.close_connection = True
 
         # Transfer-Encoding support
         te = None
         if self.response_protocol == "HTTP/1.1":
-            te = self.inheaders.get(ntob("Transfer-Encoding"))
+            te = self.inheaders.get(b"Transfer-Encoding")
             if te:
                 te = [x.strip().lower() for x in te.split(COMMA) if x.strip()]
 
 
         if te:
             for enc in te:
-                if enc == ntob("chunked"):
+                if enc == b"chunked":
                     self.chunked_read = True
                 else:
                     # Note that, even if we see "chunked", we must reject
         #
         # We used to do 3, but are now doing 1. Maybe we'll do 2 someday,
         # but it seems like it would be a big slowdown for such a rare case.
-        if self.inheaders.get(ntob("Expect"), EMPTY) == ntob("100-continue"):
+        if self.inheaders.get(b"Expect", EMPTY) == b"100-continue":
             # Don't use simple_response here, because it emits headers
             # we don't want.
             # See http://www.bitbucket.org/cherrypy/cherrypy/issue/951
-            msg = ntob(self.server.protocol, 'ascii') + \
-                ntob(" 100 Continue\r\n\r\n")
+            msg = ntob(self.server.protocol, 'ascii') + b" 100 Continue\r\n\r\n"
             try:
                 write(self.conn.wfile, msg)
             except socket.error:
         if uri == ASTERISK:
             return None, None, uri
 
-        i = uri.find(ntob('://'))
+        i = uri.find(b'://')
         if i > 0 and QUESTION_MARK not in uri[:i]:
             # An absoluteURI.
             # If there's a scheme (and it must be http or https), then:
         if self.chunked_read:
             self.rfile = ChunkedRFile(self.conn.rfile, mrbs)
         else:
-            cl = int(self.inheaders.get(ntob("Content-Length"), 0))
+            cl = int(self.inheaders.get(b"Content-Length", 0))
             if mrbs and mrbs < cl:
                 if not self.sent_headers:
                     self.simple_response(
                 self.close_connection = True
                 return
         if self.chunked_write:
-            write(self.conn.wfile, ntob("0\r\n\r\n"))
+            write(self.conn.wfile, b"0\r\n\r\n")
 
     def simple_response(self, status, msg=""):
         """Write a simple response back to the client."""
         buf = [ntob(self.server.protocol, "ascii") + SPACE +
                ntob(status, "ISO-8859-1") + CRLF,
                ntob("Content-Length: %s\r\n" % len(msg), "ISO-8859-1"),
-               ntob("Content-Type: text/plain\r\n")]
+               b"Content-Type: text/plain\r\n"]
 
         if status[:3] in ("413", "414"):
             # Request Entity Too Large / Request-URI Too Long
                 # This will not be true for 414, since read_request_line
                 # usually raises 414 before reading the whole line, and we
                 # therefore cannot know the proper response_protocol.
-                buf.append(ntob("Connection: close\r\n"))
+                buf.append(b"Connection: close\r\n")
             else:
                 # HTTP/1.0 had no 413/414 status nor Connection header.
                 # Emit 400 instead and trust the message body is enough.
         # include a message-body." So no point chunking.
         if status < 200 or status in (204, 205, 304):
             self.outheaders = [(k, v) for k, v in self.outheaders
-                               if k.lower() != ntob('content-length')]
+                               if k.lower() != b'content-length']
             self.allow_message_body = False
-        elif ntob("content-length") not in hkeys:
+        elif b"content-length" not in hkeys:
             if (self.response_protocol == 'HTTP/1.1'
-                    and self.method != ntob('HEAD')):
+                    and self.method != b'HEAD'):
                 # Use the chunked transfer-coding
                 self.chunked_write = True
-                self.outheaders.append(
-                    (ntob("Transfer-Encoding"), ntob("chunked")))
+                self.outheaders.append((b"Transfer-Encoding", b"chunked"))
             else:
                 # Closing the conn is the only way to determine len.
                 self.close_connection = True
 
-        if ntob("connection") not in hkeys:
+        if b"connection" not in hkeys:
             if self.response_protocol == 'HTTP/1.1':
                 # Both server and client are HTTP/1.1 or better
                 if self.close_connection:
-                    self.outheaders.append((ntob("Connection"), ntob("close")))
+                    self.outheaders.append((b"Connection", b"close"))
             else:
                 # Server and/or client are HTTP/1.0
                 if not self.close_connection:
                     self.outheaders.append(
-                        (ntob("Connection"), ntob("Keep-Alive")))
+                        (b"Connection", b"Keep-Alive"))
 
         if (not self.close_connection) and (not self.chunked_read):
             # Read any remaining request body data on the socket.
             if remaining > 0:
                 self.rfile.read(remaining)
 
-        if ntob("date") not in hkeys:
-            self.outheaders.append((ntob("Date"), HTTPDate()))
+        if b"date" not in hkeys:
+            self.outheaders.append((b"Date", HTTPDate()))
 
-        if ntob("server") not in hkeys:
+        if b"server" not in hkeys:
             self.outheaders.append(
-                (ntob("Server"), ntob(self.server.server_name)))
+                (b"Server", ntob(self.server.server_name)))
 
         buf = [ntob(self.server.protocol, 'ascii')
                + SPACE + self.status + CRLF]

cheroot/test/benchmark.py

 import traceback
 
 import cherrypy
-from cherrypy._cpcompat import ntob
 from cherrypy import _cperror, _cpmodpy
 from cheroot._cpcompat import HTTPDate
 
 
     parse_patterns = [
         ('complete_requests', 'Completed',
-         ntob(r'^Complete requests:\s*(\d+)')),
+         br'^Complete requests:\s*(\d+)'),
         ('failed_requests', 'Failed',
-         ntob(r'^Failed requests:\s*(\d+)')),
+         br'^Failed requests:\s*(\d+)'),
         ('requests_per_second', 'req/sec',
-         ntob(r'^Requests per second:\s*([0-9.]+)')),
+         br'^Requests per second:\s*([0-9.]+)'),
         ('time_per_request_concurrent', 'msec/req',
-         ntob(
-             r'^Time per request:\s*([0-9.]+).*concurrent requests\)$')),
-        ('transfer_rate', 'KB/sec',
-         ntob(r'^Transfer rate:\s*([0-9.]+)')),
+         br'^Time per request:\s*([0-9.]+).*concurrent requests\)$'),
+        ('transfer_rate', 'KB/sec', br'^Transfer rate:\s*([0-9.]+)')
     ]
 
     def __init__(self, path=SCRIPT_NAME + "/hello", requests=1000,

cheroot/test/test_conn.py

         conn = self.HTTP_CONN
         conn.auto_open = False
         conn.connect()
-        conn.send(ntob('GET /hello HTTP/1.1'))
+        conn.send(b'GET /hello HTTP/1.1')
         conn.send(("Host: %s" % self.HOST).encode('ascii'))
 
         # Wait for our socket timeout
         self.assertBody(str(timeout))
 
         # Make a second request on the same socket
-        conn._output(ntob('GET /hello HTTP/1.1'))
+        conn._output(b'GET /hello HTTP/1.1')
         conn._output(ntob("Host: %s" % self.HOST, 'ascii'))
         conn._send_output()
         response = conn.response_class(conn.sock, method="GET")
         time.sleep(timeout * 2)
 
         # Make another request on the same socket, which should error
-        conn._output(ntob('GET /hello HTTP/1.1'))
+        conn._output(b'GET /hello HTTP/1.1')
         conn._output(ntob("Host: %s" % self.HOST, 'ascii'))
         conn._send_output()
         response = conn.response_class(conn.sock, method="GET")
 
         # Make another request on the same socket,
         # but timeout on the headers
-        conn.send(ntob('GET /hello HTTP/1.1'))
+        conn.send(b'GET /hello HTTP/1.1')
         # Wait for our socket timeout
         time.sleep(timeout * 2)
         response = conn.response_class(conn.sock, method="GET")
             response.begin()
             body = response.read(13)
             self.assertEqual(response.status, 200)
-            self.assertEqual(body, ntob("Hello, world!"))
+            self.assertEqual(body, b"Hello, world!")
 
         # Retrieve final response
         response = conn.response_class(conn.sock, method="GET")
         response.begin()
         body = response.read()
         self.assertEqual(response.status, 200)
-        self.assertEqual(body, ntob("Hello, world!"))
+        self.assertEqual(body, b"Hello, world!")
 
         conn.close()
 
         conn.putheader("Content-Type", "text/plain")
         conn.putheader("Content-Length", "4")
         conn.endheaders()
-        conn.send(ntob("d'oh"))
+        conn.send(b"d'oh")
         response = conn.response_class(conn.sock, method="POST")
         version, status, reason = response._read_status()
         self.assertNotEqual(status, 100)
                 break
 
         # ...send the body
-        body = ntob("I am a small file")
+        body = b"I am a small file"
         conn.send(body)
 
         # ...get the final response
                     break
 
             # ...send the body
-            conn.send(ntob("x" * 1000))
+            conn.send(b"x" * 1000)
 
             # ...get the final response
             response.begin()
             self.assertStatus(500)
 
             # Now try a working page with an Expect header...
-            conn._output(ntob('POST /upload HTTP/1.1'))
+            conn._output(b'POST /upload HTTP/1.1')
             conn._output(ntob("Host: %s" % self.HOST, 'ascii'))
-            conn._output(ntob("Content-Type: text/plain"))
-            conn._output(ntob("Content-Length: 17"))
-            conn._output(ntob("Expect: 100-continue"))
+            conn._output(b"Content-Type: text/plain")
+            conn._output(b"Content-Length: 17")
+            conn._output(b"Expect: 100-continue")
             conn._send_output()
             response = conn.response_class(conn.sock, method="POST")
 
                     break
 
             # ...send the body
-            body = ntob("I am a small file")
+            body = b"I am a small file"
             conn.send(body)
 
             # ...get the final response
         conn = self.HTTP_CONN
 
         # Try a normal chunked request (with extensions)
-        body = ntob("8;key=value\r\nxx\r\nxxxx\r\n5\r\nyyyyy\r\n0\r\n"
-                    "Content-Type: application/json\r\n"
-                    "\r\n")
+        body = (b"8;key=value\r\nxx\r\nxxxx\r\n5\r\nyyyyy\r\n0\r\n"
+                b"Content-Type: application/json\r\n"
+                b"\r\n")
         conn.putrequest("POST", "/upload", skip_host=True)
         conn.putheader("Host", self.HOST)
         conn.putheader("Transfer-Encoding", "chunked")
         response = conn.getresponse()
         self.status, self.headers, self.body = webtest.shb(response)
         self.assertStatus('200 OK')
-        self.assertBody("thanks for '%s'" % ntob('xx\r\nxxxxyyyyy'))
+        self.assertBody("thanks for '%s'" % b'xx\r\nxxxxyyyyy')
 
         # Try a chunked request that exceeds server.max_request_body_size.
         # Note that the delimiters and trailer are included.
-        body = ntob("3e3\r\n" + ("x" * 995) + "\r\n0\r\n\r\n")
+        body = b"3e3\r\n" + (b"x" * 995) + b"\r\n0\r\n\r\n"
         conn.putrequest("POST", "/upload", skip_host=True)
         conn.putheader("Host", self.HOST)
         conn.putheader("Transfer-Encoding", "chunked")
             remaining -= len(data)
 
         self.assertEqual(len(buf), 1024 * 1024)
-        self.assertEqual(buf, ntob("a" * 1024 * 1024))
+        self.assertEqual(buf, b"a" * 1024 * 1024)
         self.assertEqual(remaining, 0)
         remote_data_conn.close()
 
         self.persistent = True
 
         conn = self.HTTP_CONN
-        conn.send(ntob('GET /hello HTTP/1.1\n\n'))
+        conn.send(b'GET /hello HTTP/1.1\n\n')
         response = conn.response_class(conn.sock, method="GET")
         response.begin()
         self.body = response.read()
         conn.close()
 
         conn.connect()
-        conn.send(ntob('GET /hello HTTP/1.1\r\n\n'))
+        conn.send(b'GET /hello HTTP/1.1\r\n\n')
         response = conn.response_class(conn.sock, method="GET")
         response.begin()
         self.body = response.read()

cheroot/test/test_core.py

 import socket
 import time
 
-from cheroot.compat import HTTPConnection, HTTPSConnection, ntob, tonative
+from cheroot.compat import HTTPConnection, HTTPSConnection, tonative
 from cheroot.test import helper, webtest
 
 
 
                 if hasattr(f, 'read_trailer_lines'):
                     for line in f.read_trailer_lines():
-                        k, v = line.split(ntob(":"), 1)
+                        k, v = line.split(b":", 1)
                         k = tonative(k.strip())
                         v = tonative(v.strip())
                         resp.headers[k] = v
         self.assertStatus(500)
         self.assertInBody(
             "Illegal response status from server (%s is non-numeric)." %
-            (repr(ntob('error'))))
+            (repr(b'error')))
 
     def test_multiple_headers(self):
         self.getPage('/header_list')
         else:
             c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
         c.putrequest("POST", "/echo")
-        body = ntob("x" * 1001)
+        body = b"x" * 1001
         c.putheader("Content-Length", len(body))
         c.endheaders()
         c.send(body)
         else:
             c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
         c.putrequest("POST", "/echo")
-        body = ntob("I am a request body")
+        body = b"I am a request body"
         c.putheader("Content-Length", len(body))
         c.endheaders()
         c.send(body)
         else:
             c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
         c.putrequest("POST", "/echo_lines")
-        body = ntob("I am a\nrequest body")
+        body = b"I am a\nrequest body"
         c.putheader("Content-Length", len(body))
         c.endheaders()
         c.send(body)
         c.putrequest("POST", "/echo")
         c.putheader("Transfer-Encoding", "chunked")
         c.endheaders()
-        c.send(ntob("13\r\nI am a request body\r\n0\r\n\r\n"))
+        c.send(b"13\r\nI am a request body\r\n0\r\n\r\n")
         response = c.getresponse()
         self.status, self.headers, self.body = webtest.shb(response)
         c.close()
         c.putrequest("POST", "/echo_lines")
         c.putheader("Transfer-Encoding", "chunked")
         c.endheaders()
-        c.send(ntob("13\r\nI am a\nrequest body\r\n0\r\n\r\n"))
+        c.send(b"13\r\nI am a\nrequest body\r\n0\r\n\r\n")
         response = c.getresponse()
         self.status, self.headers, self.body = webtest.shb(response)
         c.close()
         c.putrequest("POST", "/echo_lines")
         c.putheader("Transfer-Encoding", "chunked")
         c.endheaders()
-        c.send(ntob("13\r\nI am a\nrequest body\r\n0\r\n"
-                    "Content-Type: application/json\r\n\r\n"))
+        c.send(b"13\r\nI am a\nrequest body\r\n0\r\n"
+               b"Content-Type: application/json\r\n\r\n")
         response = c.getresponse()
         self.status, self.headers, self.body = webtest.shb(response)
         c.close()

cheroot/test/test_http.py

 import sys
 
 import cheroot
-from cheroot.compat import HTTPConnection, HTTPSConnection, ntob, py3k
+from cheroot.compat import HTTPConnection, HTTPSConnection, py3k
 from cheroot import wsgi
 
 from cheroot.test import helper
     def test_normal_request(self):
         self.getPage("/hello")
         self.assertStatus(200)
-        self.assertBody(ntob('Hello world!'))
+        self.assertBody(b'Hello world!')
 
     def test_no_content_length(self):
         # "The presence of a message-body in a request is signaled by the
         self.body = response.fp.read()
         self.status = str(response.status)
         self.assertStatus(200)
-        self.assertBody(ntob('Hello world!'))
+        self.assertBody(b'Hello world!')
 
     def test_content_length_required(self):
         # Now send a message that has no Content-Length, but does send a body.
             c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
         else:
             c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c._output(ntob('GET /'))
+        c._output(b'GET /')
         c._send_output()
         if hasattr(c, 'strict'):
             response = c.response_class(c.sock, strict=c.strict, method='GET')
             response = c.response_class(c.sock, method='GET')
         response.begin()
         self.assertEqual(response.status, 400)
-        self.assertEqual(response.fp.read(22), ntob("Malformed Request-Line"))
+        self.assertEqual(response.fp.read(22), b"Malformed Request-Line")
         c.close()
 
     def test_malformed_header(self):
         c.putrequest('GET', '/')
         c.putheader('Content-Type', 'text/plain')
         # See http://www.bitbucket.org/cherrypy/cherrypy/issue/941
-        c._output(ntob('Re, 1.2.3.4#015#012'))
+        c._output(b'Re, 1.2.3.4#015#012')
         c.endheaders()
 
         response = c.getresponse()
     def test_garbage_in(self):
         # Connect without SSL regardless of server.scheme
         c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
-        c._output(ntob('gjkgjklsgjklsgjkljklsg'))
+        c._output(b'gjkgjklsgjklsgjkljklsg')
         c._send_output()
         response = c.response_class(c.sock, method="GET")
         try:
             response.begin()
             self.assertEqual(response.status, 400)
-            self.assertEqual(response.fp.read(22),
-                             ntob("Malformed Request-Line"))
+            self.assertEqual(response.fp.read(22), b"Malformed Request-Line")
             c.close()
         except socket.error:
             e = sys.exc_info()[1]

cheroot/test/test_wsgi.py

 import sys
 
-from cheroot.compat import ntob
 from cheroot.test import helper
 from cheroot import wsgi
 
             response_headers = [('Content-type', 'text/plain'),
                                 ('Content-Length', '11')]
             start_response(status, response_headers)
-            return [ntob('Hello world')]
+            return [b'Hello world']
 
         def foo(environ, start_response):
             status = '200 OK'
             start_response(status, response_headers)
             # This should fail according to the WSGI spec.
             start_response(status, response_headers)
-            return [ntob('Hello world')]
+            return [b'Hello world']
 
         def bar(environ, start_response):
             status = '200 OK'
             response_headers = [('Content-type', 'text/plain'),
                                 ('Content-Length', '3')]
             write = start_response(status, response_headers)
-            write(ntob('boo'))
+            write(b'boo')
             # This should fail according to the WSGI spec.
             try:
                 noname
             except NameError:
                 start_response(status, response_headers, sys.exc_info())
-            return [ntob('Hello world')]
+            return [b'Hello world']
 
         def baz(environ, start_response):
             status = 200
             response_headers = [('Content-type', 'text/plain')]
             # This should fail because status is not a str
             start_response(status, response_headers)
-            return [ntob('Hello world')]
+            return [b'Hello world']
 
         def qoph(environ, start_response):
             status = '200 OK'
             response_headers = [('Content-type', 5)]
             # This should fail because the response header value is not a str
             start_response(status, response_headers)
-            return [ntob('Hello world')]
+            return [b'Hello world']
 
         cls.httpserver.wsgi_app = wsgi.WSGIPathInfoDispatcher({
             '/foo': foo,

cheroot/test/test_wsgiapps.py

             status = '200 OK'
             response_headers = [('Content-type', 'text/plain')]
             start_response(status, response_headers)
-            return [
-                ntob('Hello'), ntob(''), ntob(' '), ntob(''), ntob('world')]
+            return [b'Hello', b'', b' ', b'', b'world']
 
         class WSGIResponse(object):
 

cheroot/test/webtest.py

 
                     self._method = method
                     if not url:
-                        url = ntob('/')
-                    request = ntob(' ').join(
+                        url = b'/'
+                    request = b' '.join(
                         (
                             method.encode("ASCII"),
                             url,
         req = self.req
         env_10 = WSGIGateway_10.get_environ(self)
         env = env_10.copy()
-        env[ntou('wsgi.version')] = ('u', 0)
+        env[u'wsgi.version'] = ('u', 0)
 
         # Request-URI
-        env.setdefault(ntou('wsgi.url_encoding'), ntou('utf-8'))
+        env.setdefault(u'wsgi.url_encoding', u'utf-8')
         try:
             if py3k:
                 for key in ["PATH_INFO", "SCRIPT_NAME", "QUERY_STRING"]:
                 env["QUERY_STRING"] = req.qs.decode(env['wsgi.url_encoding'])
         except UnicodeDecodeError:
             # Fall back to latin 1 so apps can transcode if needed.
-            env[ntou('wsgi.url_encoding')] = ntou('ISO-8859-1')
-            for key in [
-                ntou("PATH_INFO"), ntou("SCRIPT_NAME"), ntou("QUERY_STRING")
-            ]:
+            env[u'wsgi.url_encoding'] = u'ISO-8859-1'
+            for key in [u"PATH_INFO", u"SCRIPT_NAME", u"QUERY_STRING"]:
                 if py3k:
                     env[key] = env_10[key]
                 else: