Source

pypy / pypy / module / zipimport / test / test_zipimport.py

Full commit
import py, os
import time
import struct
from pypy.module.imp.importing import get_pyc_magic, _w_long
from pypy.module.imp.test.support import BaseImportTest
from StringIO import StringIO

from rpython.tool.udir import udir
from zipfile import ZIP_STORED, ZIP_DEFLATED


class AppTestZipimport(BaseImportTest):
    """ A bit structurized tests stolen and adapted from
    cpy's regression tests
    """
    compression = ZIP_STORED
    spaceconfig = {
        "usemodules": ['zipimport', 'rctime', 'struct', 'binascii', 'marshal'],
    }
    pathsep = os.path.sep

    @classmethod
    def make_pyc(cls, space, w_co, mtime):
        w_data = space.call_method(space.getbuiltinmodule('marshal'),
                                   'dumps', w_co)
        data = space.bytes_w(w_data)
        if type(mtime) is type(0.0):
            # Mac mtimes need a bit of special casing
            if mtime < 0x7fffffff:
                mtime = int(mtime)
            else:
                mtime = int(-0x100000000L + long(mtime))
        s = StringIO()
        try:
            _w_long(s, get_pyc_magic(space))
        except AttributeError:
            import imp
            s.write(imp.get_magic())
        pyc = s.getvalue() + struct.pack("<i", int(mtime)) + data
        return pyc

    @classmethod
    def make_class(cls):
        BaseImportTest.setup_class.im_func(cls)
        source = """\
def get_name():
    return __name__
def get_file():
    return __file__"""
        space = cls.space
        w = space.wrap
        w_co = space.call_method(space.builtin, 'compile',
                                 w(source), w('uuu.py'), w('exec'))

        tmpdir = udir.ensure('zipimport_%s' % cls.__name__, dir=1)
        now = time.time()
        cls.w_now = w(now)
        test_pyc = cls.make_pyc(space, w_co, now)
        cls.w_test_pyc = space.wrapbytes(test_pyc)
        cls.w_compression = w(cls.compression)
        cls.w_pathsep = w(cls.pathsep)
        #ziptestmodule = tmpdir.ensure('ziptestmodule.zip').write(
        ziptestmodule = tmpdir.join("somezip.zip")
        cls.w_tmpzip = w(str(ziptestmodule))
        cls.tmpdir = tmpdir

    def setup_class(cls):
        cls.make_class()

    def setup_method(self, meth):
        space = self.space
        name = "test_%s_%s.zip" % (self.__class__.__name__, meth.__name__)
        self.w_zipfile = space.wrap(str(self.tmpdir.join(name)))
        self.w_write_files = space.newlist([])
        w_cache = space.getattr(space.getbuiltinmodule('zipimport'),
                                space.wrap('_zip_directory_cache'))
        space.call_function(space.getattr(w_cache, space.wrap('clear')))
        self.w_modules = space.call_function(
            space.w_list,
            space.getattr(space.getbuiltinmodule('sys'),
                          space.wrap('modules')))

    def teardown_method(self, meth):
        space = self.space
        space.appexec([], """():
        import sys
        while sys.path[0].endswith('.zip'):
            sys.path.pop(0)
        """)
        space.appexec([self.w_modules], """(modules):
        import sys
        for module in sys.modules.copy():
            if module not in modules:
                del sys.modules[module]
        """)
        self.w_modules = []

    def w_now_in_the_future(self, delta):
        self.now += delta

    def w_writefile(self, filename, data):
        import sys
        import time
        from zipfile import ZipFile, ZipInfo
        z = ZipFile(self.zipfile, 'w')
        write_files = self.write_files
        filename = filename.replace('/', self.pathsep)
        write_files.append((filename, data))
        for filename, data in write_files:
            zinfo = ZipInfo(filename, time.localtime(self.now))
            zinfo.compress_type = self.compression
            z.writestr(zinfo, data)
        self.write_files = write_files
        # XXX populates sys.path, but at applevel
        if sys.path[0] != self.zipfile:
            sys.path.insert(0, self.zipfile)
        z.close()

    def test_cache(self):
        self.writefile('x.py', 'y')
        from zipimport import _zip_directory_cache, zipimporter
        new_importer = zipimporter(self.zipfile)
        try:
            assert zipimporter(self.zipfile) is not new_importer
        finally:
            del _zip_directory_cache[self.zipfile]

    def test_cache_subdir(self):
        import os
        self.writefile('x.py', '')
        self.writefile('sub/__init__.py', '')
        self.writefile('sub/yy.py', '')
        from zipimport import _zip_directory_cache, zipimporter
        sub_importer = zipimporter(self.zipfile + os.path.sep + 'sub')
        main_importer = zipimporter(self.zipfile)

        assert main_importer is not sub_importer
        assert main_importer.prefix == ""
        assert sub_importer.prefix == "sub" + os.path.sep

    def test_good_bad_arguments(self):
        from zipimport import zipimporter
        import os
        self.writefile("x.py", "y")
        zipimporter(self.zipfile) # should work
        raises(ImportError, "zipimporter(os.path.dirname(self.zipfile))")
        raises(ImportError, 'zipimporter("fsafdetrssffdsagadfsafdssadasa")')
        name = os.path.join(os.path.dirname(self.zipfile), "x.zip")
        f = open(name, "w")
        f.write("zzz")
        f.close()
        raises(ImportError, 'zipimporter(name)')
        # this should work as well :-/
        zipimporter(os.path.join(self.zipfile, 'x'))

    def test_py(self):
        import sys, os
        self.writefile("uuu.py", "def f(x): return x")
        mod = __import__('uuu', globals(), locals(), [])
        print(mod)
        assert mod.f(3) == 3
        expected = {
            '__doc__' : None,
            '__name__' : 'uuu',
            'f': mod.f}
        for key, val in expected.items():
            assert mod.__dict__[key] == val
        assert mod.__file__.endswith('.zip'+os.sep+'uuu.py')

    def test_pyc(self):
        import sys, os
        self.writefile("uuu.pyc", self.test_pyc)
        self.writefile("uuu.py", "def f(x): return x")
        mod = __import__('uuu', globals(), locals(), [])
        expected = {
            '__doc__' : None,
            '__name__' : 'uuu',
            'get_name' : mod.get_name,
            'get_file' : mod.get_file
        }
        for key, val in expected.items():
            assert mod.__dict__[key] == val
        assert mod.__file__.endswith('.zip'+os.sep+'uuu.pyc')
        assert mod.get_file() == mod.__file__
        assert mod.get_name() == mod.__name__
        #
        import zipimport
        z = zipimport.zipimporter(self.zipfile)
        code = z.get_code('uuu')
        assert isinstance(code, type((lambda:0).__code__))

    def test_bad_pyc(self):
        import zipimport
        import sys
        m0 = self.test_pyc[0]
        m0 ^= 0x04
        test_pyc = bytes([m0]) + self.test_pyc[1:]
        self.writefile("uu.pyc", test_pyc)
        raises(ImportError, "__import__('uu', globals(), locals(), [])")
        assert 'uu' not in sys.modules

    def test_force_py(self):
        import sys
        m0 = self.test_pyc[0]
        m0 ^= 0x04
        test_pyc = bytes([m0]) + self.test_pyc[1:]
        self.writefile("uu.pyc", test_pyc)
        self.writefile("uu.py", "def f(x): return x")
        mod = __import__("uu", globals(), locals(), [])
        assert mod.f(3) == 3

    def test_sys_modules(self):
        m0 = self.test_pyc[0]
        m0 ^= 0x04
        test_pyc = bytes([m0]) + self.test_pyc[1:]
        self.writefile("uuu.pyc", test_pyc)
        import sys
        import zipimport
        z = zipimport.zipimporter(self.zipfile)
        sys.modules['uuu'] = lambda x : x + 1
        raises(ImportError, z.load_module, 'uuu')
        raises(zipimport.ZipImportError, z.get_code, 'uuu')

    def test_package(self):
        import os, sys
        self.writefile("xxuuu/__init__.py", "")
        self.writefile("xxuuu/yy.py", "def f(x): return x")
        mod = __import__("xxuuu", globals(), locals(), ['yy'])
        assert mod.__path__ == [self.zipfile + os.path.sep + "xxuuu"]
        assert mod.__file__ == (self.zipfile + os.path.sep
                                + "xxuuu" + os.path.sep
                                + "__init__.py")
        assert mod.yy.f(3) == 3

    def test_package_bug(self):
        import os, sys
        import types
        mod = types.ModuleType('xxuuv')
        mod.__path__ = [self.zipfile + '/xxuuv']
        sys.modules['xxuuv'] = mod
        #
        self.writefile("xxuuv/__init__.py", "")
        self.writefile("xxuuv/yy.py", "def f(x): return x")
        mod = __import__("xxuuv.yy", globals(), locals(), ['__doc__'])
        assert mod.__file__ == (self.zipfile + os.path.sep
                                + "xxuuv" + os.path.sep
                                + "yy.py")
        assert mod.f(3) == 3

    def test_pyc_in_package(self):
        import os, sys
        import types
        mod = types.ModuleType('xxuuw')
        mod.__path__ = [self.zipfile + '/xxuuw']
        sys.modules['xxuuw'] = mod
        #
        self.writefile("xxuuw/__init__.py", "")
        self.writefile("xxuuw/zz.pyc", self.test_pyc)
        mod = __import__("xxuuw.zz", globals(), locals(), ['__doc__'])
        assert mod.__file__ == (self.zipfile + os.path.sep
                                + "xxuuw" + os.path.sep
                                + "zz.pyc")
        assert mod.get_file() == mod.__file__
        assert mod.get_name() == mod.__name__

    def test_functions(self):
        import os
        import zipimport
        data = b"saddsadsa"
        pyc_data = self.test_pyc
        self.now_in_the_future(+5)   # write the zipfile 5 secs after the .pyc
        self.writefile("xxx", data)
        self.writefile("xx/__init__.py", "5")
        self.writefile("yy.py", "3")
        self.writefile('uu.pyc', pyc_data)
        z = zipimport.zipimporter(self.zipfile)
        assert z.get_data(self.zipfile + os.sep + "xxx") == data
        assert z.is_package("xx")
        assert not z.is_package("yy")
        assert z.get_source("yy") == '3'
        assert z.get_source('uu') is None
        raises(ImportError, "z.get_source('zz')")
        #assert z.get_code('yy') == py.code.Source('3').compile()
        #assert z.get_code('uu') == self.co
        assert z.get_code('uu')
        assert z.get_code('xx')
        assert z.get_source('xx') == "5"
        assert z.archive == self.zipfile
        mod = z.load_module('xx')
        assert z.get_filename('xx') == mod.__file__

    def test_archive(self):
        """
        The archive attribute of zipimport.zipimporter gives the path to the
        zipfile itself.
        """
        import os
        import zipimport
        self.writefile("directory/package/__init__.py", "")
        importer = zipimport.zipimporter(self.zipfile + "/directory")
        # Grab this so if the assertion fails, py.test will display its
        # value.  Not sure why it doesn't the assertion uses import.archive
        # directly. -exarkun
        archive = importer.archive
        realprefix = importer.prefix
        allbutlast = self.zipfile.split(os.path.sep)[:-1]
        prefix = 'directory' + os.path.sep
        assert archive == self.zipfile
        assert realprefix == prefix

    def test_subdirectory_importer(self):
        import os
        import zipimport
        self.writefile("directory/package/__init__.py", "")
        z = zipimport.zipimporter(self.zipfile + "/directory")
        mod = z.load_module("package")
        assert z.is_package("package")
        assert z.get_filename("package") == mod.__file__

    def test_subdirectory_twice(self):
        #import os, zipimport

        self.writefile("package/__init__.py", "")
        self.writefile("package/subpackage/__init__.py", "")
        self.writefile("package/subpackage/foo.py", "")
        mod = __import__('package.subpackage.foo', None, None, [])
        assert mod

    def test_zip_directory_cache(self):
        """ Check full dictionary interface
        """
        import os
        import zipimport
        self.writefile("directory/package/__init__.py", "")
        importer = zipimport.zipimporter(self.zipfile + "/directory")
        l = [i for i in zipimport._zip_directory_cache]
        assert len(l)

    def test_path_hooks(self):
        import sys
        import zipimport
        assert sys.path_hooks.count(zipimport.zipimporter) == 1

    def test_co_filename(self):
        self.writefile('mymodule.py', """
def get_co_filename():
    return get_co_filename.__code__.co_filename
""")
        import os
        expected = self.zipfile + os.sep + 'mymodule.py'
        #
        import mymodule
        co_filename = mymodule.get_co_filename()
        assert co_filename == expected
        #
        import zipimport
        z = zipimport.zipimporter(self.zipfile)
        code = z.get_code('mymodule')
        co_filename = code.co_filename
        assert co_filename == expected

    def test_unencodable(self):
        if not self.testfn_unencodable:
            skip("need an unencodable filename")
        import os
        import time
        import zipimport
        from zipfile import ZipFile, ZipInfo
        filename = self.testfn_unencodable + ".zip"
        z = ZipFile(filename, "w")
        zinfo = ZipInfo("uu.py", time.localtime(self.now))
        zinfo.compress_type = self.compression
        z.writestr(zinfo, '')
        z.close()
        try:
            zipimport.zipimporter(filename)
        finally:
            os.remove(filename)


class AppTestZipimportDeflated(AppTestZipimport):
    compression = ZIP_DEFLATED
    spaceconfig = {
        "usemodules": ['zipimport', 'zlib', 'rctime', 'struct', 'binascii'],
    }

    def setup_class(cls):
        try:
            import rpython.rlib.rzlib
        except ImportError:
            py.test.skip("zlib not available, cannot test compressed zipfiles")
        cls.make_class()


if os.sep != '/':
    class AppTestNativePathSep(AppTestZipimport):
        pathsep = os.sep