Robert Brewer avatar Robert Brewer committed 335a1ad

Oh so close to merging py2 and py3 back together

Comments (0)

Files changed (24)

py2/cherrypy/_cpcompat.py

 output.
 """
 import os
+import re
 import sys
 
 if sys.version_info >= (3, 0):
         return n
     def ntou(n, encoding='ISO-8859-1'):
         """Return the given native string as a unicode string with the given encoding."""
-        # In Python 2, the native string type is bytes. Assume it's already
-        # in the given encoding, which for ISO-8859-1 is almost always what
-        # was intended.
+        # In Python 2, the native string type is bytes.
+        # First, check for the special encoding 'escape'. The test suite uses this
+        # to signal that it wants to pass a string with embedded \uXXXX escapes,
+        # but without having to prefix it with u'' for Python 2, but no prefix
+        # for Python 3.
+        if encoding == 'escape':
+            return unicode(
+                re.sub(r'\\u([0-9a-zA-Z]{4})',
+                       lambda m: unichr(int(m.group(1), 16)),
+                       n.decode('ISO-8859-1')))
+        # Assume it's already in the given encoding, which for ISO-8859-1 is almost
+        # always what was intended.
         return n.decode(encoding)
     def tonative(n, encoding='ISO-8859-1'):
         """Return the given string as a native string in the given encoding."""

py2/cherrypy/test/test_dynamicobjectmapping.py

 import cherrypy
+from cherrypy._cpcompat import sorted, unicodestr
 from cherrypy._cptree import Application
 from cherrypy.test import helper
 
 
     def make_user(name, id=None):
         if not id:
-            id = max(*user_lookup.keys()) + 1
+            id = max(*list(user_lookup.keys())) + 1
         user_lookup[id] = User(id, name)
         return id
 
             return "POST %d" % make_user(name)
 
         def GET(self):
-            keys = user_lookup.keys()
-            keys.sort()
-            return unicode(keys)
+            return unicodestr(sorted(user_lookup.keys()))
 
         def dynamic_dispatch(self, vpath):
             try:
             """
             Return the appropriate representation of the instance.
             """
-            return unicode(self.user)
+            return unicodestr(self.user)
 
         def POST(self, name):
             """

py2/cherrypy/test/test_encoding.py

 from cherrypy._cpcompat import BytesIO, IncompleteRead, ntob, ntou
 
 europoundUnicode = ntou('\x80\xa3')
-sing = u"\u6bdb\u6cfd\u4e1c: Sing, Little Birdie?"
+sing = ntou("\u6bdb\u6cfd\u4e1c: Sing, Little Birdie?", 'escape')
 sing8 = sing.encode('utf-8')
 sing16 = sing.encode('utf-16')
 

py2/cherrypy/test/test_etags.py

 import cherrypy
+from cherrypy._cpcompat import ntou
 from cherrypy.test import helper
 
 
             fail.exposed = True
             
             def unicoded(self):
-                return u'I am a \u1ee4nicode string.'
+                return ntou('I am a \u1ee4nicode string.', 'escape')
             unicoded.exposed = True
             # In Python 3, tools.encode is on by default
             unicoded._cp_config = {'tools.encode.on': True}

py2/cherrypy/test/test_http.py

 import mimetypes
 
 import cherrypy
-from cherrypy._cpcompat import HTTPConnection, HTTPSConnection, ntob
+from cherrypy._cpcompat import HTTPConnection, HTTPSConnection, ntob, py3k
 
 
 def encode_multipart_formdata(files):
                 """Return a summary ("a * 65536\nb * 65536") of the uploaded file."""
                 contents = file.file.read()
                 summary = []
-                curchar = ""
+                curchar = None
                 count = 0
                 for c in contents:
                     if c == curchar:
                         count += 1
                     else:
                         if count:
+                            if py3k: curchar = chr(curchar)
                             summary.append("%s * %d" % (curchar, count))
                         count = 1
                         curchar = c
                 if count:
+                    if py3k: curchar = chr(curchar)
                     summary.append("%s * %d" % (curchar, count))
                 return ", ".join(summary)
             post_multipart.exposed = True

py2/cherrypy/test/test_logging.py

 localDir = os.path.dirname(__file__)
 
 import cherrypy
+from cherrypy._cpcompat import ntob, ntou, py3k
 
 access_log = os.path.join(localDir, "access.log")
 error_log = os.path.join(localDir, "error.log")
 
 # Some unicode strings.
-tartaros = u'\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2'
-erebos = u'\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com'
+tartaros = ntou('\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2', 'escape')
+erebos = ntou('\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com', 'escape')
 
 
 def setup_server():
         self.markLog()
         self.getPage("/uni_code")
         self.assertStatus(200)
-        self.assertLog(-1, repr(tartaros.encode('utf8'))[1:-1])
+        if py3k:
+            # The repr of a bytestring in py3k includes a b'' prefix
+            self.assertLog(-1, repr(tartaros.encode('utf8'))[2:-1])
+        else:
+            self.assertLog(-1, repr(tartaros.encode('utf8'))[1:-1])
         # Test the erebos value. Included inline for your enlightenment.
         # Note the 'r' prefix--those backslashes are literals.
         self.assertLog(-1, r'\xce\x88\xcf\x81\xce\xb5\xce\xb2\xce\xbf\xcf\x82')
         self.markLog()
         self.getPage("/slashes")
         self.assertStatus(200)
-        self.assertLog(-1, r'"GET /slashed\\path HTTP/1.1"')
+        if py3k:
+            self.assertLog(-1, ntob('"GET /slashed\\path HTTP/1.1"'))
+        else:
+            self.assertLog(-1, r'"GET /slashed\\path HTTP/1.1"')
         
         # Test whitespace in output.
         self.markLog()

py2/cherrypy/test/test_mime.py

         multipart_form_data.exposed = True
         
         def flashupload(self, Filedata, Upload, Filename):
-            return ("Upload: %r, Filename: %r, Filedata: %r" %
+            return ("Upload: %s, Filename: %s, Filedata: %r" %
                     (Upload, Filename, Filedata.file.read()))
         flashupload.exposed = True
     
                               ("Content-Length", str(len(body))),
                               ],
                      body=body),
-        self.assertBody(repr([('baz', [u'111', u'333']), ('foo', u'bar')]))
+        self.assertBody(repr([('baz', [ntou('111'), ntou('333')]), ('foo', ntou('bar'))]))
 
 
 class SafeMultipartHandlingTest(helper.CPWebCase):
             '------------KM7Ij5cH2KM7Ef1gL6ae0ae0cH2gL6--'
             ))
         self.getPage('/flashupload', headers, "POST", body)
-        self.assertBody("Upload: u'Submit Query', Filename: u'.project', "
+        self.assertBody("Upload: Submit Query, Filename: .project, "
                         "Filedata: %r" % filedata)
 

py2/cherrypy/test/test_misc_tools.py

             # Read a header directly with '__contains__'
             hasif = 'If-Modified-Since' in cherrypy.request.headers
             # Read a header directly with 'has_key'
-            has = cherrypy.request.headers.has_key('Range')
+            if hasattr(dict, 'has_key'):
+                # Python 2
+                has = cherrypy.request.headers.has_key('Range')
+            else:
+                # Python 3
+                has = 'Range' in cherrypy.request.headers
             # Call a lib function
             mtype = tools.accept.callable(['text/html', 'text/plain'])
             return "Hello, world!"

py2/cherrypy/test/test_objectmapping.py

 import cherrypy
+from cherrypy._cpcompat import ntou
 from cherrypy._cptree import Application
 from cherrypy.test import helper
 
         self.assertHeader('Allow', 'GET, HEAD, POST')
         
         self.getPage("/bymethod")
-        self.assertBody("['another', u'one']")
+        self.assertBody(repr(['another', ntou('one')]))
         self.assertHeader('Allow', 'GET, HEAD, POST')
         
         self.getPage("/bymethod", method="PUT")

py2/cherrypy/test/test_request_obj.py

 
         # Test "% HEX HEX"-encoded URL, param keys, and values
         self.getPage("/params/%d4%20%e3/cheese?Gruy%E8re=Bulgn%e9ville")
-        self.assertBody(r"args: ('\xd4 \xe3', 'cheese') "
-                        r"kwargs: {'Gruy\xe8re': u'Bulgn\xe9ville'}")
+        self.assertBody("args: %s kwargs: %s" %
+                        (('\xd4 \xe3', 'cheese'),
+                         {'Gruy\xe8re': ntou('Bulgn\xe9ville')}))
         
         # Make sure that encoded = and & get parsed correctly
         self.getPage("/params/code?url=http%3A//cherrypy.org/index%3Fa%3D1%26b%3D2")
-        self.assertBody(r"args: ('code',) "
-                        r"kwargs: {'url': u'http://cherrypy.org/index?a=1&b=2'}")
+        self.assertBody("args: %s kwargs: %s" %
+                        (('code',),
+                         {'url': ntou('http://cherrypy.org/index?a=1&b=2')}))
         
         # Test coordinates sent by <img ismap>
         self.getPage("/params/ismap?223,114")
         
         # Test "name[key]" dict-like params
         self.getPage("/params/dictlike?a[1]=1&a[2]=2&b=foo&b[bar]=baz")
-        self.assertBody(
-            "args: ('dictlike',) "
-            "kwargs: {'a[1]': u'1', 'b[bar]': u'baz', 'b': u'foo', 'a[2]': u'2'}")
+        self.assertBody("args: %s kwargs: %s" %
+                        (('dictlike',),
+                         {'a[1]': ntou('1'), 'b[bar]': ntou('baz'),
+                          'b': ntou('foo'), 'a[2]': ntou('2')}))
 
     def testParamErrors(self):
 
     
     def test_encoded_headers(self):
         # First, make sure the innards work like expected.
-        self.assertEqual(httputil.decode_TEXT(u"=?utf-8?q?f=C3=BCr?="), u"f\xfcr")
+        self.assertEqual(httputil.decode_TEXT(ntou("=?utf-8?q?f=C3=BCr?=")), ntou("f\xfcr"))
         
         if cherrypy.server.protocol_version == "HTTP/1.1":
             # Test RFC-2047-encoded request and response header values
-            u = u'\u212bngstr\xf6m'
-            c = u"=E2=84=ABngstr=C3=B6m"
-            self.getPage("/headers/ifmatch", [('If-Match', u'=?utf-8?q?%s?=' % c)])
+            u = ntou('\u212bngstr\xf6m', 'escape')
+            c = ntou("=E2=84=ABngstr=C3=B6m")
+            self.getPage("/headers/ifmatch", [('If-Match', ntou('=?utf-8?q?%s?=') % c)])
             # The body should be utf-8 encoded.
-            self.assertBody("\xe2\x84\xabngstr\xc3\xb6m")
+            self.assertBody(ntob("\xe2\x84\xabngstr\xc3\xb6m"))
             # But the Etag header should be RFC-2047 encoded (binary)
-            self.assertHeader("ETag", u'=?utf-8?b?4oSrbmdzdHLDtm0=?=')
+            self.assertHeader("ETag", ntou('=?utf-8?b?4oSrbmdzdHLDtm0=?='))
             
             # Test a *LONG* RFC-2047-encoded request and response header value
             self.getPage("/headers/ifmatch",
-                         [('If-Match', u'=?utf-8?q?%s?=' % (c * 10))])
-            self.assertBody("\xe2\x84\xabngstr\xc3\xb6m" * 10)
+                         [('If-Match', ntou('=?utf-8?q?%s?=') % (c * 10))])
+            self.assertBody(ntob("\xe2\x84\xabngstr\xc3\xb6m") * 10)
             # Note: this is different output for Python3, but it decodes fine.
             etag = self.assertHeader("ETag",
                 '=?utf-8?b?4oSrbmdzdHLDtm3ihKtuZ3N0csO2beKEq25nc3Ryw7Zt'

py2/cherrypy/test/test_wsgi_ns.py

 import cherrypy
+from cherrypy._cpcompat import ntob
 from cherrypy.test import helper
 
 
             
             def next(self):
                 return self.iter.next()
+            def __next__(self):
+                return next(self.iter)
             
             def close(self):
                 if hasattr(self.appresults, "close"):
                 class CaseResults(WSGIResponse):
                     def next(this):
                         return getattr(this.iter.next(), self.to)()
+                    def __next__(this):
+                        return getattr(next(this.iter), self.to)()
                 return CaseResults(res)
         
         class Replacer(object):
                         for k, v in self.map.iteritems():
                             line = line.replace(k, v)
                         return line
+                    def __next__(this):
+                        line = next(this.iter)
+                        for k, v in self.map.items():
+                            line = line.replace(k, v)
+                        return line
                 return ReplaceResults(res)
         
         class Root(object):
         
         
         root_conf = {'wsgi.pipeline': [('replace', Replacer)],
-                     'wsgi.replace.map': {'L': 'X', 'l': 'r'},
+                     'wsgi.replace.map': {ntob('L'): ntob('X'),
+                                          ntob('l'): ntob('r')},
                      }
         
         app = cherrypy.Application(Root())

py3/cherrypy/_cpcompat.py

 output.
 """
 import os
+import re
 import sys
 
 if sys.version_info >= (3, 0):
         return n
     def ntou(n, encoding='ISO-8859-1'):
         """Return the given native string as a unicode string with the given encoding."""
-        # In Python 2, the native string type is bytes. Assume it's already
-        # in the given encoding, which for ISO-8859-1 is almost always what
-        # was intended.
+        # In Python 2, the native string type is bytes.
+        # First, check for the special encoding 'escape'. The test suite uses this
+        # to signal that it wants to pass a string with embedded \uXXXX escapes,
+        # but without having to prefix it with u'' for Python 2, but no prefix
+        # for Python 3.
+        if encoding == 'escape':
+            return unicode(
+                re.sub(r'\\u([0-9a-zA-Z]{4})',
+                       lambda m: unichr(int(m.group(1), 16)),
+                       n.decode('ISO-8859-1')))
+        # Assume it's already in the given encoding, which for ISO-8859-1 is almost
+        # always what was intended.
         return n.decode(encoding)
     def tonative(n, encoding='ISO-8859-1'):
         """Return the given string as a native string in the given encoding."""

py3/cherrypy/_cpthreadinglocal.py

+# This is a backport of Python-2.4's threading.local() implementation
+
+"""Thread-local objects
+
+(Note that this module provides a Python version of thread
+ threading.local class.  Depending on the version of Python you're
+ using, there may be a faster one available.  You should always import
+ the local class from threading.)
+
+Thread-local objects support the management of thread-local data.
+If you have data that you want to be local to a thread, simply create
+a thread-local object and use its attributes:
+
+  >>> mydata = local()
+  >>> mydata.number = 42
+  >>> mydata.number
+  42
+
+You can also access the local-object's dictionary:
+
+  >>> mydata.__dict__
+  {'number': 42}
+  >>> mydata.__dict__.setdefault('widgets', [])
+  []
+  >>> mydata.widgets
+  []
+
+What's important about thread-local objects is that their data are
+local to a thread. If we access the data in a different thread:
+
+  >>> log = []
+  >>> def f():
+  ...     items = mydata.__dict__.items()
+  ...     items.sort()
+  ...     log.append(items)
+  ...     mydata.number = 11
+  ...     log.append(mydata.number)
+
+  >>> import threading
+  >>> thread = threading.Thread(target=f)
+  >>> thread.start()
+  >>> thread.join()
+  >>> log
+  [[], 11]
+
+we get different data.  Furthermore, changes made in the other thread
+don't affect data seen in this thread:
+
+  >>> mydata.number
+  42
+
+Of course, values you get from a local object, including a __dict__
+attribute, are for whatever thread was current at the time the
+attribute was read.  For that reason, you generally don't want to save
+these values across threads, as they apply only to the thread they
+came from.
+
+You can create custom local objects by subclassing the local class:
+
+  >>> class MyLocal(local):
+  ...     number = 2
+  ...     initialized = False
+  ...     def __init__(self, **kw):
+  ...         if self.initialized:
+  ...             raise SystemError('__init__ called too many times')
+  ...         self.initialized = True
+  ...         self.__dict__.update(kw)
+  ...     def squared(self):
+  ...         return self.number ** 2
+
+This can be useful to support default values, methods and
+initialization.  Note that if you define an __init__ method, it will be
+called each time the local object is used in a separate thread.  This
+is necessary to initialize each thread's dictionary.
+
+Now if we create a local object:
+
+  >>> mydata = MyLocal(color='red')
+
+Now we have a default number:
+
+  >>> mydata.number
+  2
+
+an initial color:
+
+  >>> mydata.color
+  'red'
+  >>> del mydata.color
+
+And a method that operates on the data:
+
+  >>> mydata.squared()
+  4
+
+As before, we can access the data in a separate thread:
+
+  >>> log = []
+  >>> thread = threading.Thread(target=f)
+  >>> thread.start()
+  >>> thread.join()
+  >>> log
+  [[('color', 'red'), ('initialized', True)], 11]
+
+without affecting this thread's data:
+
+  >>> mydata.number
+  2
+  >>> mydata.color
+  Traceback (most recent call last):
+  ...
+  AttributeError: 'MyLocal' object has no attribute 'color'
+
+Note that subclasses can define slots, but they are not thread
+local. They are shared across threads:
+
+  >>> class MyLocal(local):
+  ...     __slots__ = 'number'
+
+  >>> mydata = MyLocal()
+  >>> mydata.number = 42
+  >>> mydata.color = 'red'
+
+So, the separate thread:
+
+  >>> thread = threading.Thread(target=f)
+  >>> thread.start()
+  >>> thread.join()
+
+affects what we see:
+
+  >>> mydata.number
+  11
+
+>>> del mydata
+"""
+
+# Threading import is at end
+
+class _localbase(object):
+    __slots__ = '_local__key', '_local__args', '_local__lock'
+
+    def __new__(cls, *args, **kw):
+        self = object.__new__(cls)
+        key = 'thread.local.' + str(id(self))
+        object.__setattr__(self, '_local__key', key)
+        object.__setattr__(self, '_local__args', (args, kw))
+        object.__setattr__(self, '_local__lock', RLock())
+
+        if args or kw and (cls.__init__ is object.__init__):
+            raise TypeError("Initialization arguments are not supported")
+
+        # We need to create the thread dict in anticipation of
+        # __init__ being called, to make sure we don't call it
+        # again ourselves.
+        dict = object.__getattribute__(self, '__dict__')
+        currentThread().__dict__[key] = dict
+
+        return self
+
+def _patch(self):
+    key = object.__getattribute__(self, '_local__key')
+    d = currentThread().__dict__.get(key)
+    if d is None:
+        d = {}
+        currentThread().__dict__[key] = d
+        object.__setattr__(self, '__dict__', d)
+
+        # we have a new instance dict, so call out __init__ if we have
+        # one
+        cls = type(self)
+        if cls.__init__ is not object.__init__:
+            args, kw = object.__getattribute__(self, '_local__args')
+            cls.__init__(self, *args, **kw)
+    else:
+        object.__setattr__(self, '__dict__', d)
+
+class local(_localbase):
+
+    def __getattribute__(self, name):
+        lock = object.__getattribute__(self, '_local__lock')
+        lock.acquire()
+        try:
+            _patch(self)
+            return object.__getattribute__(self, name)
+        finally:
+            lock.release()
+
+    def __setattr__(self, name, value):
+        lock = object.__getattribute__(self, '_local__lock')
+        lock.acquire()
+        try:
+            _patch(self)
+            return object.__setattr__(self, name, value)
+        finally:
+            lock.release()
+
+    def __delattr__(self, name):
+        lock = object.__getattribute__(self, '_local__lock')
+        lock.acquire()
+        try:
+            _patch(self)
+            return object.__delattr__(self, name)
+        finally:
+            lock.release()
+
+
+    def __del__():
+        threading_enumerate = enumerate
+        __getattribute__ = object.__getattribute__
+
+        def __del__(self):
+            key = __getattribute__(self, '_local__key')
+
+            try:
+                threads = list(threading_enumerate())
+            except:
+                # if enumerate fails, as it seems to do during
+                # shutdown, we'll skip cleanup under the assumption
+                # that there is nothing to clean up
+                return
+
+            for thread in threads:
+                try:
+                    __dict__ = thread.__dict__
+                except AttributeError:
+                    # Thread is dying, rest in peace
+                    continue
+
+                if key in __dict__:
+                    try:
+                        del __dict__[key]
+                    except KeyError:
+                        pass # didn't have anything in this thread
+
+        return __del__
+    __del__ = __del__()
+
+from threading import currentThread, enumerate, RLock

py3/cherrypy/test/test_dynamicobjectmapping.py

 import cherrypy
+from cherrypy._cpcompat import sorted, unicodestr
 from cherrypy._cptree import Application
 from cherrypy.test import helper
 
             return "POST %d" % make_user(name)
 
         def GET(self):
-            return str(sorted(user_lookup.keys()))
+            return unicodestr(sorted(user_lookup.keys()))
 
         def dynamic_dispatch(self, vpath):
             try:
             """
             Return the appropriate representation of the instance.
             """
-            return str(self.user)
+            return unicodestr(self.user)
 
         def POST(self, name):
             """

py3/cherrypy/test/test_encoding.py

 from cherrypy._cpcompat import BytesIO, IncompleteRead, ntob, ntou
 
 europoundUnicode = ntou('\x80\xa3')
-sing = "\u6bdb\u6cfd\u4e1c: Sing, Little Birdie?"
+sing = ntou("\u6bdb\u6cfd\u4e1c: Sing, Little Birdie?", 'escape')
 sing8 = sing.encode('utf-8')
 sing16 = sing.encode('utf-16')
 

py3/cherrypy/test/test_etags.py

 import cherrypy
+from cherrypy._cpcompat import ntou
 from cherrypy.test import helper
 
 
             fail.exposed = True
             
             def unicoded(self):
-                return 'I am a \u1ee4nicode string.'
+                return ntou('I am a \u1ee4nicode string.', 'escape')
             unicoded.exposed = True
             # In Python 3, tools.encode is on by default
             unicoded._cp_config = {'tools.encode.on': True}

py3/cherrypy/test/test_http.py

 import mimetypes
 
 import cherrypy
-from cherrypy._cpcompat import HTTPConnection, HTTPSConnection, ntob
+from cherrypy._cpcompat import HTTPConnection, HTTPSConnection, ntob, py3k
 
 
 def encode_multipart_formdata(files):
                         count += 1
                     else:
                         if count:
-                            summary.append("%s * %d" % (chr(curchar), count))
+                            if py3k: curchar = chr(curchar)
+                            summary.append("%s * %d" % (curchar, count))
                         count = 1
                         curchar = c
                 if count:
-                    summary.append("%s * %d" % (chr(curchar), count))
+                    if py3k: curchar = chr(curchar)
+                    summary.append("%s * %d" % (curchar, count))
                 return ", ".join(summary)
             post_multipart.exposed = True
         

py3/cherrypy/test/test_logging.py

 localDir = os.path.dirname(__file__)
 
 import cherrypy
+from cherrypy._cpcompat import ntob, ntou, py3k
 
 access_log = os.path.join(localDir, "access.log")
 error_log = os.path.join(localDir, "error.log")
 
 # Some unicode strings.
-tartaros = '\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2'
-erebos = '\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com'
+tartaros = ntou('\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2', 'escape')
+erebos = ntou('\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com', 'escape')
 
 
 def setup_server():
         self.markLog()
         self.getPage("/uni_code")
         self.assertStatus(200)
-        self.assertLog(-1, repr(tartaros.encode('utf8'))[2:-1])
+        if py3k:
+            # The repr of a bytestring in py3k includes a b'' prefix
+            self.assertLog(-1, repr(tartaros.encode('utf8'))[2:-1])
+        else:
+            self.assertLog(-1, repr(tartaros.encode('utf8'))[1:-1])
         # Test the erebos value. Included inline for your enlightenment.
         # Note the 'r' prefix--those backslashes are literals.
         self.assertLog(-1, r'\xce\x88\xcf\x81\xce\xb5\xce\xb2\xce\xbf\xcf\x82')
         self.markLog()
         self.getPage("/slashes")
         self.assertStatus(200)
-        self.assertLog(-1, b'"GET /slashed\\path HTTP/1.1"')
+        if py3k:
+            self.assertLog(-1, ntob('"GET /slashed\\path HTTP/1.1"'))
+        else:
+            self.assertLog(-1, r'"GET /slashed\\path HTTP/1.1"')
         
         # Test whitespace in output.
         self.markLog()

py3/cherrypy/test/test_mime.py

         multipart_form_data.exposed = True
         
         def flashupload(self, Filedata, Upload, Filename):
-            return ("Upload: %r, Filename: %r, Filedata: %r" %
+            return ("Upload: %s, Filename: %s, Filedata: %r" %
                     (Upload, Filename, Filedata.file.read()))
         flashupload.exposed = True
     
                               ("Content-Length", str(len(body))),
                               ],
                      body=body),
-        self.assertBody(repr([('baz', ['111', '333']), ('foo', 'bar')]))
+        self.assertBody(repr([('baz', [ntou('111'), ntou('333')]), ('foo', ntou('bar'))]))
 
 
 class SafeMultipartHandlingTest(helper.CPWebCase):
             '------------KM7Ij5cH2KM7Ef1gL6ae0ae0cH2gL6--'
             ))
         self.getPage('/flashupload', headers, "POST", body)
-        self.assertBody("Upload: 'Submit Query', Filename: '.project', "
+        self.assertBody("Upload: Submit Query, Filename: .project, "
                         "Filedata: %r" % filedata)
 

py3/cherrypy/test/test_misc_tools.py

             cl = cherrypy.request.headers['Host']
             # Read a header directly with '__contains__'
             hasif = 'If-Modified-Since' in cherrypy.request.headers
-            has = 'Range' in cherrypy.request.headers
+            # Read a header directly with 'has_key'
+            if hasattr(dict, 'has_key'):
+                # Python 2
+                has = cherrypy.request.headers.has_key('Range')
+            else:
+                # Python 3
+                has = 'Range' in cherrypy.request.headers
             # Call a lib function
             mtype = tools.accept.callable(['text/html', 'text/plain'])
             return "Hello, world!"

py3/cherrypy/test/test_objectmapping.py

 import cherrypy
+from cherrypy._cpcompat import ntou
 from cherrypy._cptree import Application
 from cherrypy.test import helper
 
         self.assertHeader('Allow', 'GET, HEAD, POST')
         
         self.getPage("/bymethod")
-        self.assertBody("['another', 'one']")
+        self.assertBody(repr(['another', ntou('one')]))
         self.assertHeader('Allow', 'GET, HEAD, POST')
         
         self.getPage("/bymethod", method="PUT")

py3/cherrypy/test/test_request_obj.py

 
         # Test "% HEX HEX"-encoded URL, param keys, and values
         self.getPage("/params/%d4%20%e3/cheese?Gruy%E8re=Bulgn%e9ville")
-        self.assertBody("args: ('\xd4 \xe3', 'cheese') "
-                        "kwargs: {'Gruy\xe8re': 'Bulgn\xe9ville'}")
+        self.assertBody("args: %s kwargs: %s" %
+                        (('\xd4 \xe3', 'cheese'),
+                         {'Gruy\xe8re': ntou('Bulgn\xe9ville')}))
         
         # Make sure that encoded = and & get parsed correctly
         self.getPage("/params/code?url=http%3A//cherrypy.org/index%3Fa%3D1%26b%3D2")
-        self.assertBody(r"args: ('code',) "
-                        r"kwargs: {'url': 'http://cherrypy.org/index?a=1&b=2'}")
+        self.assertBody("args: %s kwargs: %s" %
+                        (('code',),
+                         {'url': ntou('http://cherrypy.org/index?a=1&b=2')}))
         
         # Test coordinates sent by <img ismap>
         self.getPage("/params/ismap?223,114")
         
         # Test "name[key]" dict-like params
         self.getPage("/params/dictlike?a[1]=1&a[2]=2&b=foo&b[bar]=baz")
-        self.assertBody(
-            "args: ('dictlike',) "
-            "kwargs: {'a[1]': '1', 'b[bar]': 'baz', 'b': 'foo', 'a[2]': '2'}")
+        self.assertBody("args: %s kwargs: %s" %
+                        (('dictlike',),
+                         {'a[1]': ntou('1'), 'b[bar]': ntou('baz'),
+                          'b': ntou('foo'), 'a[2]': ntou('2')}))
 
     def testParamErrors(self):
 
     
     def test_encoded_headers(self):
         # First, make sure the innards work like expected.
-        self.assertEqual(httputil.decode_TEXT("=?utf-8?q?f=C3=BCr?="), "f\xfcr")
+        self.assertEqual(httputil.decode_TEXT(ntou("=?utf-8?q?f=C3=BCr?=")), ntou("f\xfcr"))
         
         if cherrypy.server.protocol_version == "HTTP/1.1":
             # Test RFC-2047-encoded request and response header values
-            u = '\u212bngstr\xf6m'
-            c = "=E2=84=ABngstr=C3=B6m"
-            self.getPage("/headers/ifmatch", [('If-Match', '=?utf-8?q?%s?=' % c)])
+            u = ntou('\u212bngstr\xf6m', 'escape')
+            c = ntou("=E2=84=ABngstr=C3=B6m")
+            self.getPage("/headers/ifmatch", [('If-Match', ntou('=?utf-8?q?%s?=') % c)])
             # The body should be utf-8 encoded.
-            self.assertBody(b"\xe2\x84\xabngstr\xc3\xb6m")
+            self.assertBody(ntob("\xe2\x84\xabngstr\xc3\xb6m"))
             # But the Etag header should be RFC-2047 encoded (binary)
-            self.assertHeader("ETag", '=?utf-8?b?4oSrbmdzdHLDtm0=?=')
+            self.assertHeader("ETag", ntou('=?utf-8?b?4oSrbmdzdHLDtm0=?='))
             
             # Test a *LONG* RFC-2047-encoded request and response header value
             self.getPage("/headers/ifmatch",
-                         [('If-Match', '=?utf-8?q?%s?=' % (c * 10))])
-            self.assertBody(b"\xe2\x84\xabngstr\xc3\xb6m" * 10)
+                         [('If-Match', ntou('=?utf-8?q?%s?=') % (c * 10))])
+            self.assertBody(ntob("\xe2\x84\xabngstr\xc3\xb6m") * 10)
             # Note: this is different output for Python3, but it decodes fine.
             etag = self.assertHeader("ETag",
                 '=?utf-8?b?4oSrbmdzdHLDtm3ihKtuZ3N0csO2beKEq25nc3Ryw7Zt'

py3/cherrypy/test/test_wsgi_ns.py

 import cherrypy
+from cherrypy._cpcompat import ntob
 from cherrypy.test import helper
 
 
             def __iter__(self):
                 return self
             
+            def next(self):
+                return self.iter.next()
             def __next__(self):
                 return next(self.iter)
             
             def __call__(self, environ, start_response):
                 res = self.app(environ, start_response)
                 class CaseResults(WSGIResponse):
+                    def next(this):
+                        return getattr(this.iter.next(), self.to)()
                     def __next__(this):
                         return getattr(next(this.iter), self.to)()
                 return CaseResults(res)
             def __call__(self, environ, start_response):
                 res = self.app(environ, start_response)
                 class ReplaceResults(WSGIResponse):
+                    def next(this):
+                        line = this.iter.next()
+                        for k, v in self.map.iteritems():
+                            line = line.replace(k, v)
+                        return line
                     def __next__(this):
                         line = next(this.iter)
                         for k, v in self.map.items():
         
         
         root_conf = {'wsgi.pipeline': [('replace', Replacer)],
-                     'wsgi.replace.map': {b'L': b'X', b'l': b'r'},
+                     'wsgi.replace.map': {ntob('L'): ntob('X'),
+                                          ntob('l'): ntob('r')},
                      }
         
         app = cherrypy.Application(Root())

py3/cherrypy/wsgiserver/ssl_pyopenssl.py

+"""A library for integrating pyOpenSSL with CherryPy.
+
+The OpenSSL module must be importable for SSL functionality.
+You can obtain it from http://pyopenssl.sourceforge.net/
+
+To use this module, set CherryPyWSGIServer.ssl_adapter to an instance of
+SSLAdapter. There are two ways to use SSL:
+
+Method One
+----------
+
+ * ``ssl_adapter.context``: an instance of SSL.Context.
+
+If this is not None, it is assumed to be an SSL.Context instance,
+and will be passed to SSL.Connection on bind(). The developer is
+responsible for forming a valid Context object. This approach is
+to be preferred for more flexibility, e.g. if the cert and key are
+streams instead of files, or need decryption, or SSL.SSLv3_METHOD
+is desired instead of the default SSL.SSLv23_METHOD, etc. Consult
+the pyOpenSSL documentation for complete options.
+
+Method Two (shortcut)
+---------------------
+
+ * ``ssl_adapter.certificate``: the filename of the server SSL certificate.
+ * ``ssl_adapter.private_key``: the filename of the server's private key file.
+
+Both are None by default. If ssl_adapter.context is None, but .private_key
+and .certificate are both given and valid, they will be read, and the
+context will be automatically created from them.
+"""
+
+import socket
+import threading
+import time
+
+from cherrypy import wsgiserver
+
+try:
+    from OpenSSL import SSL
+    from OpenSSL import crypto
+except ImportError:
+    SSL = None
+
+
+class SSL_fileobject(wsgiserver.CP_fileobject):
+    """SSL file object attached to a socket object."""
+    
+    ssl_timeout = 3
+    ssl_retry = .01
+    
+    def _safe_call(self, is_reader, call, *args, **kwargs):
+        """Wrap the given call with SSL error-trapping.
+        
+        is_reader: if False EOF errors will be raised. If True, EOF errors
+        will return "" (to emulate normal sockets).
+        """
+        start = time.time()
+        while True:
+            try:
+                return call(*args, **kwargs)
+            except SSL.WantReadError:
+                # Sleep and try again. This is dangerous, because it means
+                # the rest of the stack has no way of differentiating
+                # between a "new handshake" error and "client dropped".
+                # Note this isn't an endless loop: there's a timeout below.
+                time.sleep(self.ssl_retry)
+            except SSL.WantWriteError:
+                time.sleep(self.ssl_retry)
+            except SSL.SysCallError, e:
+                if is_reader and e.args == (-1, 'Unexpected EOF'):
+                    return ""
+                
+                errnum = e.args[0]
+                if is_reader and errnum in wsgiserver.socket_errors_to_ignore:
+                    return ""
+                raise socket.error(errnum)
+            except SSL.Error, e:
+                if is_reader and e.args == (-1, 'Unexpected EOF'):
+                    return ""
+                
+                thirdarg = None
+                try:
+                    thirdarg = e.args[0][0][2]
+                except IndexError:
+                    pass
+                
+                if thirdarg == 'http request':
+                    # The client is talking HTTP to an HTTPS server.
+                    raise wsgiserver.NoSSLError()
+                
+                raise wsgiserver.FatalSSLAlert(*e.args)
+            except:
+                raise
+            
+            if time.time() - start > self.ssl_timeout:
+                raise socket.timeout("timed out")
+    
+    def recv(self, *args, **kwargs):
+        buf = []
+        r = super(SSL_fileobject, self).recv
+        while True:
+            data = self._safe_call(True, r, *args, **kwargs)
+            buf.append(data)
+            p = self._sock.pending()
+            if not p:
+                return "".join(buf)
+    
+    def sendall(self, *args, **kwargs):
+        return self._safe_call(False, super(SSL_fileobject, self).sendall,
+                               *args, **kwargs)
+
+    def send(self, *args, **kwargs):
+        return self._safe_call(False, super(SSL_fileobject, self).send,
+                               *args, **kwargs)
+
+
+class SSLConnection:
+    """A thread-safe wrapper for an SSL.Connection.
+    
+    ``*args``: the arguments to create the wrapped ``SSL.Connection(*args)``.
+    """
+    
+    def __init__(self, *args):
+        self._ssl_conn = SSL.Connection(*args)
+        self._lock = threading.RLock()
+    
+    for f in ('get_context', 'pending', 'send', 'write', 'recv', 'read',
+              'renegotiate', 'bind', 'listen', 'connect', 'accept',
+              'setblocking', 'fileno', 'close', 'get_cipher_list',
+              'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
+              'makefile', 'get_app_data', 'set_app_data', 'state_string',
+              'sock_shutdown', 'get_peer_certificate', 'want_read',
+              'want_write', 'set_connect_state', 'set_accept_state',
+              'connect_ex', 'sendall', 'settimeout', 'gettimeout'):
+        exec("""def %s(self, *args):
+        self._lock.acquire()
+        try:
+            return self._ssl_conn.%s(*args)
+        finally:
+            self._lock.release()
+""" % (f, f))
+    
+    def shutdown(self, *args):
+        self._lock.acquire()
+        try:
+            # pyOpenSSL.socket.shutdown takes no args
+            return self._ssl_conn.shutdown()
+        finally:
+            self._lock.release()
+
+
+class pyOpenSSLAdapter(wsgiserver.SSLAdapter):
+    """A wrapper for integrating pyOpenSSL with CherryPy."""
+    
+    context = None
+    """An instance of SSL.Context."""
+    
+    certificate = None
+    """The filename of the server SSL certificate."""
+    
+    private_key = None
+    """The filename of the server's private key file."""
+    
+    certificate_chain = None
+    """Optional. The filename of CA's intermediate certificate bundle.
+    
+    This is needed for cheaper "chained root" SSL certificates, and should be
+    left as None if not required."""
+    
+    def __init__(self, certificate, private_key, certificate_chain=None):
+        if SSL is None:
+            raise ImportError("You must install pyOpenSSL to use HTTPS.")
+        
+        self.context = None
+        self.certificate = certificate
+        self.private_key = private_key
+        self.certificate_chain = certificate_chain
+        self._environ = None
+    
+    def bind(self, sock):
+        """Wrap and return the given socket."""
+        if self.context is None:
+            self.context = self.get_context()
+        conn = SSLConnection(self.context, sock)
+        self._environ = self.get_environ()
+        return conn
+    
+    def wrap(self, sock):
+        """Wrap and return the given socket, plus WSGI environ entries."""
+        return sock, self._environ.copy()
+    
+    def get_context(self):
+        """Return an SSL.Context from self attributes."""
+        # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
+        c = SSL.Context(SSL.SSLv23_METHOD)
+        c.use_privatekey_file(self.private_key)
+        if self.certificate_chain:
+            c.load_verify_locations(self.certificate_chain)
+        c.use_certificate_file(self.certificate)
+        return c
+    
+    def get_environ(self):
+        """Return WSGI environ entries to be merged into each request."""
+        ssl_environ = {
+            "HTTPS": "on",
+            # pyOpenSSL doesn't provide access to any of these AFAICT
+##            'SSL_PROTOCOL': 'SSLv2',
+##            SSL_CIPHER 	string 	The cipher specification name
+##            SSL_VERSION_INTERFACE 	string 	The mod_ssl program version
+##            SSL_VERSION_LIBRARY 	string 	The OpenSSL program version
+            }
+        
+        if self.certificate:
+            # Server certificate attributes
+            cert = open(self.certificate, 'rb').read()
+            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
+            ssl_environ.update({
+                'SSL_SERVER_M_VERSION': cert.get_version(),
+                'SSL_SERVER_M_SERIAL': cert.get_serial_number(),
+##                'SSL_SERVER_V_START': Validity of server's certificate (start time),
+##                'SSL_SERVER_V_END': Validity of server's certificate (end time),
+                })
+            
+            for prefix, dn in [("I", cert.get_issuer()),
+                               ("S", cert.get_subject())]:
+                # X509Name objects don't seem to have a way to get the
+                # complete DN string. Use str() and slice it instead,
+                # because str(dn) == "<X509Name object '/C=US/ST=...'>"
+                dnstr = str(dn)[18:-2]
+                
+                wsgikey = 'SSL_SERVER_%s_DN' % prefix
+                ssl_environ[wsgikey] = dnstr
+                
+                # The DN should be of the form: /k1=v1/k2=v2, but we must allow
+                # for any value to contain slashes itself (in a URL).
+                while dnstr:
+                    pos = dnstr.rfind("=")
+                    dnstr, value = dnstr[:pos], dnstr[pos + 1:]
+                    pos = dnstr.rfind("/")
+                    dnstr, key = dnstr[:pos], dnstr[pos + 1:]
+                    if key and value:
+                        wsgikey = 'SSL_SERVER_%s_DN_%s' % (prefix, key)
+                        ssl_environ[wsgikey] = value
+        
+        return ssl_environ
+    
+    def makefile(self, sock, mode='r', bufsize=-1):
+        if SSL and isinstance(sock, SSL.ConnectionType):
+            timeout = sock.gettimeout()
+            f = SSL_fileobject(sock, mode, bufsize)
+            f.ssl_timeout = timeout
+            return f
+        else:
+            return wsgiserver.CP_fileobject(sock, mode, bufsize)
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.