Robert Brewer avatar Robert Brewer committed 9affd6e

Fix for #824 (_cplogging.LogManager.access method not handling unicode in login names properly). While I was at it, I made the access log template configurable, moved the log tests from test_core into test_logging, and added a new logtest module.

Comments (0)

Files changed (5)

cherrypy/_cplogging.py

     appid = None
     error_log = None
     access_log = None
+    access_log_format = \
+        '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
     
     def __init__(self, appid=None, logger_root="cherrypy"):
         self.logger_root = logger_root
         return self.error(*args, **kwargs)
     
     def access(self):
-        """Write to the access log (in Apache/NCSA Combined Log format)."""
+        """Write to the access log (in Apache/NCSA Combined Log format).
+        
+        Like Apache started doing in 2.0.46, non-printable and other special
+        characters in %r (and we expand that to all parts) are escaped using
+        \\xhh sequences, where hh stands for the hexadecimal representation
+        of the raw byte. Exceptions from this rule are " and \\, which are
+        escaped by prepending a backslash, and all whitespace characters,
+        which are written in their C-style notation (\\n, \\t, etc).
+        """
         request = cherrypy.request
-        inheaders = request.headers
         remote = request.remote
         response = cherrypy.response
         outheaders = response.headers
-        tmpl = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
-        s = tmpl % {'h': remote.name or remote.ip,
-                    'l': '-',
-                    'u': getattr(request, "login", None) or "-",
-                    't': self.time(),
-                    'r': request.request_line,
-                    's': response.status.split(" ", 1)[0],
-                    'b': outheaders.get('Content-Length', '') or "-",
-                    'f': inheaders.get('Referer', ''),
-                    'a': inheaders.get('User-Agent', ''),
-                    }
+        inheaders = request.headers
+        
+        atoms = {'h': remote.name or remote.ip,
+                 'l': '-',
+                 'u': getattr(request, "login", None) or "-",
+                 't': self.time(),
+                 'r': request.request_line,
+                 's': response.status.split(" ", 1)[0],
+                 'b': outheaders.get('Content-Length', '') or "-",
+                 'f': inheaders.get('Referer', ''),
+                 'a': inheaders.get('User-Agent', ''),
+                 }
+        for k, v in atoms.items():
+            if isinstance(v, unicode):
+                v = v.encode('utf8')
+            elif not isinstance(v, str):
+                v = str(v)
+            # Fortunately, repr(str) escapes unprintable chars, \n, \t, etc
+            # and backslash for us. All we have to do is strip the quotes.
+            v = repr(v)[1:-1]
+            # Escape double-quote.
+            atoms[k] = v.replace('"', '\\"')
+        
         try:
-            self.access_log.log(logging.INFO, s)
+            self.access_log.log(logging.INFO, self.access_log_format % atoms)
         except:
             self(traceback=True)
     

cherrypy/test/logtest.py

+"""logtest, a unittest.TestCase helper for testing log output."""
+
+import sys
+import time
+
+
+try:
+    # On Windows, msvcrt.getch reads a single char without output.
+    import msvcrt
+    def getchar():
+        return msvcrt.getch()
+except ImportError:
+    # Unix getchr
+    import tty, termios
+    def getchar():
+        fd = sys.stdin.fileno()
+        old_settings = termios.tcgetattr(fd)
+        try:
+            tty.setraw(sys.stdin.fileno())
+            ch = sys.stdin.read(1)
+        finally:
+            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
+        return ch
+
+
+class LogCase(object):
+    """unittest.TestCase mixin for testing log messages.
+    
+    logfile: a filename for the desired log. Yes, I know modes are evil,
+        but it makes the test functions so much cleaner to set this once.
+    
+    lastmarker: the last marker in the log. This can be used to search for
+        messages since the last marker.
+    
+    markerPrefix: a string with which to prefix log markers. This should be
+        unique enough from normal log output to use for marker identification.
+    """
+    
+    logfile = None
+    lastmarker = None
+    markerPrefix = "test suite marker: "
+    
+    def _handleLogError(self, msg, data, marker, pattern):
+        print
+        print "    ERROR:", msg
+        
+        if not self.interactive:
+            raise self.failureException(msg)
+        
+        p = "    Show: [L]og [M]arker [P]attern; [I]gnore, [R]aise, or sys.e[X]it >> "
+        print p,
+        while True:
+            i = getchar().upper()
+            if i not in "MPLIRX":
+                continue
+            print i.upper()  # Also prints new line
+            if i == "L":
+                for x, line in enumerate(data):
+                    if (x + 1) % self.console_height == 0:
+                        # The \r and comma should make the next line overwrite
+                        print "<-- More -->\r",
+                        m = getchar().lower()
+                        # Erase our "More" prompt
+                        print "            \r",
+                        if m == "q":
+                            break
+                    print line.rstrip()
+            elif i == "M":
+                print repr(marker or self.lastmarker)
+            elif i == "P":
+                print repr(pattern)
+            elif i == "I":
+                # return without raising the normal exception
+                return
+            elif i == "R":
+                raise self.failureException(msg)
+            elif i == "X":
+                self.exit()
+            print p,
+    
+    def exit(self):
+        sys.exit()
+    
+    def emptyLog(self):
+        """Overwrite self.logfile with 0 bytes."""
+        open(self.logfile, 'wb').write("")
+    
+    def markLog(self, key=None):
+        """Insert a marker line into the log and set self.lastmarker."""
+        if key is None:
+            key = str(time.time())
+        self.lastmarker = key
+        
+        open(self.logfile, 'ab+').write("%s%s\n" % (self.markerPrefix, key))
+    
+    def _read_marked_region(self, marker=None):
+        """Return lines from self.logfile in the marked region.
+        
+        If marker is None, self.lastmarker is used. If the log hasn't
+        been marked (using self.markLog), the entire log will be returned.
+        """
+##        # Give the logger time to finish writing?
+##        time.sleep(0.5)
+        
+        logfile = self.logfile
+        marker = marker or self.lastmarker
+        if marker is None:
+            return open(logfile, 'rb').readlines()
+        
+        data = []
+        in_region = False
+        for line in open(logfile, 'rb'):
+            if in_region:
+                if (line.startswith(self.markerPrefix) and not marker in line):
+                    break
+                else:
+                    data.append(line)
+            elif marker in line:
+                in_region = True
+        return data
+    
+    def assertInLog(self, line, marker=None):
+        """Fail if the given (partial) line is not in the log.
+        
+        The log will be searched from the given marker to the next marker.
+        If marker is None, self.lastmarker is used. If the log hasn't
+        been marked (using self.markLog), the entire log will be searched.
+        """
+        data = self._read_marked_region(marker)
+        for logline in data:
+            if line in logline:
+                return
+        msg = "%r not found in log" % line
+        self._handleLogError(msg, data, marker, line)
+    
+    def assertNotInLog(self, line, marker=None):
+        """Fail if the given (partial) line is in the log.
+        
+        The log will be searched from the given marker to the next marker.
+        If marker is None, self.lastmarker is used. If the log hasn't
+        been marked (using self.markLog), the entire log will be searched.
+        """
+        data = self._read_marked_region(marker)
+        for logline in data:
+            if line in logline:
+                msg = "%r found in log" % line
+                self._handleLogError(msg, data, marker, line)
+    
+    def assertLog(self, sliceargs, lines, marker=None):
+        """Fail if log.readlines()[sliceargs] is not contained in 'lines'.
+        
+        The log will be searched from the given marker to the next marker.
+        If marker is None, self.lastmarker is used. If the log hasn't
+        been marked (using self.markLog), the entire log will be searched.
+        """
+        data = self._read_marked_region(marker)
+        if isinstance(sliceargs, int):
+            # Single arg. Use __getitem__ and allow lines to be str or list.
+            if isinstance(lines, (tuple, list)):
+                lines = lines[0]
+            if lines not in data[sliceargs]:
+                msg = "%r not found on log line %r" % (lines, sliceargs)
+                self._handleLogError(msg, [data[sliceargs]], marker, lines)
+        else:
+            # Multiple args. Use __getslice__ and require lines to be list.
+            if isinstance(lines, tuple):
+                lines = list(lines)
+            elif isinstance(lines, basestring):
+                raise TypeError("The 'lines' arg must be a list when "
+                                "'sliceargs' is a tuple.")
+            
+            start, stop = sliceargs
+            for line, logline in zip(lines, data[start:stop]):
+                if line not in logline:
+                    msg = "%r not found in log" % line
+                    self._handleLogError(msg, data[start:stop], marker, line)
+

cherrypy/test/test.py

         'test_http',
         'test_httpauth',
         'test_httplib',
+        'test_logging',
         'test_objectmapping',
         'test_misc_tools',
         'test_static',

cherrypy/test/test_core.py

 from cherrypy.lib import http, static
 
 
-log_file = os.path.join(localDir, "test.log")
-log_access_file = os.path.join(localDir, "access.log")
 favicon_path = os.path.join(os.getcwd(), localDir, "../favicon.ico")
 
 defined_http_methods = ("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE",
             return existing
     
     cherrypy.config.update({
-        'log.error_file': log_file,
         'environment': 'test_suite',
         'server.max_request_body_size': 200,
         'server.max_request_header_size': 500,
         })
     appconf = {
-        '/': {'log.access_file': log_access_file},
         '/method': {'request.methods_with_bodies': ("POST", "PUT", "PROPFIND")},
         }
     cherrypy.tree.mount(root, config=appconf)
         msg = "Illegal response status from server ('error' is non-numeric)."
         self.assertErrorPage(500, msg)
     
-    def testLogging(self):
-        f = open(log_access_file, "wb")
-        f.write("")
-        f.close()
-        f = open(log_file, "wb")
-        f.write("")
-        f.close()
-        
-        self.getPage("/flatten/as_string",
-                     headers=[('Referer', 'http://www.cherrypy.org/'),
-                              ('User-Agent', 'Mozilla/5.0')])
-        self.assertBody('content')
-        self.assertStatus(200)
-        
-        self.getPage("/flatten/as_yield")
-        self.assertBody('content')
-        self.assertStatus(200)
-        
-        data = open(log_access_file, "rb").readlines()
-        
-        host = self.HOST
-        if host == '0.0.0.0':
-            # INADDR_ANY, which should respond on localhost.
-            host = "127.0.0.1"
-        if host == '::':
-            # IN6ADDR_ANY, which should respond on localhost.
-            host = "::1"
-        intro = '%s - - [' % host
-        
-        if not data[0].startswith(intro):
-            self.fail("%r doesn't start with %r" % (data[0], intro))
-        haslength = False
-        for k, v in self.headers:
-            if k.lower() == 'content-length':
-                haslength = True
-        line = data[-2].strip()
-        if haslength:
-            if not line.endswith('] "GET %s/flatten/as_string HTTP/1.1" 200 7 '
-                                 '"http://www.cherrypy.org/" "Mozilla/5.0"'
-                                 % self.prefix()):
-                self.fail(line)
-        else:
-            if not line.endswith('] "GET %s/flatten/as_string HTTP/1.1" 200 - '
-                                 '"http://www.cherrypy.org/" "Mozilla/5.0"'
-                                 % self.prefix()):
-                self.fail(line)
-        
-        if not data[-1].startswith(intro):
-            self.fail("%r doesn't start with %r" % (data[-1], intro))
-        haslength = False
-        for k, v in self.headers:
-            if k.lower() == 'content-length':
-                haslength = True
-        line = data[-1].strip()
-        if haslength:
-            self.assert_(line.endswith('] "GET %s/flatten/as_yield HTTP/1.1" 200 7 "" ""'
-                                          % self.prefix()))
-        else:
-            self.assert_(line.endswith('] "GET %s/flatten/as_yield HTTP/1.1" 200 - "" ""'
-                                          % self.prefix()))
-        
-        ignore = helper.webtest.ignored_exceptions
-        ignore.append(ValueError)
-        try:
-            # Test that tracebacks get written to the error log.
-            self.getPage("/error/page_method")
-            self.assertInBody("raise ValueError()")
-            data = open(log_file, "rb").readlines()
-            self.assertEqual(data[0].strip().endswith('HTTP Traceback (most recent call last):'), True)
-            self.assertEqual(data[-3].strip().endswith('raise ValueError()'), True)
-        finally:
-            ignore.pop()
-    
     def testSlashes(self):
         # Test that requests for index methods without a trailing slash
         # get redirected to the same URI path with a trailing slash.

cherrypy/test/test_logging.py

+"""Basic tests for the CherryPy core: request handling."""
+
+from cherrypy.test import test
+test.prefer_parent_path()
+
+import os
+localDir = os.path.dirname(__file__)
+
+import cherrypy
+
+access_log = os.path.join(localDir, "access.log")
+error_log = os.path.join(localDir, "error.log")
+
+# Some unicode strings.
+tartaros = u'\u03a4\u1f71\u03c1\u03c4\u03b1\u03c1\u03bf\u03c2'
+erebos = u'\u0388\u03c1\u03b5\u03b2\u03bf\u03c2.com'
+
+
+def setup_server():
+    class Root:
+        
+        def index(self):
+            return "hello"
+        index.exposed = True
+        
+        def uni_code(self):
+            cherrypy.request.login = tartaros
+            cherrypy.request.remote.name = erebos
+        uni_code.exposed = True
+        
+        def slashes(self):
+            cherrypy.request.request_line = r'GET /slashed\path HTTP/1.1'
+        slashes.exposed = True
+        
+        def whitespace(self):
+            # User-Agent = "User-Agent" ":" 1*( product | comment )
+            # comment    = "(" *( ctext | quoted-pair | comment ) ")"
+            # ctext      = <any TEXT excluding "(" and ")">
+            # TEXT       = <any OCTET except CTLs, but including LWS>
+            # LWS        = [CRLF] 1*( SP | HT )
+            cherrypy.request.headers['User-Agent'] = 'Browzuh (1.0\r\n\t\t.3)'
+        whitespace.exposed = True
+        
+        def as_string(self):
+            return "content"
+        as_string.exposed = True
+        
+        def as_yield(self):
+            yield "content"
+        as_yield.exposed = True
+        
+        def error(self):
+            raise ValueError()
+        error.exposed = True
+        error._cp_config = {'tools.log_tracebacks.on': True}
+    
+    root = Root()
+
+
+    cherrypy.config.update({'log.error_file': error_log,
+                            'log.access_file': access_log,
+                            'environment': 'test_suite',
+                            })
+    cherrypy.tree.mount(root)
+
+
+
+from cherrypy.test import helper, logtest
+
+class AccessLogTests(helper.CPWebCase, logtest.LogCase):
+    
+    logfile = access_log
+    
+    def testNormalReturn(self):
+        self.markLog()
+        self.getPage("/as_string",
+                     headers=[('Referer', 'http://www.cherrypy.org/'),
+                              ('User-Agent', 'Mozilla/5.0')])
+        self.assertBody('content')
+        self.assertStatus(200)
+        
+        host = self.HOST
+        if host == '0.0.0.0':
+            # INADDR_ANY, which should respond on localhost.
+            host = "127.0.0.1"
+        if host == '::':
+            # IN6ADDR_ANY, which should respond on localhost.
+            host = "::1"
+        intro = '%s - - [' % host
+        
+        self.assertLog(-1, intro)
+        
+        if [k for k, v in self.headers if k.lower() == 'content-length']:
+            self.assertLog(-1, '] "GET %s/as_string HTTP/1.1" 200 7 '
+                           '"http://www.cherrypy.org/" "Mozilla/5.0"'
+                           % self.prefix())
+        else:
+            self.assertLog(-1, '] "GET %s/as_string HTTP/1.1" 200 - '
+                           '"http://www.cherrypy.org/" "Mozilla/5.0"'
+                           % self.prefix())
+    
+    def testNormalYield(self):
+        self.markLog()
+        self.getPage("/as_yield")
+        self.assertBody('content')
+        self.assertStatus(200)
+        
+        host = self.HOST
+        if host == '0.0.0.0':
+            # INADDR_ANY, which should respond on localhost.
+            host = "127.0.0.1"
+        if host == '::':
+            # IN6ADDR_ANY, which should respond on localhost.
+            host = "::1"
+        intro = '%s - - [' % host
+        
+        self.assertLog(-1, intro)
+        if [k for k, v in self.headers if k.lower() == 'content-length']:
+            self.assertLog(-1, '] "GET %s/as_yield HTTP/1.1" 200 7 "" ""' %
+                           self.prefix())
+        else:
+            self.assertLog(-1, '] "GET %s/as_yield HTTP/1.1" 200 - "" ""'
+                           % self.prefix())
+    
+    def testEscapedOutput(self):
+        # Test unicode in access log pieces.
+        self.markLog()
+        self.getPage("/uni_code")
+        self.assertStatus(200)
+        self.assertLog(-1, repr(tartaros.encode('utf8'))[1:-1])
+        # Test the erebos value. Included inline for your enlightenment.
+        # Note the 'r' prefix--those backslashes are literals.
+        self.assertLog(-1, r'\xce\x88\xcf\x81\xce\xb5\xce\xb2\xce\xbf\xcf\x82')
+        
+        # Test backslashes in output.
+        self.markLog()
+        self.getPage("/slashes")
+        self.assertStatus(200)
+        self.assertLog(-1, r'"GET /slashed\\path HTTP/1.1"')
+        
+        # Test whitespace in output.
+        self.markLog()
+        self.getPage("/whitespace")
+        self.assertStatus(200)
+        # Again, note the 'r' prefix.
+        self.assertLog(-1, r'"Browzuh (1.0\r\n\t\t.3)"')
+
+
+class ErrorLogTests(helper.CPWebCase, logtest.LogCase):
+    
+    logfile = error_log
+    
+    def testTracebacks(self):
+        # Test that tracebacks get written to the error log.
+        self.markLog()
+        ignore = helper.webtest.ignored_exceptions
+        ignore.append(ValueError)
+        try:
+            self.getPage("/error")
+            self.assertInBody("raise ValueError()")
+            self.assertLog(0, 'HTTP Traceback (most recent call last):')
+            self.assertLog(-3, 'raise ValueError()')
+        finally:
+            ignore.pop()
+
+
+if __name__ == '__main__':
+    setup_server()
+    helper.testmain()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.