1. Armin Ronacher
  2. pip

Commits

Hugo Lopes Tavares  committed ded8bfc

removed webob, wsgiproxy and wsgi_intercept as dependencies - now the tests use our own cache

  • Participants
  • Parent commits f7f4198
  • Branches default

Comments (0)

Files changed (3)

File setup.py

View file
       packages=['pip', 'pip.commands', 'pip.vcs'],
       entry_points=dict(console_scripts=['pip=pip:main']),
       test_suite='nose.collector',
-      tests_require=['nose', 'virtualenv', 'scripttest', 'wsgi_intercept', 'wsgiproxy', 'WebOb==dev,>=0.9.8post1', 'mock'],
+      tests_require=['nose', 'virtualenv', 'scripttest', 'mock'],
       zip_safe=False)

File tests/pypi_server.py

View file
+import urllib
+import urllib2
 import os
-import wsgi_intercept
-import urllib
-import webob
 from UserDict import DictMixin
-from wsgiproxy.exactproxy import proxy_exact_request
-from webob.dec import wsgify
-from wsgi_intercept.urllib2_intercept import install_opener
 
 
-class HasEverythingProxiedWSGIIntercept(DictMixin):
+urlopen_original = urllib2.urlopen
 
-    def __contains__(self, key):
-        return True
+
+class IgnoringCaseDict(DictMixin):
+
+    def __init__(self):
+        self._dict = dict()
+
+    def __getitem__(self, key):
+        return self._dict[key.lower()]
+
+    def __setitem__(self, key, value):
+        self._dict[key.lower()] = value
+
+    def keys(self):
+        return self._dict.keys()
+
+
+class CachedResponse(object):
+    """
+    CachedResponse always cache url access and returns the cached response.
+    It returns an object compatible with ``urllib.addinfourl``,
+    it means the object is like the result of a call like::
     
-    def __getitem__(self, item):
-        return (PyPIProxy, '')
+        >>> response = urllib2.urlopen('http://example.com')
+    """
+
+    def __init__(self, url, folder):
+        self.headers = IgnoringCaseDict() # maybe use httplib.HTTPMessage ??
+        self.code = 500
+        self.msg = 'Internal Server Error'
+        # url can be a simple string, or a urllib2.Request object
+        if isinstance(url, basestring):
+            self.url = url
+        else:
+            self.url = url.get_full_url()
+            self.headers.update(url.headers)
+        self._body = ''
+        self._set_all_fields(folder)
+
+    def _set_all_fields(self, folder):
+        filename = os.path.join(folder, urllib.quote(self.url, ''))
+        if not os.path.exists(filename):
+            self._cache_url(filename)
+        fp = open(filename, 'rb')
+        self.code, self.msg = fp.next().strip().split()
+        self.code = int(self.code)
+        for line in fp:
+            if line == '\n':
+                break
+            key, value = line.split(': ')
+            self.headers[key] = value.strip()
+        for line in fp:
+            self._body += line
+        fp.close()
+
+    def getcode(self):
+        return self.code
+
+    def geturl(self):
+        return self.url
+
+    def info(self):
+        return self.headers
+
+    def read(self, bytes=None):
+        """
+        it can read a chunk of bytes or everything
+        """
+        if bytes:
+            result = self._body[:bytes]
+            self._body = self._body[bytes:]
+            return result
+        return self._body
+
+    def close(self):
+        pass
+
+    def _cache_url(self, filepath):
+        response = urlopen_original(self.url)
+        fp = open(filepath, 'wb')
+        # when it uses file:// scheme, code is None and there is no msg attr
+        # but it has been successfully opened
+        status = '%s %s' % (response.getcode() or 200, getattr(response, 'msg', 'OK'))
+        headers = ['%s: %s' % (key, value) for key, value in response.headers.items()]
+        body = response.read()
+        fp.write('\n'.join([status] + headers + ['', body]))
+        fp.close()
 
 
 class PyPIProxy(object):
     def setup(cls):
         instance = cls()
         instance._create_cache_folder()
-        instance._add_wsgi_intercepts()
-        return instance
+        instance._monkey_patch_urllib2_to_cache_everything()
 
-    @wsgify
-    def __call__(self, request):
-        response = request.get_response(proxy_exact_request)
-        if self._is_a_request_to_cached_file(request):
-            return self._get_cached_response(request)
-        elif self._is_a_request_to_non_cached_file(request):
-            self._cache_file(request, response)
-        return response
-
-    def _get_cache_filename(self, request):
-        filename = urllib.quote(request.url,  '')
-        return os.path.join(self.CACHE_PATH, filename)
-
-    def _is_a_request_to_cached_file(self, request):
-        return (os.path.exists(self._get_cache_filename(request)) and
-                request.method == 'GET')
-
-    def _get_cached_response(self, request):
-        fp = open(self._get_cache_filename(request), 'rb')
-        response = webob.Response.from_file(fp)
-        fp.close()
-        return response
-
-    def _cache_file(self, request, response):
-        fp = open(self._get_cache_filename(request), 'wb')
-        fp.write(str(response))
-        fp.close()
-
-    def _is_a_request_to_non_cached_file(self, request):
-        return (not os.path.exists(self._get_cache_filename(request)) and
-                request.method == 'GET')
-
-    def _add_wsgi_intercepts(self):
-        """allow wsgi_intercept to work with urllib2 fakes"""
-        wsgi_intercept._wsgi_intercept = HasEverythingProxiedWSGIIntercept()
-        install_opener()
+    def _monkey_patch_urllib2_to_cache_everything(self):
+        def urlopen(url):
+            return CachedResponse(url, self.CACHE_PATH)
+        urllib2.urlopen = urlopen
 
     def _create_cache_folder(self):
         if not os.path.exists(self.CACHE_PATH):
             os.mkdir(self.CACHE_PATH)
+
+
+def test_cache_proxy():
+    url = 'http://example.com'
+    here = os.path.dirname(os.path.abspath(__file__))
+    filepath = os.path.join(here, urllib.quote(url, ''))
+    if os.path.exists(filepath):
+        os.remove(filepath)
+    response = urllib2.urlopen(url)
+    r = CachedResponse(url, here)
+    try:
+        assert r.code == response.code
+        assert r.getcode() == response.getcode()
+        assert r.msg == response.msg
+        assert r.read() == response.read()
+        assert r.url == response.url
+        assert r.geturl() == response.geturl()
+        assert r.headers.keys() == response.headers.keys()
+        assert r.info().keys() == response.info().keys()
+        assert r.headers['content-length'] == response.headers['content-length']
+    finally:
+        os.remove(filepath)

File tests/test_pip.py

View file
         site_packages = self.root_path / self.site_packages
         pth = open(os.path.join(site_packages, 'wsgi_intercept_pypi.pth'), 'w')
         pth.write('import sys; ')
-        cache_dependencies = 'webob paste wsgiproxy wsgi_intercept'.split()
-        dependency_paths = [self._find_package_path(d) for d in cache_dependencies]
-        for path in dependency_paths:
-            pth.write('sys.path.insert(0, %r); ' % path)
         pth.write('sys.path.insert(0, %r); ' % str(here))
         pth.write('import pypi_server; pypi_server.PyPIProxy.setup(); ')
-        pth.write('import pkg_resources; ')
-        # it is necessary to remove them to not affect the tests
-        for path in dependency_paths:
-            pth.write('pkg_resources.working_set.entries.remove(%r); ' % path)
         pth.write('sys.path.remove(%r); ' % str(here))
         pth.close()