Jason R. Coombs avatar Jason R. Coombs committed 6ce6d1e Merge

Merge with cherrypy-3.2.x

Comments (0)

Files changed (6)

Add a comment to this file

cherrypy/lib/sessions.py

File contents unchanged.

cherrypy/test/helper.py

         td = getattr(self, 'teardown', None)
         if td:
             td()
-        
+
         cherrypy.engine.exit()
-        
+
         for name, server in copyitems(getattr(cherrypy, 'servers', {})):
             server.unsubscribe()
             del cherrypy.servers[name]
         """Obtain a new (decorated) WSGI app to hook into the origin server."""
         if app is None:
             app = cherrypy.tree
-        
+
         if self.conquer:
             try:
                 import wsgiconq
                 warnings.warn("Error importing wsgiconq. pyconquer will not run.")
             else:
                 app = wsgiconq.WSGILogger(app, c_calls=True)
-        
+
         if self.validate:
             try:
                 from wsgiref import validate
             else:
                 #wraps the app in the validator
                 app = validate.validator(app)
-        
+
         return app
 
 
 
 
 class CPWebCase(webtest.WebCase):
- 
+
     script_name = ""
     scheme = "http"
 
                          'modfastcgi': get_modfastcgi_supervisor,
                          }
     default_server = "wsgi"
-    
+
     def _setup_server(cls, supervisor, conf):
         v = sys.version.split()[0]
         log.info("Python version used to run this test script: %s" % v)
             webtest.WebCase.HTTP_CONN = HTTPSConnection
         return baseconf
     _setup_server = classmethod(_setup_server)
-    
+
     def setup_class(cls):
         ''
         #Creates a server
         if hasattr(cls, 'setup_server'):
             cls.supervisor.stop()
     teardown_class = classmethod(teardown_class)
-    
+
     do_gc_test = False
-    
+
     def test_gc(self):
         if self.do_gc_test:
             self.getPage("/gc/stats")
     # Tell nose to run this last in each class.
     # Prefer sys.maxint for Python 2.3, which didn't have float('inf')
     test_gc.compat_co_firstlineno = getattr(sys, 'maxint', None) or float('inf')
-    
+
     def prefix(self):
         return self.script_name.rstrip("/")
-    
+
     def base(self):
         if ((self.scheme == "http" and self.PORT == 80) or
             (self.scheme == "https" and self.PORT == 443)):
             port = ""
         else:
             port = ":%s" % self.PORT
-        
+
         return "%s://%s%s%s" % (self.scheme, self.HOST, port,
                                 self.script_name.rstrip("/"))
-    
+
     def exit(self):
         sys.exit()
-    
+
     def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
         """Open the url. Return status, headers, body."""
         if self.script_name:
             url = httputil.urljoin(self.script_name, url)
         return webtest.WebCase.getPage(self, url, headers, method, body, protocol)
-    
+
     def skip(self, msg='skipped '):
         raise nose.SkipTest(msg)
-    
+
     def assertErrorPage(self, status, message=None, pattern=''):
         """Compare the response body with a built in error page.
-        
+
         The function will optionally look for the regexp pattern,
         within the exception embedded in the error page."""
-        
+
         # This will never contain a traceback
         page = cherrypy._cperror.get_error_page(status, message=message)
-        
+
         # First, test the response body without checking the traceback.
         # Stick a match-all group (.*) in to grab the traceback.
         esc = re.escape
         if not m:
             self._handlewebError('Error page does not match; expected:\n' + page)
             return
-        
+
         # Now test the pattern against the traceback
         if pattern is None:
             # Special-case None to mean that there should be *no* traceback.
                               m.group(1))):
                 msg = 'Error page does not contain %s in traceback'
                 self._handlewebError(msg % repr(pattern))
-    
+
     date_tolerance = 2
-    
+
     def assertEqualDates(self, dt1, dt2, seconds=None):
         """Assert abs(dt1 - dt2) is within Y seconds."""
         if seconds is None:
             seconds = self.date_tolerance
-        
+
         if dt1 > dt2:
             diff = dt1 - dt2
         else:
 
 
 class CPProcess(object):
-    
+
     pid_file = os.path.join(thisdir, 'test.pid')
     config_file = os.path.join(thisdir, 'test.conf')
     config_template = """[global]
 """
     error_log = os.path.join(thisdir, 'test.error.log')
     access_log = os.path.join(thisdir, 'test.access.log')
-    
+
     def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None):
         self.wait = wait
         self.daemonize = daemonize
         self.ssl = ssl
         self.host = socket_host or cherrypy.server.socket_host
         self.port = socket_port or cherrypy.server.socket_port
-    
+
     def write_conf(self, extra=""):
         if self.ssl:
             serverpem = os.path.join(thisdir, 'test.pem')
 """ % (serverpem, serverpem)
         else:
             ssl = ""
-        
+
         conf = self.config_template % {
             'host': self.host,
             'port': self.port,
         f = open(self.config_file, 'wb')
         f.write(ntob(conf, 'utf-8'))
         f.close()
-    
+
     def start(self, imports=None):
         """Start cherryd in a subprocess."""
         cherrypy._cpserver.wait_for_free_port(self.host, self.port)
-        
+
         args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'),
                 '-c', self.config_file, '-p', self.pid_file]
-        
+
         if not isinstance(imports, (list, tuple)):
             imports = [imports]
         for i in imports:
             if i:
                 args.append('-i')
                 args.append(i)
-        
+
         if self.daemonize:
             args.append('-d')
 
         else:
             os.spawnve(os.P_NOWAIT, sys.executable, args, env)
             cherrypy._cpserver.wait_for_occupied_port(self.host, self.port)
-        
+
         # Give the engine a wee bit more time to finish STARTING
         if self.daemonize:
             time.sleep(2)
         else:
             time.sleep(1)
-    
+
     def get_pid(self):
         return int(open(self.pid_file, 'rb').read())
-    
+
     def join(self):
         """Wait for the process to exit."""
         try:

cherrypy/test/test_states.py

 from cherrypy._cpcompat import BadStatusLine, ntob
 import os
 import sys
-import threading
 import time
+import signal
 
 import cherrypy
 engine = cherrypy.engine
             self.getPage("/exit")
         p.join()
 
+    def _require_signal_and_kill(self, signal_name):
+        if not hasattr(signal, signal_name):
+            self.skip("skipped (no %(signal_name)s)" % vars())
+
+        if not hasattr(os, 'kill'):
+            self.skip("skipped (no os.kill)")
+
     def test_SIGTERM(self):
-        # SIGTERM should shut down the server whether daemonized or not.
-        try:
-            from signal import SIGTERM
-        except ImportError:
-            return self.skip("skipped (no SIGTERM) ")
-
-        try:
-            from os import kill
-        except ImportError:
-            return self.skip("skipped (no os.kill) ")
+        "SIGTERM should shut down the server whether daemonized or not."
+        self._require_signal_and_kill('SIGTERM')
 
         # Spawn a normal, undaemonized process.
         p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
                 extra='test_case_name: "test_SIGTERM"')
         p.start(imports='cherrypy.test._test_states_demo')
         # Send a SIGTERM
-        os.kill(p.get_pid(), SIGTERM)
+        os.kill(p.get_pid(), signal.SIGTERM)
         # This might hang if things aren't working right, but meh.
         p.join()
 
                  extra='test_case_name: "test_SIGTERM_2"')
             p.start(imports='cherrypy.test._test_states_demo')
             # Send a SIGTERM
-            os.kill(p.get_pid(), SIGTERM)
+            os.kill(p.get_pid(), signal.SIGTERM)
             # This might hang if things aren't working right, but meh.
             p.join()
 
     def test_signal_handler_unsubscribe(self):
-        try:
-            from signal import SIGTERM
-        except ImportError:
-            return self.skip("skipped (no SIGTERM) ")
+        self._require_signal_and_kill('SIGTERM')
 
-        try:
-            from os import kill
-        except ImportError:
-            return self.skip("skipped (no os.kill) ")
+        # Although Windows has `os.kill` and SIGTERM is defined, the
+        #  platform does not implement signals and sending SIGTERM
+        #  will result in a forced termination of the process.
+        #  Therefore, this test is not suitable for Windows.
+        if os.name == 'nt':
+            self.skip("SIGTERM not available")
 
         # Spawn a normal, undaemonized process.
         p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
 test_case_name: "test_signal_handler_unsubscribe"
 """)
         p.start(imports='cherrypy.test._test_states_demo')
-        # Send a SIGTERM
-        os.kill(p.get_pid(), SIGTERM)
+        # Ask the process to quit
+        os.kill(p.get_pid(), signal.SIGTERM)
         # This might hang if things aren't working right, but meh.
         p.join()
 
         target_line = open(p.error_log, 'rb').readlines()[-10]
         if not ntob("I am an old SIGTERM handler.") in target_line:
             self.fail("Old SIGTERM handler did not run.\n%r" % target_line)
-

cherrypy/test/test_static.py

 has_space_filepath = os.path.join(curdir, 'static', 'has space.html')
 bigfile_filepath = os.path.join(curdir, "static", "bigfile.log")
 BIGFILE_SIZE = 1024 * 1024
-import threading
 
 import cherrypy
 from cherrypy.lib import static
             open(has_space_filepath, 'wb').write(ntob('Hello, world\r\n'))
         if not os.path.exists(bigfile_filepath):
             open(bigfile_filepath, 'wb').write(ntob("x" * BIGFILE_SIZE))
-        
+
         class Root:
-            
+
             def bigfile(self):
                 from cherrypy.lib import static
                 self.f = static.serve_file(bigfile_filepath)
                 return self.f
             bigfile.exposed = True
             bigfile._cp_config = {'response.stream': True}
-            
+
             def tell(self):
                 if self.f.input.closed:
                     return ''
                 return repr(self.f.input.tell()).rstrip('L')
             tell.exposed = True
-            
+
             def fileobj(self):
                 f = open(os.path.join(curdir, 'style.css'), 'rb')
                 return static.serve_fileobj(f, content_type='text/css')
             fileobj.exposed = True
-            
+
             def bytesio(self):
                 f = BytesIO(ntob('Fee\nfie\nfo\nfum'))
                 return static.serve_fileobj(f, content_type='text/plain')
             bytesio.exposed = True
-        
+
         class Static:
-            
+
             def index(self):
                 return 'You want the Baron? You can have the Baron!'
             index.exposed = True
-            
+
             def dynamic(self):
                 return "This is a DYNAMIC page"
             dynamic.exposed = True
-        
-        
+
+
         root = Root()
         root.static = Static()
-        
+
         rootconf = {
             '/static': {
                 'tools.staticdir.on': True,
             }
         rootApp = cherrypy.Application(root)
         rootApp.merge(rootconf)
-        
+
         test_app_conf = {
             '/test': {
                 'tools.staticdir.index': 'index.html',
             }
         testApp = cherrypy.Application(Static())
         testApp.merge(test_app_conf)
-        
+
         vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp})
         cherrypy.tree.graft(vhost)
     setup_server = staticmethod(setup_server)
                     pass
     teardown_server = staticmethod(teardown_server)
 
-    
+
     def testStatic(self):
         self.getPage("/static/index.html")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/html')
         self.assertBody('Hello, world\r\n')
-        
+
         # Using a staticdir.root value in a subdir...
         self.getPage("/docroot/index.html")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/html')
         self.assertBody('Hello, world\r\n')
-        
+
         # Check a filename with spaces in it
         self.getPage("/static/has%20space.html")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/html')
         self.assertBody('Hello, world\r\n')
-        
+
         self.getPage("/style.css")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/css')
         #   into \r\n on Windows when extracting the CherryPy tarball so
         #   we just check the content
         self.assertMatchesBody('^Dummy stylesheet')
-    
+
     def test_fallthrough(self):
         # Test that NotFound will then try dynamic handlers (see [878]).
         self.getPage("/static/dynamic")
         self.assertBody("This is a DYNAMIC page")
-        
+
         # Check a directory via fall-through to dynamic handler.
         self.getPage("/static/")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/html;charset=utf-8')
         self.assertBody('You want the Baron? You can have the Baron!')
-    
+
     def test_index(self):
         # Check a directory via "staticdir.index".
         self.getPage("/docroot/")
         self.assertHeader('Location', '%s/docroot/' % self.base())
         self.assertMatchesBody("This resource .* <a href='%s/docroot/'>"
                                "%s/docroot/</a>." % (self.base(), self.base()))
-    
+
     def test_config_errors(self):
         # Check that we get an error if no .file or .dir
         self.getPage("/error/thing.html")
         self.assertErrorPage(500)
         self.assertMatchesBody(ntob("TypeError: staticdir\(\) takes at least 2 "
                                     "(positional )?arguments \(0 given\)"))
-    
+
     def test_security(self):
         # Test up-level security
         self.getPage("/static/../../test/style.css")
         self.assertStatus((400, 403))
-    
+
     def test_modif(self):
         # Test modified-since on a reasonably-large file
         self.getPage("/static/dirback.jpg")
         self.assertNoHeader("Content-Length")
         self.assertNoHeader("Content-Disposition")
         self.assertBody("")
-    
+
     def test_755_vhost(self):
         self.getPage("/test/", [('Host', 'virt.net')])
         self.assertStatus(200)
         self.getPage("/test", [('Host', 'virt.net')])
         self.assertStatus(301)
         self.assertHeader('Location', self.scheme + '://virt.net/test/')
-    
+
     def test_serve_fileobj(self):
         self.getPage("/fileobj")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/css;charset=utf-8')
         self.assertMatchesBody('^Dummy stylesheet')
-    
+
     def test_serve_bytesio(self):
         self.getPage("/bytesio")
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/plain;charset=utf-8')
         self.assertHeader('Content-Length', 14)
         self.assertMatchesBody('Fee\nfie\nfo\nfum')
-    
+
     def test_file_stream(self):
         if cherrypy.server.protocol_version != "HTTP/1.1":
             return self.skip()
-        
+
         self.PROTOCOL = "HTTP/1.1"
-        
+
         # Make an initial request
         self.persistent = True
         conn = self.HTTP_CONN
         response = conn.response_class(conn.sock, method="GET")
         response.begin()
         self.assertEqual(response.status, 200)
-        
+
         body = ntob('')
         remaining = BIGFILE_SIZE
         while remaining > 0:
                 break
             body += data
             remaining -= len(data)
-            
+
             if self.scheme == "https":
                 newconn = HTTPSConnection
             else:
                 tell_position = BIGFILE_SIZE
             else:
                 tell_position = int(b)
-            
+
             expected = len(body)
             if tell_position >= BIGFILE_SIZE:
                 # We can't exactly control how much content the server asks for.
                     "only advanced to position %r. It may not be streamed "
                     "as intended, or at the wrong chunk size (65536)" %
                     (expected, tell_position))
-        
+
         if body != ntob("x" * BIGFILE_SIZE):
             self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                       (BIGFILE_SIZE, body[:50], len(body)))
         conn.close()
-    
+
     def test_file_stream_deadlock(self):
         if cherrypy.server.protocol_version != "HTTP/1.1":
             return self.skip()
-        
+
         self.PROTOCOL = "HTTP/1.1"
-        
+
         # Make an initial request but abort early.
         self.persistent = True
         conn = self.HTTP_CONN
                       (65536, body[:50], len(body)))
         response.close()
         conn.close()
-        
+
         # Make a second request, which should fetch the whole file.
         self.persistent = False
         self.getPage("/bigfile")
         if self.body != ntob("x" * BIGFILE_SIZE):
             self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                       (BIGFILE_SIZE, self.body[:50], len(body)))
-

cherrypy/test/test_virtualhost.py

             def index(self):
                 return "Hello, world"
             index.exposed = True
-            
+
             def dom4(self):
                 return "Under construction"
             dom4.exposed = True
-            
+
             def method(self, value):
                 return "You sent %s" % value
             method.exposed = True
-        
+
         class VHost:
             def __init__(self, sitename):
                 self.sitename = sitename
-            
+
             def index(self):
                 return "Welcome to %s" % self.sitename
             index.exposed = True
-            
+
             def vmethod(self, value):
                 return "You sent %s" % value
             vmethod.exposed = True
-            
+
             def url(self):
                 return cherrypy.url("nextpage")
             url.exposed = True
-            
+
             # Test static as a handler (section must NOT include vhost prefix)
             static = cherrypy.tools.staticdir.handler(section='/static', dir=curdir)
-        
+
         root = Root()
         root.mydom2 = VHost("Domain 2")
         root.mydom3 = VHost("Domain 3")
                                 },
             })
     setup_server = staticmethod(setup_server)
-    
+
     def testVirtualHost(self):
         self.getPage("/", [('Host', 'www.mydom1.com')])
         self.assertBody('Hello, world')
         self.getPage("/mydom2/", [('Host', 'www.mydom1.com')])
         self.assertBody('Welcome to Domain 2')
-        
+
         self.getPage("/", [('Host', 'www.mydom2.com')])
         self.assertBody('Welcome to Domain 2')
         self.getPage("/", [('Host', 'www.mydom3.com')])
         self.assertBody('Welcome to Domain 3')
         self.getPage("/", [('Host', 'www.mydom4.com')])
         self.assertBody('Under construction')
-        
+
         # Test GET, POST, and positional params
         self.getPage("/method?value=root")
         self.assertBody("You sent root")
         self.assertBody("You sent dom3 POST")
         self.getPage("/vmethod/pos", [('Host', 'www.mydom3.com')])
         self.assertBody("You sent pos")
-        
+
         # Test that cherrypy.url uses the browser url, not the virtual url
         self.getPage("/url", [('Host', 'www.mydom2.com')])
         self.assertBody("%s://www.mydom2.com/nextpage" % self.scheme)
-    
+
     def test_VHost_plus_Static(self):
         # Test static as a handler
         self.getPage("/static/style.css", [('Host', 'www.mydom2.com')])
         self.assertStatus('200 OK')
         self.assertHeader('Content-Type', 'text/css;charset=utf-8')
-        
+
         # Test static in config
         self.getPage("/static2/dirback.jpg", [('Host', 'www.mydom2.com')])
         self.assertStatus('200 OK')
-        self.assertHeader('Content-Type', 'image/jpeg')
-        
+        self.assertHeaderIn('Content-Type', ['image/jpeg', 'image/pjpeg'])
+
         # Test static config with "index" arg
         self.getPage("/static2/", [('Host', 'www.mydom2.com')])
         self.assertStatus('200 OK')
         # Since tools.trailing_slash is on by default, this should redirect
         self.getPage("/static2", [('Host', 'www.mydom2.com')])
         self.assertStatus(301)
-

cherrypy/test/webtest.py

 be of further significance to your tests).
 """
 
-import os
 import pprint
 import re
 import socket
 from cherrypy._cpcompat import basestring, ntob, py3k, HTTPConnection, HTTPSConnection, unicodestr
 
 
-
 def interface(host):
     """Return an IP address for a client connection given the server host.
 
     status = None
     headers = None
     body = None
-    
+
     encoding = 'utf-8'
-    
+
     time = None
 
     def get_conn(self, auto_open=False):
     def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
         """Open the url with debugging support. Return status, headers, body."""
         ServerError.on = False
-        
+
         if isinstance(url, unicodestr):
             url = url.encode('utf-8')
         if isinstance(body, unicodestr):
             body = body.encode('utf-8')
-        
+
         self.url = url
         self.time = None
         start = time.time()
                 msg = '%r:%r not in headers' % (key, value)
         self._handlewebError(msg)
 
+    def assertHeaderIn(self, key, values, msg=None):
+        """Fail if header indicated by key doesn't have one of the values."""
+        lowkey = key.lower()
+        for k, v in self.headers:
+            if k.lower() == lowkey:
+                matches = [value for value in values if str(value) == v]
+                if matches:
+                    return matches
+
+        if msg is None:
+            msg = '%(key)r not in %(values)r' % vars()
+        self._handlewebError(msg)
+
     def assertHeaderItemValue(self, key, value, msg=None):
         """Fail if the header does not contain the specified value"""
         actual_value = self.assertHeader(key, msg=msg)
                 def putrequest(self, method, url):
                     if self._HTTPConnection__response and self._HTTPConnection__response.isclosed():
                         self._HTTPConnection__response = None
-                    
+
                     if self._HTTPConnection__state == http.client._CS_IDLE:
                         self._HTTPConnection__state = http.client._CS_REQ_STARTED
                     else:
                         raise http.client.CannotSendRequest()
-                    
+
                     self._method = method
                     if not url:
                         url = ntob('/')
                     self._output(request)
                 import types
                 conn.putrequest = types.MethodType(putrequest, conn)
-                
+
                 conn.putrequest(method.upper(), url)
 
             for key, value in headers:
         print("")
         print("".join(traceback.format_exception(*exc)))
         return True
-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.