Commits

Sergey Schetinin committed 968291f

add PendingDeprecationWarning for setting req.body_file to a string value

Comments (0)

Files changed (2)

tests/test_request.py

-import unittest
-from webob import Request, BaseRequest
+import unittest, warnings
+from webob import Request, BaseRequest, UTC
 
 _marker = object()
 
                    'CONTENT_LENGTH': len('before'),
                   }
         req = BaseRequest(environ)
+        warnings.simplefilter('ignore', PendingDeprecationWarning)
         req.body_file = AFTER
-        self.assertEqual(req.body_file.getvalue(), AFTER)
+        warnings.resetwarnings()
         self.assertEqual(req.content_length, len(AFTER))
+        self.assertEqual(req.body_file.read(), AFTER)
+        del req.body_file
+        self.assertEqual(req.content_length, 0)
+        assert req.is_body_seekable
+        req.body_file.seek(0)
+        self.assertEqual(req.body_file.read(), '')
+
+
 
     def test_body_file_setter_non_string(self):
         BEFORE = self._makeStringIO('before')
     # params
 
     def test_str_cookies_wo_webob_parsed_cookies(self):
-        from webob import Request
         environ = {
             'HTTP_COOKIE': 'a=b',
         }
     # copy
 
     def test_copy_get(self):
-        from webob import Request
         environ = {
             'HTTP_COOKIE': 'a=b',
         }
                 self.assertEqual(clone.environ[k], v)
 
     def test_remove_conditional_headers_accept_encoding(self):
-        from webob import Request
         req = Request.blank('/')
         req.accept_encoding='gzip,deflate'
         req.remove_conditional_headers()
 
     def test_remove_conditional_headers_if_modified_since(self):
         from datetime import datetime
-        from webob import Request, UTC
         req = Request.blank('/')
         req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC)
         req.remove_conditional_headers()
         self.assertEqual(req.if_modified_since, None)
 
     def test_remove_conditional_headers_if_none_match(self):
-        from webob import Request
         req = Request.blank('/')
         req.if_none_match = 'foo, bar'
         req.remove_conditional_headers()
         self.assertEqual(bool(req.if_none_match), False)
 
     def test_remove_conditional_headers_if_range(self):
-        from webob import Request
         req = Request.blank('/')
         req.if_range = 'foo, bar'
         req.remove_conditional_headers()
         self.assertEqual(bool(req.if_range), False)
 
     def test_remove_conditional_headers_range(self):
-        from webob import Request
         req = Request.blank('/')
         req.range = 'bytes=0-100'
         req.remove_conditional_headers()
         self.assertEqual(body, '<body skipped (len=337)>')
 
     def test_adhoc_attrs_set(self):
-        from webob import Request
         req = Request.blank('/')
         req.foo = 1
         self.assertEqual(req.environ['webob.adhoc_attrs'], {'foo': 1})
 
     def test_adhoc_attrs_set_nonadhoc(self):
-        from webob import Request
         req = Request.blank('/', environ={'webob.adhoc_attrs':{}})
         req.request_body_tempfile_limit = 1
         self.assertEqual(req.environ['webob.adhoc_attrs'], {})
 
     def test_adhoc_attrs_get(self):
-        from webob import Request
         req = Request.blank('/', environ={'webob.adhoc_attrs': {'foo': 1}})
         self.assertEqual(req.foo, 1)
 
     def test_adhoc_attrs_get_missing(self):
-        from webob import Request
         req = Request.blank('/')
         self.assertRaises(AttributeError, getattr, req, 'some_attr')
 
     def test_adhoc_attrs_del(self):
-        from webob import Request
         req = Request.blank('/', environ={'webob.adhoc_attrs': {'foo': 1}})
         del req.foo
         self.assertEqual(req.environ['webob.adhoc_attrs'], {})
 
     def test_adhoc_attrs_del_missing(self):
-        from webob import Request
         req = Request.blank('/')
         self.assertRaises(AttributeError, delattr, req, 'some_attr')
 
         self.assert_("accepttypes is: application/xml" in res)
 
     def test_accept_best_match(self):
-        from webob import Request
         self.assert_(not Request.blank('/').accept)
         self.assert_(not Request.blank('/', headers={'Accept': ''}).accept)
         req = Request.blank('/', headers={'Accept':'text/plain'})
 
     def test_from_mimeparse(self):
         # http://mimeparse.googlecode.com/svn/trunk/mimeparse.py
-        from webob import Request
         supported = ['application/xbel+xml', 'application/xml']
         tests = [('application/xbel+xml', 'application/xbel+xml'),
                 ('application/xbel+xml; q=1', 'application/xbel+xml'),
             )
 
     def test_bad_cookie(self):
-        from webob import Request
         req = Request.blank('/')
         req.headers['Cookie'] = '070-it-:><?0'
         self.assertEqual(req.cookies, {})
         self.assertEqual(req.cookies, {'blub': 'Blah'})
 
     def test_cookie_quoting(self):
-        from webob import Request
         req = Request.blank('/')
         req.headers['Cookie'] = 'foo="?foo"; Path=/'
         self.assertEqual(req.cookies, {'foo': '?foo'})
 
     def test_path_quoting(self):
-        from webob import Request
         path = '/:@&+$,/bar'
         req = Request.blank(path)
         self.assertEqual(req.path, path)
         self.assert_(req.url.endswith(path))
 
     def test_params(self):
-        from webob import Request
         req = Request.blank('/?a=1&b=2')
         req.method = 'POST'
         req.body = 'b=3'
         self.assertEqual(req.GET[u'\u1000'], 'x')
 
     def test_copy_body(self):
-        from webob import Request
         req = Request.blank('/', method='POST', body='some text',
                             request_body_tempfile_limit=1)
         old_body_file = req.body_file_raw
 
     def test_broken_seek(self):
         # copy() should work even when the input has a broken seek method
-        from webob import Request
         req = Request.blank('/', method='POST',
                 body_file=UnseekableInputWithSeek('0123456789'),
                 content_length=10)
     def test_broken_clen_header(self):
         # if the UA sends "content_length: ..' header (the name is wrong)
         # it should not break the req.headers.items()
-        from webob import Request
         req = Request.blank('/')
         req.environ['HTTP_CONTENT_LENGTH'] = '0'
         req.headers.items()
 
     def test_nonstr_keys(self):
         # non-string env keys shouldn't break req.headers
-        from webob import Request
         req = Request.blank('/')
         req.environ[1] = 1
         req.headers.items()
 
 
     def test_authorization(self):
-        from webob import Request
         req = Request.blank('/')
         req.authorization = 'Digest uri="/?a=b"'
         self.assertEqual(req.authorization, ('Digest', {'uri': '/?a=b'}))
 
 
     def test_from_file(self):
-        from webob import Request
         req = Request.blank('http://example.com:8000/test.html?params')
         self.equal_req(req)
 
         self.equal_req(req)
 
     def test_req_kw_none_val(self):
-        from webob import Request
         request = Request({}, content_length=None)
         self.assert_('content-length' not in request.headers)
         self.assert_('content-type' not in request.headers)
 
     def test_env_keys(self):
-        from webob import Request
         req = Request.blank('/')
         # SCRIPT_NAME can be missing
         del req.environ['SCRIPT_NAME']
 
     def test_request_noenviron_param(self):
         # Environ is a a mandatory not null param in Request.
-        from webob import Request
         self.assertRaises(TypeError, Request, environ=None)
 
     def test_unicode_errors(self):
         # Passing unicode_errors != NoDefault should assign value to
         # dictionary['unicode_errors'], else not
-        from webob.request import NoDefault
-        from webob import Request
         r = Request({'a':1}, unicode_errors='strict')
         self.assert_('unicode_errors' in r.__dict__)
-        r = Request({'a':1}, unicode_errors=NoDefault)
+        r = Request({'a':1})
         self.assert_('unicode_errors' not in r.__dict__)
 
     def test_charset_deprecation(self):
         # Passed an attr in kw that does not exist in the class, should
         # raise an error
         # Passed an attr in kw that does exist in the class, should be ok
-        from webob import Request
         self.assertRaises(TypeError,
                           Request, {'a':1}, this_does_not_exist=1)
         r = Request({'a':1}, **{'charset':'utf-8', 'server_name':'127.0.0.1'})
         self.assertEqual(getattr(r, 'charset', None), 'utf-8')
         self.assertEqual(getattr(r, 'server_name', None), '127.0.0.1')
 
-    def test_body_file_setter(self):
-        # If body_file is passed and it's instance of str, we define
-        # environ['wsgi.input'] and content_length. Plus, while deleting the
-        # attribute, we should get '' and 0 respectively
-        from webob import Request
-        r = Request({'a':1}, **{'body_file':'hello world'})
-        self.assertEqual(r.environ['wsgi.input'].getvalue(), 'hello world')
-        self.assertEqual(int(r.environ['CONTENT_LENGTH']), len('hello world'))
-        del r.body_file
-        self.assertEqual(r.environ['wsgi.input'].getvalue(), '')
-        self.assertEqual(int(r.environ['CONTENT_LENGTH']), 0)
-
     def test_conttype_set_del(self):
         # Deleting content_type attr from a request should update the
         # environ dict
         # Assigning content_type should replace first option of the environ
         # dict
-        from webob import Request
         r = Request({'a':1}, **{'content_type':'text/html'})
         self.assert_('CONTENT_TYPE' in r.environ)
         self.assert_(hasattr(r, 'content_type'))
     def test_headers2(self):
         # Setting headers in init and later with a property, should update
         # the info
-        from webob import Request
         headers = {'Host': 'www.example.com',
                 'Accept-Language': 'en-us,en;q=0.5',
                 'Accept-Encoding': 'gzip,deflate',
     def test_host_url(self):
         # Request has a read only property host_url that combines several
         # keys to create a host_url
-        from webob import Request
         a = Request({'wsgi.url_scheme':'http'}, **{'host':'www.example.com'})
         self.assertEqual(a.host_url, 'http://www.example.com')
         a = Request({'wsgi.url_scheme':'http'}, **{'server_name':'localhost',
     def test_path_info_p(self):
         # Peek path_info to see what's coming
         # Pop path_info until there's nothing remaining
-        from webob import Request
         a = Request({'a':1}, **{'path_info':'/foo/bar','script_name':''})
         self.assertEqual(a.path_info_peek(), 'foo')
         self.assertEqual(a.path_info_pop(), 'foo')
 
     def test_urlvars_property(self):
         # Testing urlvars setter/getter/deleter
-        from webob import Request
         a = Request({'wsgiorg.routing_args':((),{'x':'y'}),
                     'paste.urlvars':{'test':'value'}})
         a.urlvars = {'hello':'world'}
 
     def test_urlargs_property(self):
         # Testing urlargs setter/getter/deleter
-        from webob import Request
         a = Request({'paste.urlvars':{'test':'value'}})
         self.assertEqual(a.urlargs, ())
         a.urlargs = {'hello':'world'}
 
     def test_host_property(self):
         # Testing host setter/getter/deleter
-        from webob import Request
         a = Request({'wsgi.url_scheme':'http'}, server_name='localhost',
                                                 server_port=5000)
         self.assertEqual(a.host, "localhost:5000")
         #self.assertEqual(a.body, '')
         # I need to implement a not seekable stringio like object.
         import string
-        from webob import Request
-        from webob import BaseRequest
         from cStringIO import StringIO
         class DummyIO(object):
             def __init__(self, txt):
     def test_post_does_not_reparse(self):
         # test that there's no repetitive parsing is happening on every
         # req.POST access
-        from webob import Request
         req = Request.blank('/',
             content_type='multipart/form-data; boundary=boundary',
             POST=_cgi_escaping_body
 
 
     def test_middleware_body(self):
-        from webob import Request
         def app(env, sr):
             sr('200 OK', [])
             return [env['wsgi.input'].read()]
         self.assertEqual(resp.headers['x-data'], 'abc')
 
     def test_body_file_noseek(self):
-        from webob import Request
         req = Request.blank('/', method='PUT', body='abc')
         lst = [req.body_file.read(1) for i in range(3)]
         self.assertEqual(lst, ['a','b','c'])
 
     def test_cgi_escaping_fix(self):
-        from webob import Request
         req = Request.blank('/',
             content_type='multipart/form-data; boundary=boundary',
             POST=_cgi_escaping_body
         self.assertEqual(req.POST.keys(), ['%20%22"'])
 
     def test_content_type_none(self):
-        from webob import Request
         r = Request.blank('/', content_type='text/html')
         self.assertEqual(r.content_type, 'text/html')
         r.content_type = None
 
     def test_charset_in_content_type(self):
-        from webob import Request
         r = Request({'CONTENT_TYPE':'text/html;charset=ascii'})
         r.charset = 'shift-jis'
         self.assertEqual(r.charset, 'shift-jis')
 
     def test_body_file_seekable(self):
         from cStringIO import StringIO
-        from webob import Request
         r = Request.blank('/')
         r.body_file = StringIO('body')
         self.assertEqual(r.body_file_seekable.read(), 'body')
 
     def test_request_init(self):
         # port from doctest (docs/reference.txt)
-        from webob import Request
         req = Request.blank('/article?id=1')
         self.assertEqual(req.environ['HTTP_HOST'], 'localhost:80')
         self.assertEqual(req.environ['PATH_INFO'], '/article')
         # port from doctest (docs/reference.txt)
 
         # Query & POST variables
-        from webob import Request
         from webob.multidict import MultiDict
         from webob.multidict import NestedMultiDict
         from webob.multidict import NoVars
 
     def test_request_put(self):
         from datetime import datetime
-        from webob import Request
         from webob import Response
         from webob import UTC
         from webob.acceptparse import MIMEAccept
         self.assert_(not server_token in req.if_match)
 
     def test_call_WSGI_app(self):
-        from webob import Request
         req = Request.blank('/')
         def wsgi_app(environ, start_response):
             start_response('200 OK', [('Content-type', 'text/plain')])
 
     def equal_req(self, req):
         from cStringIO import StringIO
-        from webob import Request
         input = StringIO(str(req))
         req2 = Request.from_file(input)
         self.assertEqual(req.url, req2.url)
 
 
 def simpleapp(environ, start_response):
-    from webob import Request
     status = '200 OK'
     response_headers = [('Content-type','text/plain')]
     start_response(status, response_headers)
                         "Unexpected keyword: %s=%r" % (name, value))
                 setattr(self, name, value)
 
+    # this is necessary for correct warnings depth for both
+    # BaseRequest and Request (due to AdhocAttrMixin.__setattr__)
+    _setattr_stacklevel = 2
 
     def _body_file__get(self):
         """
         return self.body_file_raw
     def _body_file__set(self, value):
         if isinstance(value, str):
-            # FIXME: This should issue a warning
+            # FIXME: change to DeprecationWarning in 1.1, raise exc in 1.2
+            warnings.warn(
+                "Please use req.body = 'str' or req.body_file = fileobj",
+                PendingDeprecationWarning,
+                stacklevel=self._setattr_stacklevel,
+            )
             self.body = value
-        else:
-            self.content_length = None
-            self.body_file_raw = value
-            self.is_body_seekable = False
-            self.is_body_readable = True
+            return
+        self.content_length = None
+        self.body_file_raw = value
+        self.is_body_seekable = False
+        self.is_body_readable = True
     def _body_file__del(self):
         self.body = ''
     body_file = property(_body_file__get,
 
 
 class AdhocAttrMixin(object):
+    _setattr_stacklevel = 3
+
     def __setattr__(self, attr, value, DEFAULT=object()):
         if (getattr(self.__class__, attr, DEFAULT) is not DEFAULT or
                     attr.startswith('_')):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.