Jason R. Coombs avatar Jason R. Coombs committed dbaec4c Merge

Merge with cherrypy-3.2.x

Comments (0)

Files changed (4)

cherrypy/_cpcompat.py

     basestring = (bytes, str)
     def ntob(n, encoding='ISO-8859-1'):
         """Return the given native string as a byte string in the given encoding."""
+        assert_native(n)
         # In Python 3, the native string type is unicode
         return n.encode(encoding)
     def ntou(n, encoding='ISO-8859-1'):
         """Return the given native string as a unicode string with the given encoding."""
+        assert_native(n)
         # In Python 3, the native string type is unicode
         return n
     def tonative(n, encoding='ISO-8859-1'):
     basestring = basestring
     def ntob(n, encoding='ISO-8859-1'):
         """Return the given native string as a byte string in the given encoding."""
+        assert_native(n)
         # In Python 2, the native string type is bytes. Assume it's already
         # in the given encoding, which for ISO-8859-1 is almost always what
         # was intended.
         return n
     def ntou(n, encoding='ISO-8859-1'):
         """Return the given native string as a unicode string with the given encoding."""
+        assert_native(n)
         # In Python 2, the native string type is bytes.
         # First, check for the special encoding 'escape'. The test suite uses this
         # to signal that it wants to pass a string with embedded \uXXXX escapes,
     # bytes:
     BytesIO = StringIO
 
+def assert_native(n):
+    if not isinstance(n, nativestr):
+        raise TypeError("n must be a native str (got %s)" % type(n).__name__)
+
 try:
     set = set
 except NameError:

cherrypy/lib/cptools.py

 import re
 
 import cherrypy
-from cherrypy._cpcompat import basestring, ntob, md5, set
+from cherrypy._cpcompat import basestring, md5, set
 from cherrypy.lib import httputil as _httputil
 
 
 
 def validate_etags(autotags=False, debug=False):
     """Validate the current ETag against If-Match, If-None-Match headers.
-    
+
     If autotags is True, an ETag response-header value will be provided
     from an MD5 hash of the response body (unless some other code has
     already provided an ETag header). If False (the default), the ETag
     will not be automatic.
-    
+
     WARNING: the autotags feature is not designed for URL's which allow
     methods other than GET. For example, if a POST to the same URL returns
     no content, the automatic ETag will be incorrect, breaking a fundamental
     See :rfc:`2616` Section 14.24.
     """
     response = cherrypy.serving.response
-    
+
     # Guard against being run twice.
     if hasattr(response, "ETag"):
         return
-    
+
     status, reason, msg = _httputil.valid_status(response.status)
-    
+
     etag = response.headers.get('ETag')
-    
+
     # Automatic ETag generation. See warning in docstring.
     if etag:
         if debug:
         if debug:
             cherrypy.log('Setting ETag: %s' % etag, 'TOOLS.ETAGS')
         response.headers['ETag'] = etag
-    
+
     response.ETag = etag
-    
+
     # "If the request would, without the If-Match header field, result in
     # anything other than a 2xx or 412 status, then the If-Match header
     # MUST be ignored."
         cherrypy.log('Status: %s' % status, 'TOOLS.ETAGS')
     if status >= 200 and status <= 299:
         request = cherrypy.serving.request
-        
+
         conditions = request.headers.elements('If-Match') or []
         conditions = [str(x) for x in conditions]
         if debug:
         if conditions and not (conditions == ["*"] or etag in conditions):
             raise cherrypy.HTTPError(412, "If-Match failed: ETag %r did "
                                      "not match %r" % (etag, conditions))
-        
+
         conditions = request.headers.elements('If-None-Match') or []
         conditions = [str(x) for x in conditions]
         if debug:
 
 def validate_since():
     """Validate the current Last-Modified against If-Modified-Since headers.
-    
+
     If no code has set the Last-Modified response header, then no validation
     will be performed.
     """
     lastmod = response.headers.get('Last-Modified')
     if lastmod:
         status, reason, msg = _httputil.valid_status(response.status)
-        
+
         request = cherrypy.serving.request
-        
+
         since = request.headers.get('If-Unmodified-Since')
         if since and since != lastmod:
             if (status >= 200 and status <= 299) or status == 412:
                 raise cherrypy.HTTPError(412)
-        
+
         since = request.headers.get('If-Modified-Since')
         if since and since == lastmod:
             if (status >= 200 and status <= 299) or status == 304:
 
 def allow(methods=None, debug=False):
     """Raise 405 if request.method not in methods (default ['GET', 'HEAD']).
-    
+
     The given methods are case-insensitive, and may be in any order.
     If only one method is allowed, you may supply a single string;
     if more than one, supply a list of strings.
-    
+
     Regardless of whether the current method is allowed or not, this
     also emits an 'Allow' response header, containing the given methods.
     """
         methods = ['GET', 'HEAD']
     elif 'GET' in methods and 'HEAD' not in methods:
         methods.append('HEAD')
-    
+
     cherrypy.response.headers['Allow'] = ', '.join(methods)
     if cherrypy.request.method not in methods:
         if debug:
 def proxy(base=None, local='X-Forwarded-Host', remote='X-Forwarded-For',
           scheme='X-Forwarded-Proto', debug=False):
     """Change the base URL (scheme://host[:port][/path]).
-    
+
     For running a CP server behind Apache, lighttpd, or other HTTP server.
-    
+
     For Apache and lighttpd, you should leave the 'local' argument at the
     default value of 'X-Forwarded-Host'. For Squid, you probably want to set
     tools.proxy.local = 'Origin'.
-    
+
     If you want the new request.base to include path info (not just the host),
     you must explicitly set base to the full base path, and ALSO set 'local'
     to '', so that the X-Forwarded-Host request header (which never includes
     path info) does not override it. Regardless, the value for 'base' MUST
     NOT end in a slash.
-    
+
     cherrypy.request.remote.ip (the IP address of the client) will be
     rewritten if the header specified by the 'remote' arg is valid.
     By default, 'remote' is set to 'X-Forwarded-For'. If you do not
     want to rewrite remote.ip, set the 'remote' arg to an empty string.
     """
-    
+
     request = cherrypy.serving.request
-    
+
     if scheme:
         s = request.headers.get(scheme, None)
         if debug:
             scheme = s
     if not scheme:
         scheme = request.base[:request.base.find("://")]
-    
+
     if local:
         lbase = request.headers.get(local, None)
         if debug:
             base = '127.0.0.1'
         else:
             base = '127.0.0.1:%s' % port
-    
+
     if base.find("://") == -1:
         # add http:// or https:// if needed
         base = scheme + "://" + base
-    
+
     request.base = base
-    
+
     if remote:
         xff = request.headers.get(remote)
         if debug:
 
 def ignore_headers(headers=('Range',), debug=False):
     """Delete request headers whose field names are included in 'headers'.
-    
+
     This is a useful tool for working behind certain HTTP servers;
     for example, Apache duplicates the work that CP does for 'Range'
     headers, and will doubly-truncate the response.
 def referer(pattern, accept=True, accept_missing=False, error=403,
             message='Forbidden Referer header.', debug=False):
     """Raise HTTPError if Referer header does/does not match the given pattern.
-    
+
     pattern
         A regular expression pattern to test against the Referer.
-        
+
     accept
         If True, the Referer must match the pattern; if False,
         the Referer must NOT match the pattern.
 
     error
         The HTTP error code to return to the client on failure.
-        
+
     message
         A string to include in the response body on failure.
-    
+
     """
     try:
         ref = cherrypy.serving.request.headers['Referer']
             cherrypy.log('No Referer header', 'TOOLS.REFERER')
         if accept_missing:
             return
-    
+
     raise cherrypy.HTTPError(error, message)
 
 
 class SessionAuth(object):
     """Assert that the user is logged in."""
-    
+
     session_key = "username"
     debug = False
-    
+
     def check_username_and_password(self, username, password):
         pass
-    
+
     def anonymous(self):
         """Provide a temporary user name for anonymous users."""
         pass
-    
+
     def on_login(self, username):
         pass
-    
+
     def on_logout(self, username):
         pass
-    
+
     def on_check(self, username):
         pass
-    
+
     def login_screen(self, from_page='..', username='', error_msg='', **kwargs):
-        return ntob("""<html><body>
+        return (u"""<html><body>
 Message: %(error_msg)s
 <form method="post" action="do_login">
     Login: <input type="text" name="username" value="%(username)s" size="10" /><br />
     <input type="hidden" name="from_page" value="%(from_page)s" /><br />
     <input type="submit" />
 </form>
-</body></html>""" % {'from_page': from_page, 'username': username,
-                     'error_msg': error_msg}, "utf-8")
-    
+</body></html>""" % vars()).encode("utf-8")
+
     def do_login(self, username, password, from_page='..', **kwargs):
         """Login. May raise redirect, or return True if request handled."""
         response = cherrypy.serving.response
             cherrypy.session[self.session_key] = username
             self.on_login(username)
             raise cherrypy.HTTPRedirect(from_page or "/")
-    
+
     def do_logout(self, from_page='..', **kwargs):
         """Logout. May raise redirect, or return True if request handled."""
         sess = cherrypy.session
             cherrypy.serving.request.login = None
             self.on_logout(username)
         raise cherrypy.HTTPRedirect(from_page)
-    
+
     def do_check(self):
         """Assert username. May raise redirect, or return True if request handled."""
         sess = cherrypy.session
         request = cherrypy.serving.request
         response = cherrypy.serving.response
-        
+
         username = sess.get(self.session_key)
         if not username:
             sess[self.session_key] = username = self.anonymous()
             cherrypy.log('Setting request.login to %r' % username, 'TOOLS.SESSAUTH')
         request.login = username
         self.on_check(username)
-    
+
     def run(self):
         request = cherrypy.serving.request
         response = cherrypy.serving.response
-        
+
         path = request.path_info
         if path.endswith('login_screen'):
             if self.debug:
 def log_hooks(debug=False):
     """Write request.hooks to the cherrypy error log."""
     request = cherrypy.serving.request
-    
+
     msg = []
     # Sort by the standard points if possible.
     from cherrypy import _cprequest
     for k in request.hooks.keys():
         if k not in points:
             points.append(k)
-    
+
     for k in points:
         msg.append("    %s:" % k)
         v = request.hooks.get(k, [])
     """Redirect if path_info has (missing|extra) trailing slash."""
     request = cherrypy.serving.request
     pi = request.path_info
-    
+
     if debug:
         cherrypy.log('is_index: %r, missing: %r, extra: %r, path_info: %r' %
                      (request.is_index, missing, extra, pi),
 
 def flatten(debug=False):
     """Wrap response.body in a generator that recursively iterates over body.
-    
+
     This allows cherrypy.response.body to consist of 'nested generators';
     that is, a set of generators that yield generators.
     """
 
 def accept(media=None, debug=False):
     """Return the client's preferred media-type (from the given Content-Types).
-    
+
     If 'media' is None (the default), no test will be performed.
-    
+
     If 'media' is provided, it should be the Content-Type value (as a string)
     or values (as a list or tuple of strings) which the current resource
     can emit. The client's acceptable media ranges (as declared in the
     values; the first such string is returned. That is, the return value
     will always be one of the strings provided in the 'media' arg (or None
     if 'media' is None).
-    
+
     If no match is found, then HTTPError 406 (Not Acceptable) is raised.
     Note that most web browsers send */* as a (low-quality) acceptable
     media range, which should match any Content-Type. In addition, "...if
     no Accept header field is present, then it is assumed that the client
     accepts all media types."
-    
+
     Matching types are checked in order of client preference first,
     and then in the order of the given 'media' values.
-    
+
     Note that this function does not honor accept-params (other than "q").
     """
     if not media:
     if isinstance(media, basestring):
         media = [media]
     request = cherrypy.serving.request
-    
+
     # Parse the Accept request header, and try to match one
     # of the requested media-ranges (in order of preference).
     ranges = request.headers.elements('Accept')
                             cherrypy.log('Match due to %s' % element.value,
                                          'TOOLS.ACCEPT')
                         return element.value
-    
+
     # No suitable media-range found.
     ah = request.headers.get('Accept')
     if ah is None:
 
 
 class MonitoredHeaderMap(_httputil.HeaderMap):
-    
+
     def __init__(self):
         self.accessed_headers = set()
-    
+
     def __getitem__(self, key):
         self.accessed_headers.add(key)
         return _httputil.HeaderMap.__getitem__(self, key)
-    
+
     def __contains__(self, key):
         self.accessed_headers.add(key)
         return _httputil.HeaderMap.__contains__(self, key)
-    
+
     def get(self, key, default=None):
         self.accessed_headers.add(key)
         return _httputil.HeaderMap.get(self, key, default=default)
-    
+
     if hasattr({}, 'has_key'):
         # Python 2
         def has_key(self, key):
 def autovary(ignore=None, debug=False):
     """Auto-populate the Vary response header based on request.header access."""
     request = cherrypy.serving.request
-    
+
     req_h = request.headers
     request.headers = MonitoredHeaderMap()
     request.headers.update(req_h)
     if ignore is None:
         ignore = set(['Content-Disposition', 'Content-Length', 'Content-Type'])
-    
+
     def set_response_header():
         resp_h = cherrypy.serving.response.headers
         v = set([e.value for e in resp_h.elements('Vary')])

cherrypy/test/test_compat.py

+import unittest
+
+import nose
+
+from cherrypy import _cpcompat as compat
+
+class StringTester(unittest.TestCase):
+    def test_ntob_non_native(self):
+        """
+        ntob should raise an Exception on unicode.
+        (Python 2 only)
+
+        See #1132 for discussion.
+        """
+        if compat.py3k:
+            raise nose.SkipTest("Only useful on Python 2")
+        self.assertRaises(Exception, compat.ntob, u'fight')

cherrypy/test/test_tools.py

 
 import gzip
 import sys
+import unittest
 from cherrypy._cpcompat import BytesIO, copyitems, itervalues
 from cherrypy._cpcompat import IncompleteRead, ntob, ntou, py3k, xrange
+from cherrypy._cpcompat import bytestr
 import time
 timeout = 0.2
 import types
 
 class ToolTests(helper.CPWebCase):
     def setup_server():
-        
+
         # Put check_access in a custom toolbox with its own namespace
         myauthtools = cherrypy._cptools.Toolbox("myauth")
-        
+
         def check_access(default=False):
             if not getattr(cherrypy.request, "userid", default):
                 raise cherrypy.HTTPError(401)
         myauthtools.check_access = cherrypy.Tool('before_request_body', check_access)
-        
+
         def numerify():
             def number_it(body):
                 for chunk in body:
                         chunk = chunk.replace(k, v)
                     yield chunk
             cherrypy.response.body = number_it(cherrypy.response.body)
-        
+
         class NumTool(cherrypy.Tool):
             def _setup(self):
                 def makemap():
                     m = self._merged_args().get("map", {})
                     cherrypy.request.numerify_map = copyitems(m)
                 cherrypy.request.hooks.attach('on_start_resource', makemap)
-                
+
                 def critical():
                     cherrypy.request.error_response = cherrypy.HTTPError(502).set_response
                 critical.failsafe = True
-                
+
                 cherrypy.request.hooks.attach('on_start_resource', critical)
                 cherrypy.request.hooks.attach(self._point, self.callable)
-        
+
         tools.numerify = NumTool('before_finalize', numerify)
-        
+
         # It's not mandatory to inherit from cherrypy.Tool.
         class NadsatTool:
-            
+
             def __init__(self):
                 self.ended = {}
                 self._name = "nadsat"
-            
+
             def nadsat(self):
                 def nadsat_it_up(body):
                     for chunk in body:
                         yield chunk
                 cherrypy.response.body = nadsat_it_up(cherrypy.response.body)
             nadsat.priority = 0
-            
+
             def cleanup(self):
                 # This runs after the request has been completely written out.
                 cherrypy.response.body = [ntob("razdrez")]
                 if id:
                     self.ended[id] = True
             cleanup.failsafe = True
-            
+
             def _setup(self):
                 cherrypy.request.hooks.attach('before_finalize', self.nadsat)
                 cherrypy.request.hooks.attach('on_end_request', self.cleanup)
         tools.nadsat = NadsatTool()
-        
+
         def pipe_body():
             cherrypy.request.process_request_body = False
             clen = int(cherrypy.request.headers['Content-Length'])
             cherrypy.request.body = cherrypy.request.rfile.read(clen)
-        
+
         # Assert that we can use a callable object instead of a function.
         class Rotator(object):
             def __call__(self, scale):
                 else:
                     r.body = [chr((ord(x) + scale) % 256) for x in r.body[0]]
         cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator())
-        
+
         def stream_handler(next_handler, *args, **kwargs):
             cherrypy.response.output = o = BytesIO()
             try:
             finally:
                 o.close()
         cherrypy.tools.streamer = cherrypy._cptools.HandlerWrapperTool(stream_handler)
-        
+
         class Root:
             def index(self):
                 return "Howdy earth!"
             index.exposed = True
-            
+
             def tarfile(self):
                 cherrypy.response.output.write(ntob('I am '))
                 cherrypy.response.output.write(ntob('a tarfile'))
             tarfile.exposed = True
             tarfile._cp_config = {'tools.streamer.on': True}
-            
+
             def euro(self):
                 hooks = list(cherrypy.request.hooks['before_finalize'])
                 hooks.sort()
                 yield ntou("world")
                 yield europoundUnicode
             euro.exposed = True
-            
+
             # Bare hooks
             def pipe(self):
                 return cherrypy.request.body
             pipe.exposed = True
             pipe._cp_config = {'hooks.before_request_body': pipe_body}
-            
+
             # Multiple decorators; include kwargs just for fun.
             # Note that rotator must run before gzip.
             def decorated_euro(self, *vpath):
             decorated_euro.exposed = True
             decorated_euro = tools.gzip(compress_level=6)(decorated_euro)
             decorated_euro = tools.rotator(scale=3)(decorated_euro)
-        
+
         root = Root()
-        
-        
+
         class TestType(type):
             """Metaclass which automatically exposes all functions in each subclass,
             and adds an instance of the subclass as an attribute of root.
                         value.exposed = True
                 setattr(root, name.lower(), cls())
         Test = TestType('Test', (object,), {})
-        
-        
+
         # METHOD ONE:
         # Declare Tools in _cp_config
         class Demo(Test):
-            
+
             _cp_config = {"tools.nadsat.on": True}
-            
+
             def index(self, id=None):
                 return "A good piece of cherry pie"
-            
+
             def ended(self, id):
                 return repr(tools.nadsat.ended[id])
-            
+
             def err(self, id=None):
                 raise ValueError()
-            
+
             def errinstream(self, id=None):
                 yield "nonconfidential"
                 raise ValueError()
                 yield "confidential"
-            
+
             # METHOD TWO: decorator using Tool()
             # We support Python 2.3, but the @-deco syntax would look like this:
             # @tools.check_access()
                 return "Welcome!"
             restricted = myauthtools.check_access()(restricted)
             userid = restricted
-            
+
             def err_in_onstart(self):
                 return "success!"
-            
+
             def stream(self, id=None):
                 for x in xrange(100000000):
                     yield str(x)
             stream._cp_config = {'response.stream': True}
-        
-        
+
+
         conf = {
             # METHOD THREE:
             # Declare Tools in detached config
         }
         app = cherrypy.tree.mount(root, config=conf)
         app.request_class.namespaces['myauth'] = myauthtools
-        
+
         if sys.version_info >= (2, 5):
             from cherrypy.test import _test_decorators
             root.tooldecs = _test_decorators.ToolExamples()
         time.sleep(0.1)
         self.getPage("/demo/ended/1")
         self.assertBody("True")
-        
+
         valerr = '\n    raise ValueError()\nValueError'
         self.getPage("/demo/err?id=3")
         # If body is "razdrez", then on_end_request is being called too early.
         time.sleep(0.1)
         self.getPage("/demo/ended/3")
         self.assertBody("True")
-        
+
         # If body is "razdrez", then on_end_request is being called too early.
         if (cherrypy.server.protocol_version == "HTTP/1.0" or
             getattr(cherrypy.server, "using_apache", False)):
         time.sleep(0.1)
         self.getPage("/demo/ended/5")
         self.assertBody("True")
-        
+
         # Test the "__call__" technique (compile-time decorator).
         self.getPage("/demo/restricted")
         self.assertErrorPage(401)
-        
+
         # Test compile-time decorator with kwargs from config.
         self.getPage("/demo/userid")
         self.assertBody("Welcome!")
-    
+
     def testEndRequestOnDrop(self):
         old_timeout = None
         try:
             old_timeout = httpserver.timeout
         except (AttributeError, IndexError):
             return self.skip()
-        
+
         try:
             httpserver.timeout = timeout
-            
+
             # Test that on_end_request is called even if the client drops.
             self.persistent = True
             try:
         finally:
             if old_timeout is not None:
                 httpserver.timeout = old_timeout
-    
+
     def testGuaranteedHooks(self):
         # The 'critical' on_start_resource hook is 'failsafe' (guaranteed
         # to run even if there are failures in other on_start methods).
         self.getPage("/demo/err_in_onstart")
         self.assertErrorPage(502)
         self.assertInBody("AttributeError: 'str' object has no attribute 'items'")
-    
+
     def testCombinedTools(self):
         expectedResult = (ntou("Hello,world") + europoundUnicode).encode('utf-8')
         zbuf = BytesIO()
         zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=9)
         zfile.write(expectedResult)
         zfile.close()
-        
+
         self.getPage("/euro", headers=[("Accept-Encoding", "gzip"),
                                         ("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7")])
         self.assertInBody(zbuf.getvalue()[:3])
-        
+
         zbuf = BytesIO()
         zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6)
         zfile.write(expectedResult)
         zfile.close()
-        
+
         self.getPage("/decorated_euro", headers=[("Accept-Encoding", "gzip")])
         self.assertInBody(zbuf.getvalue()[:3])
-        
+
         # This returns a different value because gzip's priority was
         # lowered in conf, allowing the rotator to run after gzip.
         # Of course, we don't want breakage in production apps,
             self.assertInBody(bytes([(x + 3) % 256 for x in zbuf.getvalue()]))
         else:
             self.assertInBody(''.join([chr((ord(x) + 3) % 256) for x in zbuf.getvalue()]))
-    
+
     def testBareHooks(self):
         content = "bit of a pain in me gulliver"
         self.getPage("/pipe",
                               ("Content-Type", "text/plain")],
                      method="POST", body=content)
         self.assertBody(content)
-    
+
     def testHandlerWrapperTool(self):
         self.getPage("/tarfile")
         self.assertBody("I am a tarfile")
-    
+
     def testToolWithConfig(self):
         if not sys.version_info >= (2, 5):
             return self.skip("skipped (Python 2.5+ only)")
-        
+
         self.getPage('/tooldecs/blah')
         self.assertHeader('Content-Type', 'application/data')
-    
+
     def testWarnToolOn(self):
         # get
         try:
-            numon = cherrypy.tools.numerify.on
+            cherrypy.tools.numerify.on
         except AttributeError:
             pass
         else:
             raise AssertionError("Tool.on did not error as it should have.")
-        
+
         # set
         try:
             cherrypy.tools.numerify.on = True
         else:
             raise AssertionError("Tool.on did not error as it should have.")
 
+class SessionAuthTest(unittest.TestCase):
+    def test_login_screen_returns_bytes(self):
+        """
+        login_screen must return bytes even if unicode parameters are passed.
+        Issue 1132 revealed that login_screen would return unicode if the
+        username and password were unicode.
+        """
+        sa = cherrypy.lib.cptools.SessionAuth()
+        res = sa.login_screen(None, username=u'nobody', password=u'anypass')
+        self.assertIsInstance(res, bytestr)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.