Anonymous avatar Anonymous committed 295a941 Merge

Merge heads.

Comments (0)

Files changed (28)

 ^docs/_build
 [^/]+-stats\.txt$
 ^bench/[ab]/
+\.coverage
+coverage_out/
 test:
 	@(nosetests $(TEST_OPTIONS) $(TESTS))
 
+coverage:
+	@(nosetests $(TEST_OPTIONS) --with-coverage --cover-package=werkzeug --cover-html --cover-html-dir=coverage_out $(TESTS))
+
 doctest:
 	@(cd docs; sphinx-build -b doctest . _build/doctest)

tests/test_compat.py

 '''
 
 
-# ignore some warnings werkzeug emits for backwards compat
-for msg in ['called into deprecated fix_headers',
-            'fix_headers changed behavior']:
-    warnings.filterwarnings('ignore', message=msg,
-                            category=DeprecationWarning)
-
-
 def perform_import(module, allowed):
     client = Popen([sys.executable, '-c', import_code % module],
                    stdout=PIPE)
 
 def test_fix_headers_in_response():
     """Make sure fix_headers still works for backwards compatibility"""
+    # ignore some warnings werkzeug emits for backwards compat
+    for msg in ['called into deprecated fix_headers',
+                'fix_headers changed behavior']:
+        warnings.filterwarnings('ignore', message=msg,
+                                category=DeprecationWarning)
+
     from werkzeug import Response
     class MyResponse(Response):
         def fix_headers(self, environ):
     resp = Response.from_app(myresp, create_environ(method='GET'))
     assert resp.headers['x-foo'] == 'meh'
     assert resp.data == 'Foo'
+
+    warnings.resetwarnings()

tests/test_datastructures.py

 from werkzeug.datastructures import FileStorage, MultiDict, \
      ImmutableMultiDict, CombinedMultiDict, ImmutableTypeConversionDict, \
      ImmutableDict, Headers, ImmutableList, EnvironHeaders, \
-     OrderedMultiDict, ImmutableOrderedMultiDict
+     OrderedMultiDict, ImmutableOrderedMultiDict, HeaderSet
 
 
 def test_multidict_pickle():
     assert "('a', 2)" in repr(md)
     assert "('b', 3)" in repr(md)
 
+    # add and getlist
+    md.add('c', '42')
+    md.add('c', '23')
+    assert md.getlist('c') == ['42', '23']
+    md.add('c', 'blah')
+    assert md.getlist('c', type=int) == [42, 23]
+
+    # iter interfaces
+    assert list(zip(md.keys(), md.listvalues())) == list(md.lists())
+    assert list(zip(md, md.iterlistvalues())) == list(md.iterlists())
+    assert list(zip(md.iterkeys(), md.iterlistvalues())) == list(md.iterlists())
+
+    # setdefault
+    md = MultiDict()
+    md.setdefault('x', []).append(42)
+    md.setdefault('x', []).append(23)
+    assert md['x'] == [42, 23]
+
+    # to dict
+    md = MultiDict()
+    md['foo'] = 42
+    md.add('bar', 1)
+    md.add('bar', 2)
+    assert md.to_dict() == {'foo': 42, 'bar': 1}
+    assert md.to_dict(flat=False) == {'foo': [42], 'bar': [1, 2]}
+
+    # popitem from empty dict
+    assert_raises(KeyError, MultiDict().popitem)
+    assert_raises(KeyError, MultiDict().popitemlist)
+
+    # key errors are of a special type
+    assert_raises(MultiDict.KeyError, MultiDict().__getitem__, 42)
+
+    # setlist works
+    md = MultiDict()
+    md['foo'] = 42
+    md.setlist('foo', [1, 2])
+    assert md.getlist('foo') == [1, 2]
+
+
 def test_combined_multidict():
     """Combined multidict behavior"""
     d1 = MultiDict([('foo', '1')])
 
     assert sorted(d.items()) == [('bar', '2'), ('foo', '1')], d.items()
     assert sorted(d.items(multi=True)) == [('bar', '2'), ('bar', '3'), ('foo', '1')]
-
+    assert 'missingkey' not in d
+    assert 'foo' in d
 
     # type lookup
     assert d.get('foo', type=int) == 1
 
 
 def test_immutable_dict_copies_are_mutable():
-    for cls in ImmutableTypeConversionDict, ImmutableMultiDict, ImmutableDict:
+    for cls in ImmutableTypeConversionDict, ImmutableMultiDict, ImmutableDict, \
+               ImmutableOrderedMultiDict:
         immutable = cls({'a': 1})
         assert_raises(TypeError, immutable.pop, 'a')
 
     assert headers['x'] == r'y; z="\""'
 
     # defaults
-    headers = Headers({
-        'Content-Type': 'text/plain',
-        'X-Foo':        'bar',
-        'X-Bar':        ['1', '2']
-    })
+    headers = Headers([
+        ('Content-Type', 'text/plain'),
+        ('X-Foo',        'bar'),
+        ('X-Bar',        '1'),
+        ('X-Bar',        '2')
+    ])
     assert headers.getlist('x-bar') == ['1', '2']
     assert headers.get('x-Bar') == '1'
     assert headers.get('Content-Type') == 'text/plain'
     assert headers[:1] == Headers([('Content-Type', 'text/plain')])
     del headers[:2]
     del headers[-1]
-    assert headers == Headers([('X-Bar', '2')])
+    assert headers == Headers([('X-Bar', '1')])
 
     # copying
     a = Headers([('foo', 'bar')])
     assert headers.pop('b', 2) == 2
     assert_raises(KeyError, headers.pop, 'c')
 
+    # set replaces and accepts same arguments as add
+    a = Headers()
+    a.set('Content-Disposition', 'useless')
+    a.set('Content-Disposition', 'attachment', filename='foo')
+    assert a['Content-Disposition'] == 'attachment; filename=foo'
+
+
+def test_header_set():
+    """Test the header set"""
+    hs = HeaderSet()
+    hs.add('foo')
+    hs.add('bar')
+    assert 'Bar' in hs
+    assert hs.find('foo') == 0
+    assert hs.find('BAR') == 1
+    assert hs.find('baz') < 0
+    hs.discard('missing')
+    hs.discard('foo')
+    assert hs.find('foo') < 0
+    assert hs.find('bar') == 0
+    assert_raises(IndexError, hs.index, 'missing')
+    assert hs.index('bar') == 0
+    assert hs
+    hs.clear()
+    assert not hs
+
 
 def test_environ_headers_counts():
     """Ensure that the EnvironHeaders count correctly."""
     assert d.poplist('bar') == [42]
     assert not d
 
+    d.get('missingkey') is None
+
     d.add('foo', 42)
     d.add('foo', 23)
     d.add('bar', 2)
     assert d == id
     d.add('foo', 2)
     assert d != id
+
+    d.update({'blah': [1, 2, 3]})
+    assert d['blah'] == 1
+    assert d.getlist('blah') == [1, 2, 3]
+
+    # setlist works
+    d = OrderedMultiDict()
+    d['foo'] = 42
+    d.setlist('foo', [1, 2])
+    assert d.getlist('foo') == [1, 2]
+
+    assert_raises(OrderedMultiDict.KeyError, d.pop, 'missing')
+    assert_raises(OrderedMultiDict.KeyError, d.__getitem__, 'missing')
+
+    # popping
+    d = OrderedMultiDict()
+    d.add('foo', 23)
+    d.add('foo', 42)
+    d.add('foo', 1)
+    assert d.popitem() == ('foo', 23)
+    assert_raises(OrderedMultiDict.KeyError, d.popitem)
+    assert not d
+
+    d.add('foo', 23)
+    d.add('foo', 42)
+    d.add('foo', 1)
+    assert d.popitemlist() == ('foo', [23, 42, 1])
+    assert_raises(OrderedMultiDict.KeyError, d.popitemlist)
+
+
+def test_immutable_structures():
+    """Test immutable structures"""
+    l = ImmutableList([1, 2, 3])
+    assert_raises(TypeError, l.__delitem__, 0)
+    assert_raises(TypeError, l.__delslice__, 0, 1)
+    assert_raises(TypeError, l.__iadd__, [1, 2])
+    assert_raises(TypeError, l.__setitem__, 0, 1)
+    assert_raises(TypeError, l.__setslice__, 0, 1, [2, 3])
+    assert_raises(TypeError, l.append, 42)
+    assert_raises(TypeError, l.insert, 0, 32)
+    assert_raises(TypeError, l.pop)
+    assert_raises(TypeError, l.extend, [2, 3])
+    assert_raises(TypeError, l.reverse)
+    assert_raises(TypeError, l.sort)
+    assert l == [1, 2, 3]
+
+    d = ImmutableDict(foo=23, bar=42)
+    assert_raises(TypeError, d.setdefault, 'baz')
+    assert_raises(TypeError, d.update, {2: 3})
+    assert_raises(TypeError, d.popitem)
+    assert_raises(TypeError, d.__delitem__, 'foo')
+    assert_raises(TypeError, d.clear)
+    assert d == dict(foo=23, bar=42)
+
+    d = ImmutableMultiDict(d)
+    assert_raises(TypeError, d.add, 'fuss', 44)
+    assert_raises(TypeError, d.popitemlist)
+    assert_raises(TypeError, d.poplist, 'foo')
+    assert_raises(TypeError, d.setlist, 'tadaa', [1, 2])
+    assert_raises(TypeError, d.setlistdefault, 'tadaa')
+
+    d = EnvironHeaders({'HTTP_X_FOO': 'test'})
+    assert_raises(TypeError, d.__delitem__, 0)
+    assert_raises(TypeError, d.add, 42)
+    assert_raises(TypeError, d.pop, 'x-foo')
+    assert_raises(TypeError, d.popitem)
+    assert_raises(TypeError, d.setdefault, 'foo', 42)
+    assert dict(d.items()) == {'X-Foo': 'test'}
+    assert_raises(TypeError, d.copy)

tests/test_debug.py

+# -*- coding: utf-8 -*-
+"""
+    werkzeug.debug test
+    ~~~~~~~~~~~~~~~~~~~
+
+    :copyright: (c) 2010 by the Werkzeug Team, see AUTHORS for more details.
+    :license: BSD license.
+"""
+import re
+import sys
+from werkzeug.debug.repr import debug_repr, DebugReprGenerator, dump, helper
+from werkzeug.debug.console import HTMLStringO
+
+
+def test_debug_repr():
+    """Test the debug repr from the debug component"""
+    assert debug_repr([]) == u'[]'
+    assert debug_repr([1, 2]) == \
+        u'[<span class="number">1</span>, <span class="number">2</span>]'
+    assert debug_repr([1, 'test']) == \
+        u'[<span class="number">1</span>, <span class="string">\'test\'</span>]'
+    assert debug_repr([None]) == \
+        u'[<span class="object">None</span>]'
+    assert debug_repr(list(range(20))) == (
+        u'[<span class="number">0</span>, <span class="number">1</span>, '
+        u'<span class="number">2</span>, <span class="number">3</span>, '
+        u'<span class="number">4</span>, <span class="number">5</span>, '
+        u'<span class="number">6</span>, <span class="number">7</span>, '
+        u'<span class="extended"><span class="number">8</span>, '
+        u'<span class="number">9</span>, <span class="number">10</span>, '
+        u'<span class="number">11</span>, <span class="number">12</span>, '
+        u'<span class="number">13</span>, <span class="number">14</span>, '
+        u'<span class="number">15</span>, <span class="number">16</span>, '
+        u'<span class="number">17</span>, <span class="number">18</span>, '
+        u'<span class="number">19</span></span>]'
+    )
+    assert debug_repr({}) == u'{}'
+    assert debug_repr({'foo': 42}) == \
+        u'{<span class="pair"><span class="key"><span class="string">\'foo\''\
+        u'</span></span>: <span class="value"><span class="number">42' \
+        u'</span></span></span>}'
+    assert debug_repr(dict(zip(range(10), [None] * 10))) == \
+        u'{<span class="pair"><span class="key"><span class="number">0</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">1</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">2</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">3</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="extended"><span class="pair"><span class="key"><span class="number">4</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">5</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">6</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">7</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">8</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">9</span></span>: <span class="value"><span class="object">None</span></span></span></span>}'
+    assert debug_repr((1, 'zwei', u'drei')) ==\
+        u'(<span class="number">1</span>, <span class="string">\'' \
+        u'zwei\'</span>, <span class="string">u\'drei\'</span>)'
+
+    class Foo(object):
+        def __repr__(self):
+            return '<Foo 42>'
+    assert debug_repr(Foo()) == '<span class="object">&lt;Foo 42&gt;</span>'
+
+    class MyList(list):
+        pass
+    assert debug_repr(MyList([1, 2])) == \
+        u'<span class="module">test_debug.</span>MyList([' \
+        u'<span class="number">1</span>, <span class="number">2</span>])'
+
+    assert debug_repr(re.compile(r'foo\d')) == \
+        u're.compile(<span class="string regex">r\'foo\\d\'</span>)'
+    assert debug_repr(re.compile(ur'foo\d')) == \
+        u're.compile(<span class="string regex">ur\'foo\\d\'</span>)'
+
+    assert debug_repr(frozenset('x')) == \
+        u'frozenset([<span class="string">\'x\'</span>])'
+    assert debug_repr(set('x')) == \
+        u'set([<span class="string">\'x\'</span>])'
+
+    a = [1]
+    a.append(a)
+    assert debug_repr(a) == u'[<span class="number">1</span>, [...]]'
+
+    class Foo(object):
+        def __repr__(self):
+            1/0
+
+    assert debug_repr(Foo()) == \
+        u'<span class="brokenrepr">&lt;broken repr (ZeroDivisionError: ' \
+        u'integer division or modulo by zero)&gt;</span>'
+
+
+def test_object_dumping():
+    """Test debug object dumping to HTML"""
+    class Foo(object):
+        x = 42
+        y = 23
+        def __init__(self):
+            self.z = 15
+
+    drg = DebugReprGenerator()
+    out = drg.dump_object(Foo())
+    assert re.search('Details for test_debug.Foo object at', out)
+    assert re.search('<th>x</th>.*<span class="number">42</span>(?s)', out)
+    assert re.search('<th>y</th>.*<span class="number">23</span>(?s)', out)
+    assert re.search('<th>z</th>.*<span class="number">15</span>(?s)', out)
+
+    out = drg.dump_object({'x': 42, 'y': 23})
+    assert re.search('Contents of', out)
+    assert re.search('<th>x</th>.*<span class="number">42</span>(?s)', out)
+    assert re.search('<th>y</th>.*<span class="number">23</span>(?s)', out)
+
+    out = drg.dump_object({'x': 42, 'y': 23, 23: 11})
+    assert not re.search('Contents of', out)
+
+    out = drg.dump_locals({'x': 42, 'y': 23})
+    assert re.search('Local variables in frame', out)
+    assert re.search('<th>x</th>.*<span class="number">42</span>(?s)', out)
+    assert re.search('<th>y</th>.*<span class="number">23</span>(?s)', out)
+
+
+def test_debug_dump():
+    """Test debug dump"""
+    old = sys.stdout
+    sys.stdout = HTMLStringO()
+    try:
+        dump([1, 2, 3])
+        x = sys.stdout.reset()
+        dump()
+        y = sys.stdout.reset()
+    finally:
+        sys.stdout = old
+
+    assert 'Details for list object at' in x
+    assert '<span class="number">1</span>' in x
+    assert 'Local variables in frame' in y
+    assert '<th>x</th>' in y
+    assert '<th>old</th>' in y
+
+
+def test_debug_help():
+    """Test debug help"""
+    old = sys.stdout
+    sys.stdout = HTMLStringO()
+    try:
+        helper([1, 2, 3])
+        x = sys.stdout.reset()
+    finally:
+        sys.stdout = old
+
+    assert 'Help on list object' in x
+    assert '__delitem__' in x

tests/test_exceptions.py

+# -*- coding: utf-8 -*-
+"""
+    werkzeug.exceptiosn test
+    ~~~~~~~~~~~~~~~~~~~~~~~~
+
+    :copyright: (c) 2010 by the Werkzeug Team, see AUTHORS for more details.
+    :license: BSD license.
+"""
+from nose.tools import assert_raises
+
+from werkzeug import exceptions, Response, abort, Aborter
+
+
+def test_proxy_exception():
+    """Proxy exceptions"""
+    orig_resp = Response('Hello World')
+    try:
+        abort(orig_resp)
+    except exceptions.HTTPException, e:
+        resp = e.get_response({})
+    else:
+        assert False, 'exception not raised'
+    assert resp is orig_resp
+    assert resp.data == 'Hello World'
+
+
+def test_aborter():
+    """Exception aborter"""
+    assert_raises(exceptions.BadRequest, abort, 400)
+    assert_raises(exceptions.Unauthorized, abort, 401)
+    assert_raises(exceptions.Forbidden, abort, 403)
+    assert_raises(exceptions.NotFound, abort, 404)
+    assert_raises(exceptions.MethodNotAllowed, abort, 405, ['GET', 'HEAD'])
+    assert_raises(exceptions.NotAcceptable, abort, 406)
+    assert_raises(exceptions.RequestTimeout, abort, 408)
+    assert_raises(exceptions.Gone, abort, 410)
+    assert_raises(exceptions.LengthRequired, abort, 411)
+    assert_raises(exceptions.PreconditionFailed, abort, 412)
+    assert_raises(exceptions.RequestEntityTooLarge, abort, 413)
+    assert_raises(exceptions.RequestURITooLarge, abort, 414)
+    assert_raises(exceptions.UnsupportedMediaType, abort, 415)
+    assert_raises(exceptions.InternalServerError, abort, 500)
+    assert_raises(exceptions.NotImplemented, abort, 501)
+    assert_raises(exceptions.BadGateway, abort, 502)
+    assert_raises(exceptions.ServiceUnavailable, abort, 503)
+
+    myabort = Aborter({1: exceptions.NotFound})
+    assert_raises(LookupError, myabort, 404)
+    assert_raises(exceptions.NotFound, myabort, 1)
+
+    myabort = Aborter(extra={1: exceptions.NotFound})
+    assert_raises(exceptions.NotFound, myabort, 404)
+    assert_raises(exceptions.NotFound, myabort, 1)
+
+
+def test_exception_repr():
+    """Repr and unicode of exceptions"""
+    exc = exceptions.NotFound()
+    assert unicode(exc) == '404: Not Found'
+    assert repr(exc) == "<NotFound '404: Not Found'>"
+
+    exc = exceptions.NotFound('Not There')
+    assert unicode(exc) == '404: Not There'
+    assert repr(exc) == "<NotFound '404: Not There'>"
+
+
+def test_special_exceptions():
+    """Special HTTP exceptions"""
+    exc = exceptions.MethodNotAllowed(['GET', 'HEAD', 'POST'])
+    h = dict(exc.get_headers({}))
+    assert h['Allow'] == 'GET, HEAD, POST'
+    assert 'The method DELETE is not allowed' in exc.get_description({
+        'REQUEST_METHOD': 'DELETE'
+    })

tests/test_formparser.py

 from cStringIO import StringIO
 
 from werkzeug import Client, Request, Response, parse_form_data, \
-     create_environ, FileStorage
+     create_environ, FileStorage, formparser, create_environ
 from werkzeug.exceptions import RequestEntityTooLarge
 
 
     assert not data.form
 
 
+def test_broken_multipart():
+    """Broken multipart does not break the applicaiton"""
+    data = (
+        '--foo\r\n'
+        'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
+        'Content-Transfer-Encoding: base64\r\n'
+        'Content-Type: text/plain\r\n\r\n'
+        'broken base 64'
+        '--foo--'
+    )
+    _, form, files = parse_form_data(create_environ(data=data, method='POST',
+                                     content_type='multipart/form-data; boundary=foo'))
+    assert not files
+    assert not form
+
+    assert_raises(ValueError, parse_form_data, create_environ(data=data, method='POST',
+                  content_type='multipart/form-data; boundary=foo'),
+                  silent=False)
+
+
 def test_multipart_file_no_content_type():
     """Chrome does not always provide a content type."""
     data = (
                               method='POST')
     req.max_form_memory_size = 400
     assert req.form['foo'] == 'Hello World'
+
+
+def test_large_file():
+    """Test a largish file."""
+    data = 'x' * (1024 * 600)
+    req = Request.from_values(data={'foo': (StringIO(data), 'test.txt')},
+                              method='POST')
+    # make sure we have a real file here, because we expect to be
+    # on the disk.  > 1024 * 500
+    assert isinstance(req.files['foo'].stream, file)
+
+
+def test_lowlevel():
+    """Lowlevel formparser tests"""
+    formparser._line_parse('foo') == ('foo', False)
+    formparser._line_parse('foo\r\n') == ('foo', True)
+    formparser._line_parse('foo\r') == ('foo', True)
+    formparser._line_parse('foo\n') == ('foo', True)
+
+    lineiter = iter('\n\n\nfoo\nbar\nbaz'.splitlines(True))
+    line = formparser._find_terminator(lineiter)
+    assert line == 'foo'
+    assert list(lineiter) == ['bar\n', 'baz']
+    assert formparser._find_terminator([]) == ''
+    assert formparser._find_terminator(['']) == ''
+
+    assert_raises(ValueError, formparser.parse_multipart, None, '', 20)
+    assert_raises(ValueError, formparser.parse_multipart, None, 'broken  ', 20)
+
+    data = '--foo\r\n\r\nHello World\r\n--foo--'
+    assert_raises(ValueError, formparser.parse_multipart, StringIO(data), 'foo', len(data))
+
+    data = '--foo\r\nContent-Disposition: form-field; name=foo\r\n' \
+           'Content-Transfer-Encoding: base64\r\n\r\nHello World\r\n--foo--'
+    assert_raises(ValueError, formparser.parse_multipart, StringIO(data), 'foo', len(data))
+
+    data = '--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\nHello World\r\n'
+    assert_raises(ValueError, formparser.parse_multipart, StringIO(data), 'foo', len(data))
+
+    x = formparser.parse_multipart_headers(['foo: bar\r\n', ' x test\r\n'])
+    assert x['foo'] == 'bar\n x test'
+    assert_raises(ValueError, formparser.parse_multipart_headers, ['foo: bar\r\n', ' x test'])

tests/test_http.py

 from nose.tools import assert_raises
 
 from werkzeug.http import *
-from werkzeug.utils import http_date, redirect
+from werkzeug.utils import http_date, redirect, http_date
+from werkzeug.test import create_environ
 from werkzeug.datastructures import *
 
 
     assert a.response == '6629fae49393a05397450978507c4ef1'
     assert a.opaque == '5ccc069c403ebaf9f0171e9517f40e41'
 
+    assert parse_authorization_header('') is None
+    assert parse_authorization_header(None) is None
+    assert parse_authorization_header('foo') is None
+
 
 def test_www_authenticate_header():
     """WWW Authenticate header parsing and behavior"""
     assert wa.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093'
     assert wa.opaque == '5ccc069c403ebaf9f0171e9517f40e41'
 
+    wa = parse_www_authenticate_header('broken')
+    assert wa.type == 'broken'
+
+    assert not parse_www_authenticate_header('').type
+    assert not parse_www_authenticate_header('')
+
+
 
 def test_etags():
     """ETag tools"""
     assert unquote_etag('"foo"') == ('foo', False)
     assert unquote_etag('w/"foo"') == ('foo', True)
     es = parse_etags('"foo", "bar", w/"baz", blar')
+    assert sorted(es) == ['bar', 'blar', 'foo']
     assert 'foo' in es
     assert 'baz' not in es
     assert es.contains_weak('baz')
     resp = redirect('http://example.com/', 305)
     assert resp.headers['Location'] == 'http://example.com/'
     assert resp.status_code == 305
+
+
+def test_dump_options_header():
+    """Test options header dumping alone"""
+    assert dump_options_header('foo', {'bar': 42}) == \
+        'foo; bar=42'
+    assert dump_options_header('foo', {'bar': 42, 'fizz': None}) == \
+        'foo; bar=42; fizz'
+
+
+def test_dump_header():
+    """Test the header dumping function alone"""
+    assert dump_header([1, 2, 3]) == '1, 2, 3'
+    assert dump_header([1, 2, 3], allow_token=False) == '"1", "2", "3"'
+    assert dump_header({'foo': 'bar'}, allow_token=False) == 'foo="bar"'
+    assert dump_header({'foo': 'bar'}) == 'foo=bar'
+
+
+def test_parse_options_header():
+    """Parse options header"""
+    assert parse_options_header('something; foo="other\"thing"') == \
+        ('something', {'foo': 'other"thing'})
+    assert parse_options_header('something; foo="other\"thing"; meh=42') == \
+        ('something', {'foo': 'other"thing', 'meh': '42'})
+    assert parse_options_header('something; foo="other\"thing"; meh=42; bleh') == \
+        ('something', {'foo': 'other"thing', 'meh': '42', 'bleh': None})
+
+
+def test_is_resource_modified():
+    """Test is_resource_modified alone"""
+    env = create_environ()
+
+    # ignore POST
+    env['REQUEST_METHOD'] = 'POST'
+    assert not is_resource_modified(env, etag='testing')
+    env['REQUEST_METHOD'] = 'GET'
+
+    # etagify from data
+    assert_raises(TypeError, is_resource_modified, env, data='42', etag='23')
+    env['HTTP_IF_NONE_MATCH'] = generate_etag('awesome')
+    assert not is_resource_modified(env, data='awesome')
+
+    env['HTTP_IF_MODIFIED_SINCE'] = http_date(datetime(2008, 1, 1, 12, 30))
+    assert not is_resource_modified(env,
+        last_modified=datetime(2008, 1, 1, 12, 00))
+    assert is_resource_modified(env,
+        last_modified=datetime(2008, 1, 1, 13, 00))

tests/test_internal.py

 # -*- coding: utf-8 -*-
+from nose.tools import assert_raises
+
+from warnings import filterwarnings, resetwarnings
 from datetime import datetime
-from werkzeug import _internal as internal
+from werkzeug import _internal as internal, Request, Response, \
+     create_environ
 
 
 def test_date_to_unix():
     assert internal._date_to_unix(datetime(1970, 1, 1, 1, 1, 1)) == 3661
     x = datetime(2010, 2, 15, 16, 15, 39)
     assert internal._date_to_unix(x) == 1266250539
+
+
+def test_easteregg():
+    """Make sure the easteregg runs"""
+    req = Request.from_values('/?macgybarchakku')
+    resp = Response.force_type(internal._easteregg(None), req)
+    assert 'About Werkzeug' in resp.data
+    assert 'the Swiss Army knife of Python web development' in resp.data
+
+
+def test_wrapper_internals():
+    """Test internals of the wrappers"""
+    from werkzeug import Request
+    req = Request.from_values(data={'foo': 'bar'}, method='POST')
+    req._load_form_data()
+    assert req.form.to_dict() == {'foo': 'bar'}
+
+    # second call does not break
+    req._load_form_data()
+    assert req.form.to_dict() == {'foo': 'bar'}
+
+    # check reprs
+    assert repr(req) == "<Request 'http://localhost/' [POST]>"
+    resp = Response()
+    assert repr(resp) == '<Response 0 bytes [200 OK]>'
+    resp.data = 'Hello World!'
+    assert repr(resp) == '<Response 12 bytes [200 OK]>'
+    resp.response = iter(['Test'])
+    assert repr(resp) == '<Response streamed [200 OK]>'
+
+    # unicode data does not set content length
+    response = Response([u'Hällo Wörld'])
+    headers = response.get_wsgi_headers(create_environ())
+    assert 'Content-Length' not in headers
+
+    response = Response(['Hällo Wörld'])
+    headers = response.get_wsgi_headers(create_environ())
+    assert 'Content-Length' in headers
+
+    # check for internal warnings
+    print 'start'
+    filterwarnings('error', category=Warning)
+    response = Response()
+    environ = create_environ()
+    response.response = 'What the...?'
+    assert_raises(Warning, lambda: list(response.iter_encoded()))
+    assert_raises(Warning, lambda: list(response.get_app_iter(environ)))
+    response.direct_passthrough = True
+    assert_raises(Warning, lambda: list(response.iter_encoded()))
+    assert_raises(Warning, lambda: list(response.get_app_iter(environ)))
+    resetwarnings()

tests/test_local.py

+# -*- coding: utf-8 -*-
+"""
+    werkzeug.local test
+    ~~~~~~~~~~~~~~~~~~~
+
+    :copyright: (c) 2010 by the Werkzeug Team, see AUTHORS for more details.
+    :license: BSD license.
+"""
+import time
+from threading import Thread
+
+from nose.tools import assert_raises
+
+from werkzeug import Local, LocalManager
+
+
+def test_basic_local():
+    """Basic local object support"""
+    l = Local()
+    l.foo = 0
+    values = []
+    def value_setter(idx):
+        time.sleep(0.01 * idx)
+        l.foo = idx
+        time.sleep(0.02)
+        values.append(l.foo)
+    threads = [Thread(target=value_setter, args=(x,))
+               for x in [1, 2, 3]]
+    for thread in threads:
+        thread.start()
+    time.sleep(0.2)
+    assert sorted(values) == [1, 2, 3]
+
+    def delfoo():
+        del l.foo
+    delfoo()
+    assert_raises(AttributeError, lambda: l.foo)
+    assert_raises(AttributeError, delfoo)

tests/test_serving.py

+# -*- coding: utf-8 -*-
+"""
+    werkzeug.serving test
+    ~~~~~~~~~~~~~~~~~~~~
+
+    :copyright: (c) 2010 by the Werkzeug Team, see AUTHORS for more details.
+    :license: BSD, see LICENSE for more details.
+"""
+import time
+import urllib
+from werkzeug import serving, test_app, __version__ as version
+from threading import Thread
+
+
+real_make_server = serving.make_server
+
+
+def run_dev_server(application):
+    servers = []
+    def tracking_make_server(*args, **kwargs):
+        srv = real_make_server(*args, **kwargs)
+        servers.append(srv)
+        return srv
+    serving.make_server = tracking_make_server
+    try:
+        t = Thread(target=serving.run_simple, args=('localhost', 0, application))
+        t.setDaemon(True)
+        t.start()
+        time.sleep(0.25)
+    finally:
+        serving.make_server = real_make_server
+    if not servers:
+        return None, None
+    server ,= servers
+    ip, port = server.socket.getsockname()[:2]
+    if ':' in ip:
+        ip = '[%s]' % ip
+    return server, '%s:%d'  % (ip, port)
+
+
+def test_serving():
+    """Test server"""
+    server, addr = run_dev_server(test_app)
+    rv = urllib.urlopen('http://%s/?foo=bar&baz=blah' % addr).read()
+    assert 'WSGI Information' in rv
+    assert 'foo=bar&amp;baz=blah' in rv
+    assert ('Werkzeug/%s' % version) in rv
+
+
+def test_broken_app():
+    """Broken apps in server"""
+    def broken_app(environ, start_response):
+        1/0
+    server, addr = run_dev_server(broken_app)
+    rv = urllib.urlopen('http://%s/?foo=bar&baz=blah' % addr).read()
+    assert 'Internal Server Error' in rv

tests/test_templates.py

         a=[1, 2],
         b=2
     ))) == '1|2'
+
+
+def test_substitute():
+    """Templer rendering responds to substitute as well"""
+    t = Template('<% if a %>1<% endif %>\n2')
+    assert t.render(a=1) == t.substitute(a=1)

tests/test_urls.py

     assert url_encode(d) == 'foo=1&foo=2&foo=3&bar=0&foo=4'
 
 
+def test_href():
+    """Test the Href class"""
+    x = Href('http://www.example.com/')
+    assert x('foo') == 'http://www.example.com/foo'
+    assert x.foo('bar') == 'http://www.example.com/foo/bar'
+    assert x.foo('bar', x=42) == 'http://www.example.com/foo/bar?x=42'
+    assert x.foo('bar', class_=42) == 'http://www.example.com/foo/bar?class=42'
+    assert x.foo('bar', {'class': 42}) == 'http://www.example.com/foo/bar?class=42'
+    assert_raises(AttributeError, lambda: x.__blah__)
+
+    x = Href('blah')
+    assert x.foo('bar') == 'blah/foo/bar'
+
+    assert_raises(TypeError, x.foo, {"foo": 23}, x=42)
+
+    x = Href('')
+    assert x('foo') == 'foo'
+
+
 def test_href_past_root():
     """Href() over root does not break the URL."""
     raise SkipTest('currently not implemented, stdlib bug?')

tests/test_wrappers.py

 from datetime import datetime, timedelta
 from werkzeug.wrappers import *
 from werkzeug.wsgi import LimitedStream
+from werkzeug.http import generate_etag
 from werkzeug.datastructures import MultiDict, ImmutableOrderedMultiDict, \
      ImmutableList, ImmutableTypeConversionDict
-from werkzeug.test import Client, create_environ
+from werkzeug.test import Client, create_environ, run_wsgi_app
 
 
 class RequestTestResponse(BaseResponse):
     assert response['form'] == MultiDict()
 
 
+def test_access_route():
+    """Check access route on the wrappers"""
+    req = Request.from_values(headers={
+        'X-Forwarded-For': '192.168.1.2, 192.168.1.1'
+    })
+    req.environ['REMOTE_ADDR'] = '192.168.1.3'
+    req.is_behind_proxy = True
+    assert req.access_route == ['192.168.1.2', '192.168.1.1']
+    assert req.remote_addr == '192.168.1.2'
+    req.is_behind_proxy = False
+    assert req.access_route == ['192.168.1.2', '192.168.1.1']
+    assert req.remote_addr == '192.168.1.3'
+
+    req = Request.from_values()
+    req.environ['REMOTE_ADDR'] = '192.168.1.3'
+    assert req.access_route == ['192.168.1.3']
+
+
+def test_url_request_descriptors():
+    """Basic URL request descriptors"""
+    req = Request.from_values('/bar?foo=baz', 'http://example.com/test')
+    assert req.path == u'/bar'
+    assert req.script_root == u'/test'
+    assert req.url == 'http://example.com/test/bar?foo=baz'
+    assert req.base_url == 'http://example.com/test/bar'
+    assert req.url_root == 'http://example.com/test/'
+    assert req.host_url == 'http://example.com/'
+    assert req.host == 'example.com'
+
+
+def test_authorization_mixin():
+    """Authorization mixin"""
+    request = Request.from_values(headers={
+        'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='
+    })
+    a = request.authorization
+    assert a.type == 'basic'
+    assert a.username == 'Aladdin'
+    assert a.password == 'open sesame'
+
+
 def test_base_response():
     """Base respone behavior"""
     # unicode
          '01-Jan-1970 00:00:00 GMT; Max-Age=60; Path=/blub')
     ]
 
+    # delete cookie
+    response = BaseResponse()
+    response.delete_cookie('foo')
+    assert response.headers.to_list() == [
+        ('Content-Type', 'text/plain; charset=utf-8'),
+        ('Set-Cookie', 'foo=; expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/')
+    ]
+
+    # close call forwarding
+    closed = []
+    class Iterable(object):
+        def next(self):
+            raise StopIteration()
+        def __iter__(self):
+            return self
+        def close(self):
+            closed.append(True)
+    response = BaseResponse(Iterable())
+    response.call_on_close(lambda: closed.append(True))
+    app_iter, status, headers = run_wsgi_app(response,
+                                             create_environ(),
+                                             buffered=True)
+    assert status == '200 OK'
+    assert ''.join(app_iter) == ''
+    assert len(closed) == 2
+
+
+def test_response_status_codes():
+    """Response status codes"""
+    response = BaseResponse()
+    response.status_code = 404
+    assert response.status == '404 NOT FOUND'
+    response.status = '200 OK'
+    assert response.status_code == 200
+    response.status = '999 WTF'
+    assert response.status_code == 999
+    response.status_code = 588
+    assert response.status_code == 588
+    assert response.status == '588 UNKNOWN'
+    response.status = 'wtf'
+    assert response.status_code == 0
+
 
 def test_type_forcing():
     """Response Type forcing"""
         assert response.content_type == 'text/html'
 
     # without env, no arbitrary conversion
-    assert_raises(TypeError, "SpecialResponse.force_type(wsgi_application)")
+    assert_raises(TypeError, SpecialResponse.force_type, wsgi_application)
 
 
 def test_accept_mixin():
     assert request.accept_encodings == CharsetAccept([('gzip', 1), ('deflate', 1)])
     assert request.accept_languages == CharsetAccept([('en-us', 1), ('en', 0.5)])
 
+    request = Request({'HTTP_ACCEPT': ''})
+    assert request.accept_mimetypes == CharsetAccept()
+
 
 def test_etag_request_mixin():
     """ETag request-wrapper mixin"""
         assert request.user_agent.platform == platform
         assert request.user_agent.version == version
         assert request.user_agent.language == lang
+        assert bool(request.user_agent)
+        assert request.user_agent.to_header() == ua
+        assert str(request.user_agent) == ua
+
+    request = Request({'HTTP_USER_AGENT': 'foo'})
+    assert not request.user_agent
 
 
 def test_etag_response_mixin():
     assert not 'content-length' in resp.headers
 
 
+def test_etag_response_mixin_freezing():
+    """Freeze of the etag response mixin adds etag if mixed first"""
+    class WithFreeze(ETagResponseMixin, BaseResponse):
+        pass
+    class WithoutFreeze(BaseResponse, ETagResponseMixin):
+        pass
+
+    response = WithFreeze('Hello World')
+    response.freeze()
+    assert response.get_etag() == (generate_etag('Hello World'), False)
+    response = WithoutFreeze('Hello World')
+    response.freeze()
+    assert response.get_etag() == (None, None)
+    response = Response('Hello World')
+    response.freeze()
+    assert response.get_etag() == (None, None)
+
+
+def test_authenticate_mixin():
+    """Test the authenciate mixin of the response"""
+    resp = Response()
+    resp.www_authenticate.type = 'basic'
+    resp.www_authenticate.realm = 'Testing'
+    assert resp.headers['WWW-Authenticate'] == 'Basic realm="Testing"'
+    resp.www_authenticate.realm = None
+    resp.www_authenticate.type = None
+    assert 'WWW-Authenticate' not in resp.headers
+
+
 def test_response_stream_mixin():
     """Response stream response-wrapper mixin"""
     response = Response()
     class MyRequest(Request):
         dict_storage_class = dict
         list_storage_class = list
-    req = MyRequest.from_values(headers={
+        parameter_storage_class = dict
+    req = MyRequest.from_values('/?foo=baz', headers={
         'Cookie':   'foo=bar'
     })
     assert type(req.cookies) is dict
     assert req.cookies == {'foo': 'bar'}
     assert type(req.access_route) is list
 
+    assert type(req.args) is dict
+    assert type(req.values) is CombinedMultiDict
+    assert req.values['foo'] == 'baz'
+
     req = Request.from_values(headers={
         'Cookie':   'foo=bar'
     })
     MyRequest.list_storage_class = tuple
     req = MyRequest.from_values()
     assert type(req.access_route) is tuple
+
+
+def test_response_headers_passthrough():
+    """If headers are a Headers object they will be stored on the response"""
+    headers = Headers()
+    resp = Response(headers=headers)
+    assert resp.headers is headers

tests/test_wsgi.py

                           'https://example.com/app/hello',
                           collapse_http_schemes=False)
     assert x is None
+
+
+def test_get_host_fallback():
+    """Test non Host header server name guessing"""
+    assert get_host({
+        'SERVER_NAME':      'foobar.example.com',
+        'wsgi.url_scheme':  'http',
+        'SERVER_PORT':      '80'
+    }) == 'foobar.example.com'
+    assert get_host({
+        'SERVER_NAME':      'foobar.example.com',
+        'wsgi.url_scheme':  'http',
+        'SERVER_PORT':      '81'
+    }) == 'foobar.example.com:81'

werkzeug/_internal.py

     return proxy_repr
 
 
+def _get_environ(obj):
+    env = getattr(obj, 'environ', obj)
+    assert isinstance(env, dict), \
+        '%r is not a WSGI environment (has to be a dict)' % type(obj).__name__
+    return env
+
+
 def _log(type, message, *args, **kwargs):
     """Log into the internal werkzeug logger."""
     global _logger

werkzeug/datastructures.py

 
     def update(self, other_dict):
         """update() extends rather than replaces existing key lists."""
-        if isinstance(other_dict, MultiDict):
-            for key, value_list in other_dict.iterlists():
-                self.setlistdefault(key, []).extend(value_list)
-        elif isinstance(other_dict, dict):
-            for key, value in other_dict.iteritems():
-                self.setlistdefault(key, []).append(value)
-        else:
-            for key, value in other_dict:
-                self.setlistdefault(key, []).append(value)
+        for key, value in iter_multi_items(other_dict):
+            MultiDict.add(self, key, value)
 
     def pop(self, key, default=_missing):
         """Pop the first item for a list on the dict.  Afterwards the
        the internal bucket objects are exposed.
     """
 
+    # the key error this class raises.  Because of circular dependencies
+    # with the http exception module this class is created at the end of
+    # this module.
+    KeyError = None
+
     def __init__(self, mapping=None):
         dict.__init__(self)
         self._first_bucket = self._last_bucket = None
         return key, [x.value for x in buckets]
 
 
+def _options_header_vkw(value, kw):
+    if not kw:
+        return value
+    return dump_options_header(value, dict((k.replace('_', '-'), v)
+                                            for k, v in kw.items()))
+
+
 class Headers(object):
     """An object that stores some headers.  It has a dict-like interface
     but is ordered and can store the same keys multiple times.
 
         >>> headerlist = [('Content-Length', '40')]
         >>> headers = Headers.linked(headerlist)
-        >>> headers.add('Content-Type', 'text/html')
+        >>> headers['Content-Type'] = 'text/html'
         >>> headerlist
         [('Content-Length', '40'), ('Content-Type', 'text/html')]
 
         .. versionadded:: 0.4.1
             keyword arguments were added for :mod:`wsgiref` compatibility.
         """
-        if kw:
-            _value = dump_options_header(_value, dict((k.replace('_', '-'), v)
-                                                      for k, v in kw.items()))
-        self._list.append((_key, _value))
+        self._list.append((_key, _options_header_vkw(_value, kw)))
 
     def add_header(self, _key, _value, **_kw):
         """Add a new header tuple to the list.
         """Clears all headers."""
         del self._list[:]
 
-    def set(self, key, value):
+    def set(self, _key, _value, **kw):
         """Remove all header tuples for `key` and add a new one.  The newly
         added key either appears at the end of the list if there was no
         entry or replaces the first one.
 
+        Keyword arguments can specify additional parameters for the header
+        value, with underscores converted to dashes.  See :meth:`add` for
+        more information.
+
+        .. versionchanged:: 0.6.1
+           :meth:`set` now accepts the same arguments as :meth:`add`.
+
         :param key: The key to be inserted.
         :param value: The value to be inserted.
         """
-        lc_key = key.lower()
+        lc_key = _key.lower()
+        _value = _options_header_vkw(_value, kw)
         for idx, (old_key, old_value) in enumerate(self._list):
             if old_key.lower() == lc_key:
                 # replace first ocurrence
-                self._list[idx] = (key, value)
+                self._list[idx] = (_key, _value)
                 break
         else:
-            return self.add(key, value)
+            return self.add(_key, _value)
         self._list[idx + 1:] = [(k, v) for k, v in self._list[idx + 1:]
                                 if k.lower() != lc_key]
 
 
 # create all the special key errors now that the classes are defined.
 from werkzeug.exceptions import BadRequest
-for _cls in MultiDict, CombinedMultiDict, Headers, EnvironHeaders:
+for _cls in MultiDict, OrderedMultiDict, CombinedMultiDict, Headers, \
+            EnvironHeaders:
     _cls.KeyError = BadRequest.wrap(KeyError, _cls.__name__ + '.KeyError')
 del _cls

werkzeug/debug/repr.py

 from traceback import format_exception_only
 try:
     from collections import deque
-except ImportError:
+except ImportError: # pragma: no cover
     deque = None
 from werkzeug.utils import escape
 from werkzeug.debug.utils import render_template
     """
 
     def __call__(self, topic=None):
-        sys.stdout._write(self.get_help(topic))
-
-    def get_help(self, topic):
         title = text = None
         if topic is not None:
             import pydoc
             if len(paragraphs) > 1:
                 title = paragraphs[0]
                 text = '\n\n'.join(paragraphs[1:])
-            else:
+            else: # pragma: no cover
                 title = 'Help'
                 text = paragraphs[0]
-        return render_template('help_command.html', title=title, text=text)
+        rv = render_template('help_command.html', title=title, text=text)
+        sys.stdout._write(rv)
 
 helper = _Helper()
 
     def fallback_repr(self):
         try:
             info = ''.join(format_exception_only(*sys.exc_info()[:2]))
-        except:
+        except: # pragma: no cover
             info = '?'
         return u'<span class="brokenrepr">&lt;broken repr (%s)&gt;' \
                u'</span>' % escape(info.decode('utf-8', 'ignore').strip())

werkzeug/exceptions.py

     :license: BSD, see LICENSE for more details.
 """
 import sys
-from werkzeug._internal import HTTP_STATUS_CODES
+from werkzeug._internal import HTTP_STATUS_CODES, _get_environ
 
 
 class HTTPException(Exception):
         if description is not None:
             self.description = description
 
+    @classmethod
     def wrap(cls, exception, name=None):
-        """
-        This method returns a new subclass of the exception provided that
+        """This method returns a new subclass of the exception provided that
         also is a subclass of `BadRequest`.
         """
         class newcls(cls, exception):
         newcls.__module__ = sys._getframe(1).f_globals.get('__name__')
         newcls.__name__ = name or cls.__name__ + exception.__name__
         return newcls
-    wrap = classmethod(wrap)
 
+    @property
     def name(self):
         """The status name."""
         return HTTP_STATUS_CODES[self.code]
-    name = property(name, doc=name.__doc__)
 
     def get_description(self, environ):
         """Get the description."""
+        environ = _get_environ(environ)
         return self.description
 
     def get_body(self, environ):
         # with custom responses (testing exception instances against types) and
         # so we don't ever have to import the wrappers, but also because there
         # are circular dependencies when bootstrapping the module.
+        environ = _get_environ(environ)
         from werkzeug.wrappers import BaseResponse
         headers = self.get_headers(environ)
         return BaseResponse(self.get_body(environ), self.code, headers)
             if getattr(obj, 'code', None) is not None:
                 default_exceptions[obj.code] = obj
                 __all__.append(obj.__name__)
-        except TypeError:
+        except TypeError: # pragma: no cover
             continue
 _find_exceptions()
 del _find_exceptions

werkzeug/formparser.py

         raise ValueError('Missing boundary')
     if not is_valid_multipart_boundary(boundary):
         raise ValueError('Invalid boundary: %s' % boundary)
-    if len(boundary) > buffer_size:
+    if len(boundary) > buffer_size: # pragma: no cover
+        # this should never happen because we check for a minimum size
+        # of 1024 and boundaries may not be longer than 200.  The only
+        # situation when this happen is for non debug builds where
+        # the assert i skipped.
         raise ValueError('Boundary longer than buffer size')
 
     total_content_length = content_length
                     if in_memory > max_form_memory_size:
                         from werkzeug.exceptions import RequestEntityTooLarge
                         raise RequestEntityTooLarge()
-            else:
+            else: # pragma: no cover
                 raise ValueError('unexpected end of part')
 
             if is_file:
 import inspect
 try:
     from email.utils import parsedate_tz
-except ImportError:
+except ImportError: # pragma: no cover
     from email.Utils import parsedate_tz
 from urllib2 import parse_http_list as _parse_list_header
 from datetime import datetime, timedelta
 try:
     from hashlib import md5
-except ImportError:
+except ImportError: # pragma: no cover
     from md5 import new as md5
 
 
                          '^_`abcdefghijklmnopqrstuvwxyz|~')
 _etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
 _unsafe_header_chars = set('()<>@,;:\"/[]?={} \t')
+_quoted_string_re = r'"[^"\\]*(?:\\.[^"\\]*)*"'
+_option_header_piece_re = re.compile(r';\s*([^\s;=]+|%s)\s*(?:=\s*([^;]+|%s))?\s*' %
+    (_quoted_string_re, _quoted_string_re))
 
 _entity_headers = frozenset([
     'allow', 'content-encoding', 'content-language', 'content-length',
     :return: (str, options)
     """
     def _tokenize(string):
-        while string[:1] == ';':
-            string = string[1:]
-            end = string.find(';')
-            while end > 0 and string.count('"', 0, end) % 2:
-                end = string.find(';', end + 1)
-            if end < 0:
-                end = len(string)
-            value = string[:end]
-            yield value.strip()
-            string = string[end:]
+        for match in _option_header_piece_re.finditer(string):
+            key, value = match.groups()
+            key = unquote_header_value(key)
+            if value is not None:
+                value = unquote_header_value(value)
+            yield key, value
 
     if not value:
         return '', {}
 
     parts = _tokenize(';' + value)
-    name = parts.next()
-    extra = {}
-    for part in parts:
-        if '=' in part:
-            key, value = part.split('=', 1)
-            extra[key.strip().lower()] = unquote_header_value(value.strip())
-        else:
-            extra[part.strip()] = None
+    name = parts.next()[0]
+    extra = dict(parts)
     return name, extra
 
 
         auth_type, auth_info = value.split(None, 1)
         auth_type = auth_type.lower()
     except (ValueError, AttributeError):
-        return WWWAuthenticate(value.lower(), on_update=on_update)
+        return WWWAuthenticate(value.strip().lower(), on_update=on_update)
     return WWWAuthenticate(auth_type, parse_dict_header(auth_info),
                            on_update)
 

werkzeug/local.py

     from py.magic import greenlet
     get_current_greenlet = greenlet.getcurrent
     del greenlet
-except:
+except: # pragma: no cover
     # catch all, py.* fails with so many different errors.
     get_current_greenlet = int
 try:
     from thread import get_ident as get_current_thread, allocate_lock
-except ImportError:
+except ImportError: # pragma: no cover
     from dummy_thread import get_ident as get_current_thread, allocate_lock
 
 from werkzeug.wsgi import ClosingIterator
 # get the best ident function.  if greenlets are not installed we can
 # safely just use the builtin thread function and save a python methodcall
 # and the cost of calculating a hash.
-if get_current_greenlet is int:
+if get_current_greenlet is int: # pragma: no cover
     get_ident = get_current_thread
 else:
     get_ident = lambda: (get_current_thread(), get_current_greenlet())

werkzeug/posixemulation.py

 
 
 can_rename_open_file = False
-if os.name == 'nt':
+if os.name == 'nt': # pragma: no cover
     _rename = lambda src, dst: False
     _rename_atomic = lambda src, dst: False
 

werkzeug/routing.py

 from werkzeug.urls import url_encode, url_quote
 from werkzeug.utils import redirect, format_string
 from werkzeug.exceptions import HTTPException, NotFound, MethodNotAllowed
+from werkzeug._internal import _get_environ
 
 
 _rule_re = re.compile(r'''
         :param server_name: an optional server name hint (see above).
         :param subdomain: optionally the current subdomain (see above).
         """
-        if hasattr(environ, 'environ'):
-            environ = environ.environ
+        environ = _get_environ(environ)
         if server_name is None:
             if 'HTTP_HOST' in environ:
                 server_name = environ['HTTP_HOST']

werkzeug/serving.py

         # environment and subprocess.call does not like this, encode them
         # to latin1 and continue.
         if os.name == 'nt':
-            for key, value in new_environ:
+            for key, value in new_environ.iteritems():
                 if isinstance(value, unicode):
                     new_environ[key] = value.encode('iso-8859-1')
 
 from cookielib import CookieJar
 from urllib2 import Request as U2Request
 
-from werkzeug._internal import _empty_stream
+from werkzeug._internal import _empty_stream, _get_environ
 from werkzeug.wrappers import BaseRequest
 from werkzeug.urls import url_encode, url_fix, iri_to_uri
 from werkzeug.wsgi import get_host, get_current_url
     :param buffered: set to `True` to enforce buffering.
     :return: tuple in the form ``(app_iter, status, headers)``
     """
+    environ = _get_environ(environ)
     response = []
     buffer = []
 
     return _quote(s, safe)
 
 
+
+def _safe_urlsplit(s):
+    """the urllib.urlsplit cache breaks if it contains unicode and
+    we cannot control that.  So we force type cast that thing back
+    to what we think it is.
+    """
+    rv = urlparse.urlsplit(s)
+    if type(rv[1]) is not type(s):
+        try:
+            return tuple(map(type(s), rv))
+        except UnicodeError:
+            # oh well, we most likely will break later again, but
+            # let's just say it worked out well to that point.
+            pass
+    return rv
+
+
 def _unquote(s, unsafe=''):
     assert isinstance(s, str), 'unquote only works on bytes'
     unsafe = set(unsafe)
 
 def _uri_split(uri):
     """Splits up an URI or IRI."""
-    scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)
+    scheme, netloc, path, query, fragment = _safe_urlsplit(uri)
 
     port = None
 
     """
     if isinstance(s, unicode):
         s = s.encode(charset, 'ignore')
-    scheme, netloc, path, qs, anchor = urlparse.urlsplit(s)
+    scheme, netloc, path, qs, anchor = _safe_urlsplit(s)
     path = _quote(path, '/%')
     qs = _quote_plus(qs, ':&%=')
     return urlparse.urlunsplit((scheme, netloc, path, qs, anchor))

werkzeug/wrappers.py

 from werkzeug.utils import cached_property, environ_property, \
      cookie_date, parse_cookie, dump_cookie, http_date, escape, \
      header_property, get_content_type
-from werkzeug.wsgi import get_current_url, get_host, LimitedStream
+from werkzeug.wsgi import get_current_url, get_host, LimitedStream, \
+     ClosingIterator
 from werkzeug.datastructures import MultiDict, CombinedMultiDict, Headers, \
      EnvironHeaders, ImmutableMultiDict, ImmutableTypeConversionDict, \
      ImmutableList, MIMEAccept, CharsetAccept, LanguageAccept, \
      ResponseCacheControl, RequestCacheControl, CallbackDict
 from werkzeug._internal import _empty_stream, _decode_unicode, \
-     _patch_wrapper
+     _patch_wrapper, _get_environ
 
 
 def _run_wsgi_app(*args):
     to the WSGI server is not a string.
     """
     if isinstance(iterable, basestring):
-        from warnings import Warning
+        from warnings import warn
         warn(Warning('response iterable was set to a string.  This appears '
                      'to work but means that the server will send the '
                      'data to the client char, by char.  This is almost '
            The `charset` parameter was deprecated and became a no-op.
         """
         # XXX: deprecated
-        if __debug__ and charset is not None:
+        if __debug__ and charset is not None: # pragma: no cover
             from warnings import warn
             warn(DeprecationWarning('charset was deprecated and is ignored.'),
                  stacklevel=2)
         self.set_cookie(key, expires=0, max_age=0, path=path, domain=domain)
 
     @property
-    def header_list(self):
+    def header_list(self): # pragma: no cover
         # XXX: deprecated
         if __debug__:
             from warnings import warn
             if __debug__:
                 _warn_if_string(self.response)
             return self.response
-        return self.iter_encoded()
+        return ClosingIterator(self.iter_encoded(), self.close)
 
     def get_wsgi_response(self, environ):
         """Returns the final WSGI response as tuple.  The first item in
     """Adds extra functionality to a response object for etag and cache
     handling.  This mixin requires an object with at least a `headers`
     object that implements a dict like interface similar to :class:`Headers`.
+
+    If you want the :meth:`freeze` method to automatically add an etag, you
+    have to mixin this method before the response base class.  The default
+    response class does not do that.
     """
 
     @property
                                    used to make the response conditional
                                    against.
         """
-        environ = getattr(request_or_environ, 'environ', request_or_environ)
+        environ = _get_environ(request_or_environ)
         if environ['REQUEST_METHOD'] in ('GET', 'HEAD'):
             self.headers['Date'] = http_date()
             if 'content-length' in self.headers:
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.