Commits

Hugo Lopes Tavares committed bb00d7d

changed to cache all domains that are openend with urllib2.urlopen, starting with pypi.python.org, at the second time the tests are run

Comments (0)

Files changed (2)

tests/pypi_server.py

 import os
-import sys
 import re
+import urllib2
 import wsgi_intercept
+import urllib
+import webob
+from wsgiproxy.exactproxy import proxy_exact_request
+from webob.dec import wsgify
 from wsgi_intercept.urllib2_intercept import install_opener
 
 
-PYPI_DEFAULT_STATIC_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pypiserver")
+class PyPIProxy(object):
 
+    CACHE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pypi_cache")
+    DOMAIN_NAMES_FILEPATH = os.path.join(CACHE_PATH, 'domains.txt')
 
-def _test_get_similar_urls():
-    assert get_similar_urls('/simple/initools') == '/simple/INITools'
-    assert get_similar_urls('/simple/initools/0.2') == '/simple/INITools/0.2'
-    assert get_similar_urls('/simple/setuptools_git') == '/simple/setuptools-git'
-    assert get_similar_urls('/simple/setuptools_git/') == '/simple/setuptools-git/'
-    assert get_similar_urls('/simple/setuptools_git/setuptools_git-0.3.4.tar.gz') == '/simple/setuptools-git/setuptools_git-0.3.4.tar.gz'
+    @classmethod
+    def setup(cls):
+        instance = cls()
+        instance._monkey_patch_urllib2()
+        instance._create_cache_folder()
+        instance._add_wsgi_intercepts()
+        return instance
 
+    @wsgify
+    def __call__(self, request):
+        response = request.get_response(proxy_exact_request)
+        if self._is_a_request_to_cached_file(request):
+            return self._get_cached_response(request)
+        elif self._is_a_request_to_non_cached_file(request):
+            self._cache_file(request, response)
+        return response
 
+    def _get_cache_filename(self, request):
+        filename = urllib.quote(request.url,  '')
+        return os.path.join(self.CACHE_PATH, filename)
 
-def get_similar_urls(url):
-    r = re.search(r'/simple/([^/]+)', url)
-    here = os.path.dirname(os.path.abspath(__file__))
-    all_packages = os.listdir(os.path.join(here, 'pypiserver', 'simple'))
-    if r:
-        package_name = r.group(1)
-        for package in all_packages:
-            if re.match(package_name, package, re.IGNORECASE):
-                return re.sub(package_name, package, url, 1)
-            if re.match(package_name.replace('_', '-'), package):
-                return re.sub(package_name, package, url, 1)
-    return url
+    def _is_a_request_to_cached_file(self, request):
+        return (os.path.exists(self._get_cache_filename(request)) and
+                request.method == 'GET')
 
-def pypi_app():
-    def wsgi_app(environ, start_response):
-        headers = [('Content-type', 'text/html')]
-        path_tree = get_similar_urls(environ['PATH_INFO']).split('/')
-        url = os.path.join(PYPI_DEFAULT_STATIC_PATH, *path_tree)
-        filepath = url
-        if environ['PATH_INFO'].endswith('.gz'):
-            headers = [('Content-type', 'application/x-gtar')]
-        else:
-            filepath = os.path.join(url, 'index.html')
-        start_response('200 OK', headers)
-        if not os.path.exists(filepath):
-                return ''
-        return [open(filepath, 'rb').read()]
-    return wsgi_app
+    def _get_cached_response(self, request):
+        fp = open(self._get_cache_filename(request), 'rb')
+        response = webob.Response.from_file(fp)
+        fp.close()
+        return response
 
+    def _cache_file(self, request, response):
+        fp = open(self._get_cache_filename(request), 'wb')
+        fp.write(str(response))
+        fp.close()
 
-def use_fake_pypi():
-    # allow wsgi_intercept to work with urllib2 fakes
-    install_opener()
-    wsgi_intercept.add_wsgi_intercept('pypi.python.org', 80, pypi_app)
+    def _is_a_request_to_non_cached_file(self, request):
+        return (not os.path.exists(self._get_cache_filename(request)) and
+                request.method == 'GET')
 
+    def _monkey_patch_urllib2(self):
+        urlopen = urllib2.urlopen
+        def open_and_add_unknown_domains(arg):
+            if isinstance(arg, basestring):
+                self._store_domain(arg)
+            else:
+                self._store_domain(arg.get_full_url())
+            return urlopen(arg)
+        urllib2.urlopen = open_and_add_unknown_domains
 
-if __name__ == '__main__':
-    _test_get_similar_urls()
+    def _add_wsgi_intercepts(self):
+        """allow wsgi_intercept to work with urllib2 fakes"""
+        install_opener()
+        domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH)
+        for line in domain_fp:
+            wsgi_intercept.add_wsgi_intercept(line.strip(), 80, PyPIProxy)
+        domain_fp.close()
+
+    def _create_cache_folder(self):
+        if not os.path.exists(self.CACHE_PATH):
+            os.mkdir(self.CACHE_PATH)
+            domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH, 'w')
+            domain_fp.write('pypi.python.org\n')
+            domain_fp.close()
+
+    def _store_domain(self, url):
+        r = re.match(r'https?://([^/]+)', url)
+        if not r:
+            return
+        domain_line = r.group(1) + '\n'
+        if domain_line not in self._get_domains_to_be_intercepted():
+            domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH, 'a')
+            domain_fp.write(domain_line)
+            domain_fp.close()
+
+    def _get_domains_to_be_intercepted(self):
+        domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH)
+        domains = domain_fp.readlines()
+        domain_fp.close()
+        return domains

tests/test_pip.py

 
         # Install this version instead
         self.run('python', 'setup.py', 'install', cwd=src_folder, expect_stderr=True)
-        self._use_fake_pypi_server()
+        self._use_cached_pypi_server()
 
     def run(self, *args, **kw):
         if self.verbose:
     def __del__(self):
         shutil.rmtree(self.root_path, ignore_errors=True)
 
-    def _copy_wsgi_intercept_files(self):
-        download_and_extract_bz2_file_to_here(
-                'http://bitbucket.org/hltbra/'
-                'pip-fake-pypi-server/raw/0859b02feb0f/'
-                'wsgi_intercept_clean.tar.bz2')
-    
-    def _copy_pypiserver_fake_files(self):
-        download_and_extract_bz2_file_to_here(
-                'http://bitbucket.org/hltbra/'
-                'pip-fake-pypi-server/raw/10ef24c13180/'
-                'pypiserver.tar.bz2')
+    def _find_package_path(self, module):
+        return os.path.dirname(os.path.dirname(__import__(module).__file__))
 
-    def _use_fake_pypi_server(self):
-        if not os.path.exists(here/'pypiserver'):
-            self._copy_pypiserver_fake_files()
-        if not os.path.exists(here/'wsgi_intercept'):
-            self._copy_wsgi_intercept_files()
+    def _use_cached_pypi_server(self):
         site_packages = self.root_path / self.site_packages
         pth = open(os.path.join(site_packages, 'wsgi_intercept_pypi.pth'), 'w')
-        pth.write('import sys; sys.path.insert(0, %r); '
-                  'import pypi_server; '
-                  'pypi_server.use_fake_pypi(); '
-                  'sys.path.pop(0)' % str(here.abspath))
+        pth.write('import sys; ')
+        cache_dependencies = 'webob paste wsgiproxy wsgi_intercept'.split()
+        dependency_paths = [self._find_package_path(d) for d in cache_dependencies]
+        for path in dependency_paths:
+            pth.write('sys.path.insert(0, %r); ' % path)
+        pth.write('sys.path.insert(0, %r); ' % str(here))
+        pth.write('import pypi_server; pypi_server.PyPIProxy.setup(); ')
+        pth.write('import pkg_resources; ')
+        # it is necessary to remove them to not affect the tests
+        for path in dependency_paths:
+            pth.write('pkg_resources.working_set.entries.remove(%r); ' % path)
+        pth.write('sys.path.remove(%r); ' % str(here))
         pth.close()