Jason R. Coombs avatar Jason R. Coombs committed 64d11dc

Fixed failing test case on Windows

Comments (0)

Files changed (1)

python2/httplib2test.py

 
 class HttpTest(unittest.TestCase):
     def setUp(self):
-        if os.path.exists(cacheDirName): 
+        if os.path.exists(cacheDirName):
             [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
 
         if sys.version_info < (2, 6):
           pass
 
     def testConnectionType(self):
-        self.http.force_exception_to_status_code = False 
+        self.http.force_exception_to_status_code = False
         response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
         self.assertEqual(response['content-location'], "http://bitworking.org")
         self.assertEqual(content, "the body")
 
     def testGetUnknownServer(self):
-        self.http.force_exception_to_status_code = False 
+        self.http.force_exception_to_status_code = False
         try:
             self.http.request("http://fred.bitworking.org/")
             self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
 
         (response, content) = self.http.request("http://localhost:7777/")
         self.assertEqual(response['content-type'], 'text/plain')
-        self.assertTrue("Connection refused" in content)
+        self.assertTrue("Connection refused" in content
+            or "actively refused" in content,
+            "Unexpected status %(content)s" % vars())
         self.assertEqual(response.status, 400)
 
     def testGetIRI(self):
             uri = urlparse.urljoin(base, u"reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
             (response, content) = self.http.request(uri, "GET")
             d = self.reflector(content)
-            self.assertTrue(d.has_key('QUERY_STRING')) 
-            self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0) 
-    
+            self.assertTrue(d.has_key('QUERY_STRING'))
+            self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
+
     def testGetIsDefaultMethod(self):
         # Test that GET is the default method
         uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
         # Test that we can set a lower redirection limit
         # and that we raise an exception when we exceed
         # that limit.
-        self.http.force_exception_to_status_code = False 
+        self.http.force_exception_to_status_code = False
 
         uri = urlparse.urljoin(base, "302/twostep.asis")
         try:
             self.fail("Threw wrong kind of exception ")
 
         # Re-run the test with out the exceptions
-        self.http.force_exception_to_status_code = True 
+        self.http.force_exception_to_status_code = True
 
         (response, content) = self.http.request(uri, "GET", redirections = 1)
         self.assertEqual(response.status, 500)
     def testGet302NoLocation(self):
         # Test that we throw an exception when we get
         # a 302 with no Location: header.
-        self.http.force_exception_to_status_code = False 
+        self.http.force_exception_to_status_code = False
         uri = urlparse.urljoin(base, "302/no-location.asis")
         try:
             (response, content) = self.http.request(uri, "GET")
             self.fail("Threw wrong kind of exception ")
 
         # Re-run the test with out the exceptions
-        self.http.force_exception_to_status_code = True 
+        self.http.force_exception_to_status_code = True
 
         (response, content) = self.http.request(uri, "GET")
         self.assertEqual(response.status, 500)
     def testGetViaHttpsSpecViolationOnLocation(self):
         # Test that we follow redirects through HTTPS
         # even if they violate the spec by including
-        # a relative Location: header instead of an 
+        # a relative Location: header instead of an
         # absolute one.
         (response, content) = self.http.request("https://www.google.com/adsense", "GET")
         self.assertEqual(200, response.status)
 
     def testGetViaHttpsKeyCert(self):
         #  At this point I can only test
-        #  that the key and cert files are passed in 
-        #  correctly to httplib. It would be nice to have 
+        #  that the key and cert files are passed in
+        #  correctly to httplib. It would be nice to have
         #  a real https endpoint to test against.
 
         # bitworking.org presents an certificate for a non-matching host
     def test303ForDifferentMethods(self):
         # Test that all methods can be used
         uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi")
-        for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]: 
+        for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
             (response, content) = self.http.request(uri, method, body=" ")
             self.assertEqual(response['x-method'], method_on_303)
 
         self.assertEqual(response.fromcache, False)
 
     def testGetIgnoreEtag(self):
-        # Test that we can forcibly ignore ETags 
+        # Test that we can forcibly ignore ETags
         uri = urlparse.urljoin(base, "reflector/reflector.cgi")
         (response, content) = self.http.request(uri, "GET")
         self.assertNotEqual(response['etag'], "")
 
         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
         d = self.reflector(content)
-        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH')) 
+        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH'))
 
         self.http.ignore_etag = True
         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
         d = self.reflector(content)
         self.assertEqual(response.fromcache, False)
-        self.assertFalse(d.has_key('HTTP_IF_NONE_MATCH')) 
+        self.assertFalse(d.has_key('HTTP_IF_NONE_MATCH'))
 
     def testOverrideEtag(self):
-        # Test that we can forcibly ignore ETags 
+        # Test that we can forcibly ignore ETags
         uri = urlparse.urljoin(base, "reflector/reflector.cgi")
         (response, content) = self.http.request(uri, "GET")
         self.assertNotEqual(response['etag'], "")
 
         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
         d = self.reflector(content)
-        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH')) 
-        self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred") 
+        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH'))
+        self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
 
         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
         d = self.reflector(content)
-        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH')) 
-        self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred") 
+        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH'))
+        self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
 
 #MAP-commented this out because it consistently fails
 #    def testGet304EndToEnd(self):
 #        self.assertEqual(response.fromcache, True)
 
     def testGet304LastModified(self):
-        # Test that we can still handle a 304 
+        # Test that we can still handle a 304
         # by only using the last-modified cache validator.
         uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
         (response, content) = self.http.request(uri, "GET")
         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
         self.assertEqual(response.status, 200)
         self.assertEqual(response.fromcache, True, msg="Should be from cache")
-        
+
         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
         self.assertEqual(response.status, 200)
         self.assertEqual(response.fromcache, True, msg="Should be from cache")
 
 
     def testHeadGZip(self):
-        # Test that we don't try to decompress a HEAD response 
+        # Test that we don't try to decompress a HEAD response
         uri = urlparse.urljoin(base, "gzip/final-destination.txt")
         (response, content) = self.http.request(uri, "HEAD")
         self.assertEqual(response.status, 200)
 
     def testGetGZipFailure(self):
         # Test that we raise a good exception when the gzip fails
-        self.http.force_exception_to_status_code = False 
+        self.http.force_exception_to_status_code = False
         uri = urlparse.urljoin(base, "gzip/failed-compression.asis")
         try:
             (response, content) = self.http.request(uri, "GET")
             self.fail("Threw wrong kind of exception")
 
         # Re-run the test with out the exceptions
-        self.http.force_exception_to_status_code = True 
+        self.http.force_exception_to_status_code = True
 
         (response, content) = self.http.request(uri, "GET")
         self.assertEqual(response.status, 500)
         self.assertTrue(response.reason.startswith("Content purported"))
 
     def testTimeout(self):
-        self.http.force_exception_to_status_code = True 
+        self.http.force_exception_to_status_code = True
         uri = urlparse.urljoin(base, "timeout/timeout.cgi")
         try:
             import socket
-            socket.setdefaulttimeout(1) 
+            socket.setdefaulttimeout(1)
         except:
             # Don't run the test if we can't set the timeout
-            return 
+            return
         (response, content) = self.http.request(uri)
         self.assertEqual(response.status, 408)
         self.assertTrue(response.reason.startswith("Request Timeout"))
     def testIndividualTimeout(self):
         uri = urlparse.urljoin(base, "timeout/timeout.cgi")
         http = httplib2.Http(timeout=1)
-        http.force_exception_to_status_code = True 
+        http.force_exception_to_status_code = True
 
         (response, content) = http.request(uri)
         self.assertEqual(response.status, 408)
 
     def testGetDeflateFailure(self):
         # Test that we raise a good exception when the deflate fails
-        self.http.force_exception_to_status_code = False 
+        self.http.force_exception_to_status_code = False
 
         uri = urlparse.urljoin(base, "deflate/failed-compression.asis")
         try:
             self.fail("Threw wrong kind of exception")
 
         # Re-run the test with out the exceptions
-        self.http.force_exception_to_status_code = True 
+        self.http.force_exception_to_status_code = True
 
         (response, content) = self.http.request(uri, "GET")
         self.assertEqual(response.status, 500)
         self.assertEqual(response.fromcache, False)
 
     def testUpdateInvalidatesCache(self):
-        # Test that calling PUT or DELETE on a 
+        # Test that calling PUT or DELETE on a
         # URI that is cache invalidates that cache.
         uri = urlparse.urljoin(base, "304/test_etag.txt")
 
 
 
     def testUpdateUsesCachedETagAndOCMethod(self):
-        # Test that we natively support http://www.w3.org/1999/04/Editing/ 
+        # Test that we natively support http://www.w3.org/1999/04/Editing/
         uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
 
         (response, content) = self.http.request(uri, "GET")
 
 
     def testUpdateUsesCachedETagOverridden(self):
-        # Test that we natively support http://www.w3.org/1999/04/Editing/ 
+        # Test that we natively support http://www.w3.org/1999/04/Editing/
         uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
 
         (response, content) = self.http.request(uri, "GET")
         (response, content) = self.http.request(uri, "GET")
         self.assertEqual(response.status, 401)
 
-        domain = urlparse.urlparse(base)[1] 
+        domain = urlparse.urlparse(base)[1]
         self.http.add_credentials('joe', 'password', domain)
         (response, content) = self.http.request(uri, "GET")
         self.assertEqual(response.status, 200)
         uri = urlparse.urljoin(base, "reflector/reflector.cgi")
         (response, content) = self.http.request(uri, "GET")
         d = self.reflector(content)
-        self.assertTrue(d.has_key('HTTP_USER_AGENT')) 
+        self.assertTrue(d.has_key('HTTP_USER_AGENT'))
 
     def testConnectionClose(self):
         uri = "http://www.google.com/"
             self.fail("Should not throw exception")
 
     def testNormalizeHeaders(self):
-        # Test that we normalize headers to lowercase 
+        # Test that we normalize headers to lowercase
         h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
         self.assertTrue(h.has_key('cache-control'))
         self.assertTrue(h.has_key('other'))
 
     def testParseWWWAuthenticateEmpty(self):
         res = httplib2._parse_www_authenticate({})
-        self.assertEqual(len(res.keys()), 0) 
+        self.assertEqual(len(res.keys()), 0)
 
     def testParseWWWAuthenticate(self):
         # different uses of spaces around commas
         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
         self.assertEqual(len(res.keys()), 1)
         self.assertEqual(len(res['test'].keys()), 5)
-        
+
         # tokens with non-alphanum
         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
         self.assertEqual(len(res.keys()), 1)
         self.assertEqual(len(res['t*!%#st'].keys()), 2)
-        
+
         # quoted string with quoted pairs
         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
         self.assertEqual(len(res.keys()), 1)
 
 
     def testParseWWWAuthenticateDigest(self):
-        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
+        res = httplib2._parse_www_authenticate({ 'www-authenticate':
                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
         digest = res['digest']
         self.assertEqual('testrealm@host.com', digest['realm'])
 
 
     def testParseWWWAuthenticateMultiple(self):
-        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
+        res = httplib2._parse_www_authenticate({ 'www-authenticate':
                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
         digest = res['digest']
         self.assertEqual('testrealm@host.com', digest['realm'])
     def testParseWWWAuthenticateMultiple2(self):
         # Handle an added comma between challenges, which might get thrown in if the challenges were
         # originally sent in separate www-authenticate headers.
-        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
+        res = httplib2._parse_www_authenticate({ 'www-authenticate':
                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
         digest = res['digest']
         self.assertEqual('testrealm@host.com', digest['realm'])
     def testParseWWWAuthenticateMultiple3(self):
         # Handle an added comma between challenges, which might get thrown in if the challenges were
         # originally sent in separate www-authenticate headers.
-        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
+        res = httplib2._parse_www_authenticate({ 'www-authenticate':
                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
         digest = res['digest']
         self.assertEqual('testrealm@host.com', digest['realm'])
         self.assertEqual('UsernameToken', wsse['profile'])
 
     def testParseWWWAuthenticateMultiple4(self):
-        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
-                'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'}) 
+        res = httplib2._parse_www_authenticate({ 'www-authenticate':
+                'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
         digest = res['digest']
         self.assertEqual('test-real.m@host.com', digest['realm'])
         self.assertEqual('\tauth,auth-int', digest['qop'])
     def testDigestObject(self):
         credentials = ('joe', 'password')
         host = None
-        request_uri = '/projects/httplib2/test/digest/' 
+        request_uri = '/projects/httplib2/test/digest/'
         headers = {}
         response = {
             'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
         content = ""
 
         d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
-        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 
+        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
         our_request = "authorization: %s" % headers['authorization']
         working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
         self.assertEqual(our_request, working_request)
     def testDigestObjectStale(self):
         credentials = ('joe', 'password')
         host = None
-        request_uri = '/projects/httplib2/test/digest/' 
+        request_uri = '/projects/httplib2/test/digest/'
         headers = {}
         response = httplib2.Response({ })
         response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
     def testDigestObjectAuthInfo(self):
         credentials = ('joe', 'password')
         host = None
-        request_uri = '/projects/httplib2/test/digest/' 
+        request_uri = '/projects/httplib2/test/digest/'
         headers = {}
         response = httplib2.Response({ })
         response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
         end2end = httplib2._get_end2end_headers(response)
         self.assertEquals(0, len(end2end))
 
-        # Degenerate case of connection referrring to a header not passed in 
+        # Degenerate case of connection referrring to a header not passed in
         response = {'connection': 'content-type'}
         end2end = httplib2._get_end2end_headers(response)
         self.assertEquals(0, len(end2end))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.