Source

CherryPy / cherrypy / test / webtest.py

Robert Brewer 09e3214 

















Robert Brewer 5200546 
Robert Brewer 09e3214 
Robert Brewer 6d51eba 
Robert Brewer 09e3214 


























































Robert Brewer 91ffbcd 
Robert Brewer 09e3214 












Robert Brewer 91ffbcd 

Robert Brewer 09e3214 















Robert Brewer 91ffbcd 

Robert Brewer 09e3214 

Robert Brewer 91ffbcd 
Robert Brewer 09e3214 

Robert Brewer 6d51eba 


















Robert Brewer 09e3214 





Robert Brewer 79367e6 
Robert Brewer 09e3214 

Robert Brewer 6d51eba 
Robert Brewer 09e3214 
Robert Brewer 81515bb 
Robert Brewer 09e3214 
Robert Brewer 066c721 



Robert Brewer 09e3214 
Robert Brewer 91ffbcd 
Robert Brewer 09e3214 
Robert Brewer 81515bb 
Robert Brewer 6d51eba 


Robert Brewer 79367e6 
Robert Brewer 6d51eba 
Robert Brewer 91ffbcd 
Robert Brewer 6d51eba 






























Robert Brewer 91ffbcd 
Robert Brewer 6d51eba 
Robert Brewer 2e7786f 
Robert Brewer 6d51eba 

Robert Brewer 2e7786f 







































Robert Brewer 81515bb 

Robert Brewer 953b47a 









Robert Brewer 81515bb 





Robert Brewer d6edb21 
Robert Brewer 81515bb 

Robert Brewer 6d51eba 




Robert Brewer 79367e6 
Robert Brewer 81515bb 
Robert Brewer d3bf2ac 






Robert Brewer 79367e6 
Robert Brewer d3bf2ac 
Robert Brewer 81515bb 




Robert Brewer 79367e6 
Robert Brewer 81515bb 



Robert Brewer 6d51eba 

Robert Brewer 79367e6 
Robert Brewer 81515bb 



Robert Brewer 6d51eba 

Robert Brewer 79367e6 
Robert Brewer 5200546 





Robert Brewer 79367e6 
Robert Brewer 81515bb 
Robert Brewer 09e3214 

Robert Brewer e6f398a 
Robert Brewer 188adf6 
Robert Brewer 09e3214 


Robert Brewer 188adf6 

Robert Brewer e6f398a 







Robert Brewer 09e3214 















Robert Brewer 79367e6 
Robert Brewer 09e3214 
Robert Brewer e6f398a 
Robert Brewer 09e3214 





Robert Brewer a3bda71 
Robert Brewer 09e3214 













Robert Brewer 6c1a7ac 
Robert Brewer 09e3214 

Robert Brewer 6c1a7ac 
Robert Brewer 09e3214 
Robert Brewer 6c1a7ac 
Robert Brewer 09e3214 

Robert Brewer 6c1a7ac 
Robert Brewer 09e3214 











Robert Brewer fafa2c1 




Robert Brewer 09e3214 




Robert Brewer 79367e6 
Robert Brewer 09e3214 






Robert Brewer fafa2c1 
Robert Brewer 09e3214 





  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
"""Extensions to unittest for web frameworks.

Use the WebCase.getPage method to request a page from your HTTP server.

Framework Integration
=====================

If you have control over your server process, you can handle errors
in the server-side of the HTTP conversation a bit better. You must run
both the client (your WebCase tests) and the server in the same process
(but in separate threads, obviously).

When an error occurs in the framework, call server_error. It will print
the traceback to stdout, and keep any assertions you have from running
(the assumption is that, if the server errors, the page output won't be
of further significance to your tests).
"""

import os, sys, time, re
import types
import pprint
import socket
import httplib
import traceback

from unittest import *
from unittest import _TextTestResult


class TerseTestResult(_TextTestResult):
    """Text test result that prints its error report without extra padding."""

    def printErrors(self):
        """Write the ERROR and FAIL lists, but only if there is something to show."""
        # Nothing went wrong: emit no output at all (not even a blank line).
        if not (self.errors or self.failures):
            return

        # Terminate the progress output (dots or verbose lines) first.
        if self.dots or self.showAll:
            self.stream.writeln()

        for flavour, problems in (('ERROR', self.errors),
                                  ('FAIL', self.failures)):
            self.printErrorList(flavour, problems)


class TerseTestRunner(TextTestRunner):
    """A test runner class that displays results in textual form.

    Unlike the stock text runner, it prints no separators, timing line
    or "OK" summary; output appears only when a test fails or errors.
    """

    def _makeResult(self):
        """Return the TerseTestResult that collects this run's outcomes."""
        return TerseTestResult(self.stream, self.descriptions, self.verbosity)

    def run(self, test):
        """Run the given test case or test suite and return its result.

        Overridden to remove unnecessary empty lines and separators. When
        the run was not successful, a single "FAILED (...)" line with the
        failure and error counts is written to the stream.
        """
        result = self._makeResult()
        test(result)
        result.printErrors()
        if not result.wasSuccessful():
            failed = len(result.failures)
            errored = len(result.errors)
            counts = []
            if failed:
                counts.append("failures=%d" % failed)
            if errored:
                counts.append("errors=%d" % errored)
            self.stream.writeln("FAILED (%s)" % ", ".join(counts))
        return result


class ReloadingTestLoader(TestLoader):
    """TestLoader which reloads a module that has already been imported."""

    def loadTestsFromName(self, name, module=None):
        """Return a suite of all tests cases given a string specifier.

        The name may resolve either to a module, a test case class, a
        test method within a test case class, or a callable object which
        returns a TestCase or TestSuite instance.

        The method optionally resolves the names relative to a given module.
        When no module is given, the longest importable prefix of the name
        is used; if that module is already in sys.modules it is reloaded.

        Raises ValueError for an empty name, for an object that cannot be
        turned into a test, or for a callable that returns something other
        than a TestCase or TestSuite. Raises ImportError when no prefix of
        the name can be imported.
        """
        parts = name.split('.')
        if module is None:
            # str.split never returns an empty list, so test the name itself.
            if not name:
                raise ValueError("incomplete test name: %s" % name)

            candidate = parts[:]
            while candidate:
                target = ".".join(candidate)
                loaded = sys.modules.get(target)
                if loaded is not None:
                    # reload() hands back the named (sub)module itself, so
                    # only the parts after the module path remain to walk.
                    module = reload(loaded)
                    parts = parts[len(candidate):]
                    break
                try:
                    module = __import__(target)
                except ImportError:
                    del candidate[-1]
                    if not candidate:
                        raise
                else:
                    # __import__ returns the top-level package, so every
                    # part after the first must still be resolved below.
                    parts = parts[1:]
                    break

        obj = module
        for part in parts:
            obj = getattr(obj, part)

        if isinstance(obj, types.ModuleType):
            return self.loadTestsFromModule(obj)
        elif (isinstance(obj, (type, types.ClassType)) and
              issubclass(obj, TestCase)):
            return self.loadTestsFromTestCase(obj)
        elif type(obj) == types.UnboundMethodType:
            return obj.im_class(obj.__name__)
        elif callable(obj):
            test = obj()
            if not isinstance(test, (TestCase, TestSuite)):
                raise ValueError("calling %s returned %s, "
                                 "not a test" % (obj, test))
            return test
        else:
            raise ValueError("don't know how to make test from: %s" % obj)


try:
    # On Windows, msvcrt.getch reads a single char without output.
    import msvcrt
except ImportError:
    # No msvcrt available: fall back to the tty/termios modules.
    import tty, termios

    def getchar():
        """Read one character from stdin with the terminal in raw mode."""
        stdin_fd = sys.stdin.fileno()
        saved_attrs = termios.tcgetattr(stdin_fd)
        try:
            tty.setraw(stdin_fd)
            key = sys.stdin.read(1)
        finally:
            # Put the terminal back the way it was, even on error.
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
        return key
else:
    def getchar():
        """Read one character via msvcrt.getch."""
        return msvcrt.getch()


class WebCase(TestCase):
    """TestCase with helpers for requesting pages and checking responses.

    getPage() stores the last response on the instance (url, status,
    headers, body, cookies); the assert* helpers inspect that state.
    A failed assertion goes through _handlewebError, which either raises
    failureException or, when `interactive` is true, prompts on the console.
    """

    # Address of the HTTP server that getPage() sends requests to.
    HOST = "127.0.0.1"
    PORT = 8000

    def getPage(self, url, headers=None, method="GET", body=None):
        """Open the url with debugging support. Return status, headers, body.

        Raises ServerError if the ServerError.on flag was set while the
        request was being handled.
        """
        ServerError.on = False

        self.url = url
        result = openURL(url, headers, method, body, self.HOST, self.PORT)
        self.status, self.headers, self.body = result

        # Build a list of request cookies from the previous response cookies.
        self.cookies = [('Cookie', v) for k, v in self.headers
                        if k.lower() == 'set-cookie']

        if ServerError.on:
            raise ServerError()
        return result

    # When true, a failed assertion opens an interactive console prompt.
    interactive = True
    # Number of body lines shown between "More" prompts.
    console_height = 30

    def _handlewebError(self, msg):
        """Report a failed web assertion.

        Raises failureException(msg) immediately when not interactive.
        Otherwise prompts for a key: B/H/S/U show parts of the last
        response, I ignores the failure, R raises it and X calls exit().
        """
        if not self.interactive:
            raise self.failureException(msg)

        print
        print "    ERROR:", msg
        p = "    Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> "
        print p,
        while True:
            i = getchar().upper()
            if not i:
                # An empty read means no more input is available. The empty
                # string is "in" every string, so without this check the
                # loop would re-print the prompt forever.
                raise self.failureException(msg)
            if i not in "BHSUIRX":
                continue
            print i  # Also prints new line
            if i == "B":
                for x, line in enumerate(self.body.splitlines()):
                    if (x + 1) % self.console_height == 0:
                        # The \r and comma should make the next line overwrite
                        print "<-- More -->\r",
                        m = getchar().lower()
                        # Erase our "More" prompt
                        print "            \r",
                        if m == "q":
                            break
                    print line
            elif i == "H":
                pprint.pprint(self.headers)
            elif i == "S":
                print self.status
            elif i == "U":
                print self.url
            elif i == "I":
                # return without raising the normal exception
                return
            elif i == "R":
                raise self.failureException(msg)
            elif i == "X":
                self.exit()
            print p,

    def exit(self):
        """Terminate the process via sys.exit()."""
        sys.exit()

    def __call__(self, result=None):
        """Run the test, recording the outcome in result.

        KeyboardInterrupt and SystemExit raised by setUp, the test method
        or tearDown are re-raised instead of being recorded as errors.
        """
        if result is None:
            result = self.defaultTestResult()
        result.startTest(self)

        # Older unittest releases keep these under name-mangled private
        # attributes; newer ones use _testMethodName and may provide no
        # exc_info hook at all, so fall back step by step.
        method_name = getattr(self, "_TestCase__testMethodName", None)
        if method_name is None:
            method_name = self._testMethodName
        testMethod = getattr(self, method_name)
        exc_info = (getattr(self, "_TestCase__exc_info", None)
                    or getattr(self, "_exc_info", None)
                    or sys.exc_info)

        try:
            try:
                self.setUp()
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                result.addError(self, exc_info())
                return

            ok = 0
            try:
                testMethod()
                ok = 1
            except self.failureException:
                result.addFailure(self, exc_info())
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                result.addError(self, exc_info())

            try:
                self.tearDown()
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                result.addError(self, exc_info())
                ok = 0
            if ok:
                result.addSuccess(self)
        finally:
            result.stopTest(self)

    def assertStatus(self, status, msg=None):
        """Fail if self.status != status.

        status may be a single string, or a container of acceptable
        status strings.
        """
        if isinstance(status, basestring):
            if self.status != status:
                if msg is None:
                    msg = 'Status (%s) != %s' % (repr(self.status),
                                                 repr(status))
                self._handlewebError(msg)
        else:
            if self.status not in status:
                if msg is None:
                    msg = 'Status (%s) not in %s' % (repr(self.status),
                                                     repr(status))
                self._handlewebError(msg)

    def assertHeader(self, key, value=None, msg=None):
        """Fail if (key, [value]) not in self.headers.

        Header names are compared case-insensitively; when value is None
        any value for the header is accepted.
        """
        lowkey = key.lower()
        for k, v in self.headers:
            if k.lower() == lowkey:
                if value is None or str(value) == v:
                    return

        if msg is None:
            if value is None:
                msg = '%s not in headers' % repr(key)
            else:
                msg = '%s:%s not in headers' % (repr(key), repr(value))
        self._handlewebError(msg)

    def assertNoHeader(self, key, msg=None):
        """Fail if key in self.headers (compared case-insensitively)."""
        lowkey = key.lower()
        matches = [k for k, v in self.headers if k.lower() == lowkey]
        if matches:
            if msg is None:
                msg = '%s in headers' % repr(key)
            self._handlewebError(msg)

    def assertBody(self, value, msg=None):
        """Fail if value != self.body."""
        if value != self.body:
            if msg is None:
                msg = 'expected body:\n%s\n\nactual body:\n%s' % (
                    repr(value), repr(self.body))
            self._handlewebError(msg)

    def assertInBody(self, value, msg=None):
        """Fail if value not in self.body."""
        if value not in self.body:
            if msg is None:
                msg = '%s not in body' % repr(value)
            self._handlewebError(msg)

    def assertNotInBody(self, value, msg=None):
        """Fail if value in self.body."""
        if value in self.body:
            if msg is None:
                msg = '%s found in body' % repr(value)
            self._handlewebError(msg)

    def assertMatchesBody(self, pattern, msg=None, flags=0):
        """Fail if pattern (a regex) is not found by re.search in self.body."""
        if re.search(pattern, self.body, flags) is None:
            if msg is None:
                msg = 'No match for %s in body' % repr(pattern)
            self._handlewebError(msg)



def cleanHeaders(headers, method, body, host, port):
    """Return request headers, with required headers added (if missing).

    headers is an iterable of (name, value) pairs, or None. It is never
    modified; a new list is returned.

    A Host header of "host:port" is appended when none is present. For
    POST and PUT requests (method compared case-insensitively) without a
    Content-Type header, a default form-encoded Content-Type and a
    Content-Length computed from body are appended. A caller that supplies
    its own Content-Type is responsible for Content-Length as well.
    """
    # Copy so the caller's list (which may be reused) is left untouched.
    if headers is None:
        headers = []
    else:
        headers = list(headers)

    present = [k.lower() for k, v in headers]

    # Add the required Host request header if not present.
    # [This specifies the host:port of the server, not the client.]
    if 'host' not in present:
        headers.append(("Host", "%s:%s" % (host, port)))

    # openURL upper-cases the method before sending, so compare likewise.
    if method.upper() in ("POST", "PUT"):
        # Stick in default type and length headers if not present
        if 'content-type' not in present:
            headers.append(("Content-Type",
                            "application/x-www-form-urlencoded"))
            headers.append(("Content-Length", str(len(body or ""))))

    return headers


def openURL(url, headers=None, method="GET", body=None,
            host="127.0.0.1", port=8000):
    """Open the given HTTP resource and return status, headers, and body.

    Returns a tuple (status, headers, body): status is the string
    "<code> <reason>", headers is a list of (name, value) pairs with
    surrounding whitespace stripped, and body is the full response body.

    The request is attempted up to 10 times, sleeping half a second after
    each socket.error; the error from the final attempt is re-raised.
    """
    headers = cleanHeaders(headers, method, body, host, port)

    # Trying 10 times is simply in case of socket errors.
    # Normal case--it should run once.
    max_trials = 10
    trial = 0
    while True:
        conn = None
        try:
            try:
                conn = httplib.HTTPConnection(host, port)
                conn.putrequest(method.upper(), url)

                for key, value in headers:
                    conn.putheader(key, value)
                conn.endheaders()

                if body is not None:
                    conn.send(body)

                # Handle response
                response = conn.getresponse()

                status = "%s %s" % (response.status, response.reason)

                outheaders = []
                for line in response.msg.headers:
                    key, value = line.split(":", 1)
                    outheaders.append((key.strip(), value.strip()))

                outbody = response.read()
                return status, outheaders, outbody
            finally:
                # Close on failure as well as success, so a failed attempt
                # does not leave its socket open while we retry.
                if conn is not None:
                    conn.close()
        except socket.error:
            trial += 1
            if trial >= max_trials:
                raise
            time.sleep(0.5)


# Add any exception classes which your web framework handles
# normally (that you don't want server_error to trap).
# server_error checks the exception class for membership in this list.
ignored_exceptions = []

# You'll want to set this to True when you can't guarantee
# that each response will immediately follow each request;
# for example, when handling requests via multiple threads.
# When true, server_error ignores every exception.
ignore_all = False

class ServerError(Exception):
    """Raised by WebCase.getPage when the server-side error flag is set."""
    # Class-level flag: WebCase.getPage clears it before each request, and
    # server_error sets it to True when it traps an exception.
    on = False


def server_error(exc=None):
    """Server debug hook. Return True if exception handled, False if ignored.
    
    You probably want to wrap this, so you can still handle an error using
    your framework when it's ignored.
    """
    if exc is None: 
        exc = sys.exc_info()
    
    if ignore_all or exc[0] in ignored_exceptions:
        return False
    else:
        ServerError.on = True
        print
        print "".join(traceback.format_exception(*exc))
        return True
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.