Commits

Jeff Hardy committed 3ebc05d

Upgrade tests to Python 2.7.

Comments (0)

Files changed (5)

 
 # based on Andrew Kuchling's minigzip.py distributed with the zlib module
 
-import struct, sys, time
+import struct, sys, time, os
 import zlib
+import io
 import __builtin__
 
 __all__ = ["GzipFile","open"]
     """
     return GzipFile(filename, mode, compresslevel)
 
-class GzipFile:
+class GzipFile(io.BufferedIOBase):
     """The GzipFile class simulates most of the methods of a file object with
     the exception of the readinto() and truncate() methods.
 
     max_read_chunk = 10 * 1024 * 1024   # 10Mb
 
     def __init__(self, filename=None, mode=None,
-                 compresslevel=9, fileobj=None):
+                 compresslevel=9, fileobj=None, mtime=None):
         """Constructor for the GzipFile class.
 
         At least one of fileobj and filename must be given a
         level of compression; 1 is fastest and produces the least compression,
         and 9 is slowest and produces the most compression.  The default is 9.
 
+        The mtime argument is an optional numeric timestamp to be written
+        to the stream when compressing.  All gzip compressed streams
+        are required to contain a timestamp.  If omitted or None, the
+        current time is used.  This module ignores the timestamp when
+        decompressing; however, some programs, such as gunzip, make use
+        of it.  The format of the timestamp is the same as that of the
+        return value of time.time() and of the st_mtime member of the
+        object returned by os.stat().
+
         """
 
         # guarantee the file is opened in binary mode on platforms
             self.mode = READ
             # Set flag indicating start of a new member
             self._new_member = True
+            # Buffer data read from gzip file. extrastart is offset in
+            # stream where buffer starts. extrasize is number of
+            # bytes remaining in buffer from current stream position.
             self.extrabuf = ""
             self.extrasize = 0
+            self.extrastart = 0
             self.name = filename
             # Starts small, scales exponentially
             self.min_readsize = 100
 
         self.fileobj = fileobj
         self.offset = 0
+        self.mtime = mtime
 
         if self.mode == WRITE:
             self._write_gzip_header()
     @property
     def filename(self):
         import warnings
-        warnings.warn("use the name attribute", DeprecationWarning)
+        warnings.warn("use the name attribute", DeprecationWarning, 2)
         if self.mode == WRITE and self.name[-3:] != ".gz":
             return self.name + ".gz"
         return self.name
         s = repr(self.fileobj)
         return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'
 
+    def _check_closed(self):
+        """Raises a ValueError if the underlying file object has been closed.
+
+        """
+        if self.closed:
+            raise ValueError('I/O operation on closed file.')
+
     def _init_write(self, filename):
         self.name = filename
         self.crc = zlib.crc32("") & 0xffffffffL
     def _write_gzip_header(self):
         self.fileobj.write('\037\213')             # magic header
         self.fileobj.write('\010')                 # compression method
-        fname = self.name
+        fname = os.path.basename(self.name)
         if fname.endswith(".gz"):
             fname = fname[:-3]
         flags = 0
         if fname:
             flags = FNAME
         self.fileobj.write(chr(flags))
-        write32u(self.fileobj, long(time.time()))
+        mtime = self.mtime
+        if mtime is None:
+            mtime = time.time()
+        write32u(self.fileobj, long(mtime))
         self.fileobj.write('\002')
         self.fileobj.write('\377')
         if fname:
         if method != 8:
             raise IOError, 'Unknown compression method'
         flag = ord( self.fileobj.read(1) )
-        # modtime = self.fileobj.read(4)
+        self.mtime = read32(self.fileobj)
         # extraflag = self.fileobj.read(1)
         # os = self.fileobj.read(1)
-        self.fileobj.read(6)
+        self.fileobj.read(2)
 
         if flag & FEXTRA:
             # Read & discard the extra field, if present
         if flag & FHCRC:
             self.fileobj.read(2)     # Read & discard the 16-bit header CRC
 
-
     def write(self,data):
+        self._check_closed()
         if self.mode != WRITE:
             import errno
             raise IOError(errno.EBADF, "write() on read-only GzipFile object")
 
         if self.fileobj is None:
             raise ValueError, "write() on closed GzipFile object"
+
+        # Convert data type if called by io.BufferedWriter.
+        if isinstance(data, memoryview):
+            data = data.tobytes()
+
         if len(data) > 0:
             self.size = self.size + len(data)
             self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
             self.fileobj.write( self.compress.compress(data) )
             self.offset += len(data)
 
+        return len(data)
+
     def read(self, size=-1):
+        self._check_closed()
         if self.mode != READ:
             import errno
             raise IOError(errno.EBADF, "read() on write-only GzipFile object")
                 if size > self.extrasize:
                     size = self.extrasize
 
-        chunk = self.extrabuf[:size]
-        self.extrabuf = self.extrabuf[size:]
+        offset = self.offset - self.extrastart
+        chunk = self.extrabuf[offset: offset + size]
         self.extrasize = self.extrasize - size
 
         self.offset += size
         return chunk
 
     def _unread(self, buf):
-        self.extrabuf = buf + self.extrabuf
         self.extrasize = len(buf) + self.extrasize
         self.offset -= len(buf)
 
 
     def _add_read_data(self, data):
         self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
-        self.extrabuf = self.extrabuf + data
+        offset = self.offset - self.extrastart
+        self.extrabuf = self.extrabuf[offset:] + data
         self.extrasize = self.extrasize + len(data)
+        self.extrastart = self.offset
         self.size = self.size + len(data)
 
     def _read_eof(self):
         elif isize != (self.size & 0xffffffffL):
             raise IOError, "Incorrect length of data produced"
 
+        # Gzip files can be padded with zeroes and still have archives.
+        # Consume all zero bytes and set the file position to the first
+        # non-zero byte. See http://www.gzip.org/#faq8
+        c = "\x00"
+        while c == "\x00":
+            c = self.fileobj.read(1)
+        if c:
+            self.fileobj.seek(-1, 1)
+
+    @property
+    def closed(self):
+        return self.fileobj is None
+
     def close(self):
         if self.fileobj is None:
             return
             self.myfileobj.close()
             self.myfileobj = None
 
-    def __del__(self):
-        try:
-            if (self.myfileobj is None and
-                self.fileobj is None):
-                return
-        except AttributeError:
-            return
-        self.close()
-
     def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH):
+        self._check_closed()
         if self.mode == WRITE:
             # Ensure the compressor's buffer is flushed
             self.fileobj.write(self.compress.flush(zlib_mode))
-        self.fileobj.flush()
+            self.fileobj.flush()
 
     def fileno(self):
         """Invoke the underlying file object's fileno() method.
         """
         return self.fileobj.fileno()
 
-    def isatty(self):
-        return False
-
-    def tell(self):
-        return self.offset
-
     def rewind(self):
         '''Return the uncompressed stream file position indicator to the
         beginning of the file'''
         self._new_member = True
         self.extrabuf = ""
         self.extrasize = 0
+        self.extrastart = 0
         self.offset = 0
 
+    def readable(self):
+        return self.mode == READ
+
+    def writable(self):
+        return self.mode == WRITE
+
+    def seekable(self):
+        return True
+
     def seek(self, offset, whence=0):
         if whence:
             if whence == 1:
                 self.read(1024)
             self.read(count % 1024)
 
+        return self.offset
+
     def readline(self, size=-1):
         if size < 0:
+            # Shortcut common case - newline found in buffer.
+            offset = self.offset - self.extrastart
+            i = self.extrabuf.find('\n', offset) + 1
+            if i > 0:
+                self.extrasize -= i - offset
+                self.offset += i - offset
+                return self.extrabuf[offset: i]
+
             size = sys.maxint
             readsize = self.min_readsize
         else:
             self.min_readsize = min(readsize, self.min_readsize * 2, 512)
         return ''.join(bufs) # Return resulting line
 
-    def readlines(self, sizehint=0):
-        # Negative numbers result in reading all the lines
-        if sizehint <= 0:
-            sizehint = sys.maxint
-        L = []
-        while sizehint > 0:
-            line = self.readline()
-            if line == "":
-                break
-            L.append(line)
-            sizehint = sizehint - len(line)
-
-        return L
-
-    def writelines(self, L):
-        for line in L:
-            self.write(line)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        line = self.readline()
-        if line:
-            return line
-        else:
-            raise StopIteration
-
 
 def _test():
     # Act like gzip; with -d, act like gunzip.

tests/test/test_support.py

 
 import contextlib
 import errno
+import functools
+import gc
 import socket
 import sys
 import os
+import platform
 import shutil
 import warnings
 import unittest
+import importlib
+import UserDict
+import re
+import time
+try:
+    import thread
+except ImportError:
+    thread = None
 
-__all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module",
+__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
            "verbose", "use_resources", "max_memuse", "record_original_stdout",
            "get_original_stdout", "unload", "unlink", "rmtree", "forget",
            "is_resource_enabled", "requires", "find_unused_port", "bind_port",
            "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
-           "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
-           "open_urlresource", "check_warnings", "CleanImport",
-           "EnvironmentVarGuard", "captured_output",
+           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
+           "open_urlresource", "check_warnings", "check_py3k_warnings",
+           "CleanImport", "EnvironmentVarGuard", "captured_output",
            "captured_stdout", "TransientResource", "transient_internet",
            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
-           "threading_cleanup", "reap_children"]
+           "threading_cleanup", "reap_children", "cpython_only",
+           "check_impl_detail", "get_attribute", "py3k_bytes"]
+
 
 class Error(Exception):
     """Base class for regression test exceptions."""
 class TestFailed(Error):
     """Test failed."""
 
-class TestSkipped(Error):
-    """Test skipped.
-
-    This can be raised to indicate that a test was deliberatly
-    skipped, but not because a feature wasn't available.  For
-    example, if some resource can't be used, such as the network
-    appears to be unavailable, this should be raised instead of
-    TestFailed.
-    """
-
-class ResourceDenied(TestSkipped):
+class ResourceDenied(unittest.SkipTest):
     """Test skipped because it requested a disallowed resource.
 
     This is raised when a test calls requires() for a resource that
-    has not be enabled.  It is used to distinguish between expected
+    has not been enabled.  It is used to distinguish between expected
     and unexpected skips.
     """
 
-def import_module(name, deprecated=False):
-    """Import the module to be tested, raising TestSkipped if it is not
-    available."""
-    with warnings.catch_warnings():
-        if deprecated:
+@contextlib.contextmanager
+def _ignore_deprecated_imports(ignore=True):
+    """Context manager to suppress package and module deprecation
+    warnings when importing them.
+
+    If ignore is False, this context manager has no effect."""
+    if ignore:
+        with warnings.catch_warnings():
             warnings.filterwarnings("ignore", ".+ (module|package)",
                                     DeprecationWarning)
+            yield
+    else:
+        yield
+
+
+def import_module(name, deprecated=False):
+    """Import and return the module to be tested, raising SkipTest if
+    it is not available.
+
+    If deprecated is True, any module or package deprecation messages
+    will be suppressed."""
+    with _ignore_deprecated_imports(deprecated):
         try:
-            module = __import__(name, level=0)
-        except ImportError:
-            raise TestSkipped("No module named " + name)
-        else:
-            return module
+            return importlib.import_module(name)
+        except ImportError, msg:
+            raise unittest.SkipTest(str(msg))
+
+
+def _save_and_remove_module(name, orig_modules):
+    """Helper function to save and remove a module from sys.modules
+
+       Return value is True if the module was in sys.modules and
+       False otherwise."""
+    saved = True
+    try:
+        orig_modules[name] = sys.modules[name]
+    except KeyError:
+        saved = False
+    else:
+        del sys.modules[name]
+    return saved
+
+
+def _save_and_block_module(name, orig_modules):
+    """Helper function to save and block a module in sys.modules
+
+       Return value is True if the module was in sys.modules and
+       False otherwise."""
+    saved = True
+    try:
+        orig_modules[name] = sys.modules[name]
+    except KeyError:
+        saved = False
+    sys.modules[name] = None
+    return saved
+
+
+def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
+    """Imports and returns a module, deliberately bypassing the sys.modules cache
+    and importing a fresh copy of the module. Once the import is complete,
+    the sys.modules cache is restored to its original state.
+
+    Modules named in fresh are also imported anew if needed by the import.
+
+    Importing of modules named in blocked is prevented while the fresh import
+    takes place.
+
+    If deprecated is True, any module or package deprecation messages
+    will be suppressed."""
+    # NOTE: test_heapq and test_warnings include extra sanity checks to make
+    # sure that this utility function is working as expected
+    with _ignore_deprecated_imports(deprecated):
+        # Keep track of modules saved for later restoration as well
+        # as those which just need a blocking entry removed
+        orig_modules = {}
+        names_to_remove = []
+        _save_and_remove_module(name, orig_modules)
+        try:
+            for fresh_name in fresh:
+                _save_and_remove_module(fresh_name, orig_modules)
+            for blocked_name in blocked:
+                if not _save_and_block_module(blocked_name, orig_modules):
+                    names_to_remove.append(blocked_name)
+            fresh_module = importlib.import_module(name)
+        finally:
+            for orig_name, module in orig_modules.items():
+                sys.modules[orig_name] = module
+            for name_to_remove in names_to_remove:
+                del sys.modules[name_to_remove]
+        return fresh_module
+
+
+def get_attribute(obj, name):
+    """Get an attribute, raising SkipTest if AttributeError is raised."""
+    try:
+        attribute = getattr(obj, name)
+    except AttributeError:
+        raise unittest.SkipTest("module %s has no attribute %s" % (
+            obj.__name__, name))
+    else:
+        return attribute
+
 
 verbose = 1              # Flag set to 0 by regrtest.py
 use_resources = None     # Flag set to [] by regrtest.py
     possibility of False being returned occurs when regrtest.py is executing."""
     # see if the caller's module is __main__ - if so, treat as if
     # the resource was set
-    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
+    if sys._getframe(1).f_globals.get("__name__") == "__main__":
         return
     if not is_resource_enabled(resource):
         if msg is None:
             # 2 latin characters.
             TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
         TESTFN_ENCODING = sys.getfilesystemencoding()
-        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
+        # TESTFN_UNENCODABLE is a filename that should *not* be
         # able to be encoded by *either* the default or filesystem encoding.
         # This test really only makes sense on Windows NT platforms
         # which have special Unicode support in posixmodule.
         if (not hasattr(sys, "getwindowsversion") or
                 sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
-            TESTFN_UNICODE_UNENCODEABLE = None
+            TESTFN_UNENCODABLE = None
         else:
             # Japanese characters (I think - from bug 846133)
-            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
+            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
             try:
                 # XXX - Note - should be using TESTFN_ENCODING here - but for
                 # Windows, "mbcs" currently always operates as if in
                 # errors=ignore' mode - hence we get '?' characters rather than
                 # the exception.  'Latin1' operates as we expect - ie, fails.
                 # See [ 850997 ] mbcs encoding ignores errors
-                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
+                TESTFN_UNENCODABLE.encode("Latin1")
             except UnicodeEncodeError:
                 pass
             else:
                 print \
                 'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
                 'Unicode filename tests may not be effective' \
-                % TESTFN_UNICODE_UNENCODEABLE
+                % TESTFN_UNENCODABLE
 
-# Make sure we can write to TESTFN, try in /tmp if we can't
-fp = None
-try:
-    fp = open(TESTFN, 'w+')
-except IOError:
-    TMP_TESTFN = os.path.join('/tmp', TESTFN)
+
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+
+@contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False):
+    """
+    Context manager that creates a temporary directory and set it as CWD.
+
+    The new CWD is created in the current directory and it's named *name*.
+    If *quiet* is False (default) and it's not possible to create or change
+    the CWD, an error is raised.  If it's True, only a warning is raised
+    and the original CWD is used.
+    """
+    if isinstance(name, unicode):
+        try:
+            name = name.encode(sys.getfilesystemencoding() or 'ascii')
+        except UnicodeEncodeError:
+            if not quiet:
+                raise unittest.SkipTest('unable to encode the cwd name with '
+                                        'the filesystem encoding.')
+    saved_dir = os.getcwd()
+    is_temporary = False
     try:
-        fp = open(TMP_TESTFN, 'w+')
-        TESTFN = TMP_TESTFN
-        del TMP_TESTFN
-    except IOError:
-        print ('WARNING: tests will fail, unable to write to: %s or %s' %
-                (TESTFN, TMP_TESTFN))
-if fp is not None:
-    fp.close()
-    unlink(TESTFN)
-del fp
+        os.mkdir(name)
+        os.chdir(name)
+        is_temporary = True
+    except OSError:
+        if not quiet:
+            raise
+        warnings.warn('tests may fail, unable to change the CWD to ' + name,
+                      RuntimeWarning, stacklevel=3)
+    try:
+        yield os.getcwd()
+    finally:
+        os.chdir(saved_dir)
+        if is_temporary:
+            rmtree(name)
 
-def findfile(file, here=__file__):
+
+def findfile(file, here=__file__, subdir=None):
     """Try to find a file on sys.path and the working directory.  If it is not
     found the argument passed to the function is returned (this does not
     necessarily signal failure; could still be the legitimate path)."""
     if os.path.isabs(file):
         return file
+    if subdir is not None:
+        file = os.path.join(subdir, file)
     path = sys.path
     path = [os.path.dirname(here)] + path
     for dn in path:
         if os.path.exists(fn): return fn
     return file
 
-def verify(condition, reason='test failed'):
-    """Verify that condition is true. If not, raise TestFailed.
-
-       The optional argument reason can be given to provide
-       a better error text.
-    """
-
-    if not condition:
-        raise TestFailed(reason)
-
-def vereq(a, b):
-    """Raise TestFailed if a == b is false.
-
-    This is better than verify(a == b) because, in case of failure, the
-    error message incorporates repr(a) and repr(b) so you can see the
-    inputs.
-
-    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
-    former is tested.
-    """
-
-    if not (a == b):
-        raise TestFailed("%r == %r" % (a, b))
-
 def sortdict(dict):
     "Like repr(dict), but in sorted order."
     items = dict.items()
         unlink(TESTFN)
 
 def check_syntax_error(testcase, statement):
-    try:
-        compile(statement, '<test string>', 'exec')
-    except SyntaxError:
-        pass
-    else:
-        testcase.fail('Missing SyntaxError: "%s"' % statement)
+    testcase.assertRaises(SyntaxError, compile, statement,
+                          '<test string>', 'exec')
 
-def open_urlresource(url):
-    import urllib, urlparse
+def open_urlresource(url, check=None):
+    import urlparse, urllib2
 
-    requires('urlfetch')
     filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
 
-    for path in [os.path.curdir, os.path.pardir]:
-        fn = os.path.join(path, filename)
-        if os.path.exists(fn):
-            return open(fn)
+    fn = os.path.join(os.path.dirname(__file__), "data", filename)
+
+    def check_valid_file(fn):
+        f = open(fn)
+        if check is None:
+            return f
+        elif check(f):
+            f.seek(0)
+            return f
+        f.close()
+
+    if os.path.exists(fn):
+        f = check_valid_file(fn)
+        if f is not None:
+            return f
+        unlink(fn)
+
+    # Verify the requirement before downloading the file
+    requires('urlfetch')
 
     print >> get_original_stdout(), '\tfetching %s ...' % url
-    fn, _ = urllib.urlretrieve(url, filename)
-    return open(fn)
+    f = urllib2.urlopen(url, timeout=15)
+    try:
+        with open(fn, "wb") as out:
+            s = f.read()
+            while s:
+                out.write(s)
+                s = f.read()
+    finally:
+        f.close()
+
+    f = check_valid_file(fn)
+    if f is not None:
+        return f
+    raise TestFailed('invalid resource "%s"' % fn)
 
 
 class WarningsRecorder(object):
        entry to the warnings.catch_warnings() context manager.
     """
     def __init__(self, warnings_list):
-        self.warnings = warnings_list
+        self._warnings = warnings_list
+        self._last = 0
 
     def __getattr__(self, attr):
-        if self.warnings:
-            return getattr(self.warnings[-1], attr)
+        if len(self._warnings) > self._last:
+            return getattr(self._warnings[-1], attr)
         elif attr in warnings.WarningMessage._WARNING_DETAILS:
             return None
         raise AttributeError("%r has no attribute %r" % (self, attr))
 
+    @property
+    def warnings(self):
+        return self._warnings[self._last:]
+
     def reset(self):
-        del self.warnings[:]
+        self._last = len(self._warnings)
+
+
+def _filterwarnings(filters, quiet=False):
+    """Catch the warnings, then check if all the expected
+    warnings have been raised and re-raise unexpected warnings.
+    If 'quiet' is True, only re-raise the unexpected warnings.
+    """
+    # Clear the warning registry of the calling module
+    # in order to re-raise the warnings.
+    frame = sys._getframe(2)
+    registry = frame.f_globals.get('__warningregistry__')
+    if registry:
+        registry.clear()
+    with warnings.catch_warnings(record=True) as w:
+        # Set filter "always" to record all warnings.  Because
+        # test_warnings swap the module, we need to look up in
+        # the sys.modules dictionary.
+        sys.modules['warnings'].simplefilter("always")
+        yield WarningsRecorder(w)
+    # Filter the recorded warnings
+    reraise = [warning.message for warning in w]
+    missing = []
+    for msg, cat in filters:
+        seen = False
+        for exc in reraise[:]:
+            message = str(exc)
+            # Filter out the matching messages
+            if (re.match(msg, message, re.I) and
+                issubclass(exc.__class__, cat)):
+                seen = True
+                reraise.remove(exc)
+        if not seen and not quiet:
+            # This filter caught nothing
+            missing.append((msg, cat.__name__))
+    if reraise:
+        raise AssertionError("unhandled warning %r" % reraise[0])
+    if missing:
+        raise AssertionError("filter (%r, %s) did not catch any warning" %
+                             missing[0])
+
 
 @contextlib.contextmanager
-def check_warnings():
-    with warnings.catch_warnings(record=True) as w:
-        yield WarningsRecorder(w)
+def check_warnings(*filters, **kwargs):
+    """Context manager to silence warnings.
+
+    Accept 2-tuples as positional arguments:
+        ("message regexp", WarningCategory)
+
+    Optional argument:
+     - if 'quiet' is True, it does not fail if a filter catches nothing
+        (default True without argument,
+         default False if some filters are defined)
+
+    Without argument, it defaults to:
+        check_warnings(("", Warning), quiet=True)
+    """
+    quiet = kwargs.get('quiet')
+    if not filters:
+        filters = (("", Warning),)
+        # Preserve backward compatibility
+        if quiet is None:
+            quiet = True
+    return _filterwarnings(filters, quiet)
+
+
+@contextlib.contextmanager
+def check_py3k_warnings(*filters, **kwargs):
+    """Context manager to silence py3k warnings.
+
+    Accept 2-tuples as positional arguments:
+        ("message regexp", WarningCategory)
+
+    Optional argument:
+     - if 'quiet' is True, it does not fail if a filter catches nothing
+        (default False)
+
+    Without argument, it defaults to:
+        check_py3k_warnings(("", DeprecationWarning), quiet=False)
+    """
+    if sys.py3kwarning:
+        if not filters:
+            filters = (("", DeprecationWarning),)
+    else:
+        # It should not raise any py3k warning
+        filters = ()
+    return _filterwarnings(filters, kwargs.get('quiet'))
 
 
 class CleanImport(object):
     Use like this:
 
         with CleanImport("foo"):
-            __import__("foo") # new reference
+            importlib.import_module("foo") # new reference
     """
 
     def __init__(self, *module_names):
         sys.modules.update(self.original_modules)
 
 
-class EnvironmentVarGuard(object):
+class EnvironmentVarGuard(UserDict.DictMixin):
 
     """Class to help protect the environment variable properly.  Can be used as
     a context manager."""
 
     def __init__(self):
+        self._environ = os.environ
         self._changed = {}
 
-    def set(self, envvar, value):
+    def __getitem__(self, envvar):
+        return self._environ[envvar]
+
+    def __setitem__(self, envvar, value):
         # Remember the initial value on the first access
         if envvar not in self._changed:
-            self._changed[envvar] = os.environ.get(envvar)
-        os.environ[envvar] = value
+            self._changed[envvar] = self._environ.get(envvar)
+        self._environ[envvar] = value
+
+    def __delitem__(self, envvar):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        if envvar in self._environ:
+            del self._environ[envvar]
+
+    def keys(self):
+        return self._environ.keys()
+
+    def set(self, envvar, value):
+        self[envvar] = value
 
     def unset(self, envvar):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = os.environ.get(envvar)
-        if envvar in os.environ:
-            del os.environ[envvar]
+        del self[envvar]
 
     def __enter__(self):
         return self
     def __exit__(self, *ignore_exc):
         for (k, v) in self._changed.items():
             if v is None:
-                if k in os.environ:
-                    del os.environ[k]
+                if k in self._environ:
+                    del self._environ[k]
             else:
-                os.environ[k] = v
+                self._environ[k] = v
+        os.environ = self._environ
+
+
+class DirsOnSysPath(object):
+    """Context manager to temporarily add directories to sys.path.
+
+    This makes a copy of sys.path, appends any directories given
+    as positional arguments, then reverts sys.path to the copied
+    settings when the context ends.
+
+    Note that *all* sys.path modifications in the body of the
+    context manager, including replacement of the object,
+    will be reverted at the end of the block.
+    """
+
+    def __init__(self, *paths):
+        self.original_value = sys.path[:]
+        self.original_object = sys.path
+        sys.path.extend(paths)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        sys.path = self.original_object
+        sys.path[:] = self.original_value
 
 
 class TransientResource(object):
                 raise ResourceDenied("an optional resource is not available")
 
 
-def transient_internet():
+@contextlib.contextmanager
+def transient_internet(resource_name, timeout=30.0, errnos=()):
     """Return a context manager that raises ResourceDenied when various issues
     with the Internet connection manifest themselves as exceptions."""
-    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
-    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
-    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
-    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
+    default_errnos = [
+        ('ECONNREFUSED', 111),
+        ('ECONNRESET', 104),
+        ('ENETUNREACH', 101),
+        ('ETIMEDOUT', 110),
+    ]
+    default_gai_errnos = [
+        ('EAI_NONAME', -2),
+        ('EAI_NODATA', -5),
+    ]
+
+    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
+    captured_errnos = errnos
+    gai_errnos = []
+    if not captured_errnos:
+        captured_errnos = [getattr(errno, name, num)
+                           for (name, num) in default_errnos]
+        gai_errnos = [getattr(socket, name, num)
+                      for (name, num) in default_gai_errnos]
+
+    def filter_error(err):
+        n = getattr(err, 'errno', None)
+        if (isinstance(err, socket.timeout) or
+            (isinstance(err, socket.gaierror) and n in gai_errnos) or
+            n in captured_errnos):
+            if not verbose:
+                sys.stderr.write(denied.args[0] + "\n")
+            raise denied
+
+    old_timeout = socket.getdefaulttimeout()
+    try:
+        if timeout is not None:
+            socket.setdefaulttimeout(timeout)
+        yield
+    except IOError as err:
+        # urllib can wrap original socket errors multiple times (!), we must
+        # unwrap to get at the original error.
+        while True:
+            a = err.args
+            if len(a) >= 1 and isinstance(a[0], IOError):
+                err = a[0]
+            # The error can also be wrapped as args[1]:
+            #    except socket.error as msg:
+            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
+            elif len(a) >= 2 and isinstance(a[1], IOError):
+                err = a[1]
+            else:
+                break
+        filter_error(err)
+        raise
+    # XXX should we catch generic exceptions and look for their
+    # __cause__ or __context__?
+    finally:
+        socket.setdefaulttimeout(old_timeout)
 
 
 @contextlib.contextmanager
 def captured_stdout():
     return captured_output("stdout")
 
+def captured_stdin():
+    return captured_output("stdin")
+
+def gc_collect():
+    """Force as many objects as possible to be collected.
+
+    In non-CPython implementations of Python, this is needed because timely
+    deallocation is not guaranteed by the garbage collector.  (Even in CPython
+    this can be the case in case of reference cycles.)  This means that __del__
+    methods may be called later than expected and weakrefs may remain alive for
+    longer than expected.  This function tries its best to force all garbage
+    objects to disappear.
+    """
+    gc.collect()
+    if is_jython:
+        time.sleep(0.1)
+    gc.collect()
+    gc.collect()
+
 
 #=======================================================================
 # Decorator for running a function in a different locale, correctly resetting
 MAX_Py_ssize_t = sys.maxsize
 
 def set_memlimit(limit):
-    import re
     global max_memuse
     global real_max_memuse
     sizes = {
                 # to make sure they work. We still want to avoid using
                 # too much memory, though, but we do that noisily.
                 maxsize = 5147
-                self.failIf(maxsize * memuse + overhead > 20 * _1M)
+                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
             else:
                 maxsize = int((max_memuse - overhead) / memuse)
                 if maxsize < minsize:
         test(result)
         return result
 
+def _id(obj):
+    return obj
+
+def requires_resource(resource):
+    if is_resource_enabled(resource):
+        return _id
+    else:
+        return unittest.skip("resource {0!r} is not enabled".format(resource))
+
+def cpython_only(test):
+    """
+    Decorator for tests only applicable on CPython.
+    """
+    return impl_detail(cpython=True)(test)
+
+def impl_detail(msg=None, **guards):
+    if check_impl_detail(**guards):
+        return _id
+    if msg is None:
+        guardnames, default = _parse_guards(guards)
+        if default:
+            msg = "implementation detail not available on {0}"
+        else:
+            msg = "implementation detail specific to {0}"
+        guardnames = sorted(guardnames.keys())
+        msg = msg.format(' or '.join(guardnames))
+    return unittest.skip(msg)
+
+def _parse_guards(guards):
+    # Returns a tuple ({platform_name: run_me}, default_value)
+    if not guards:
+        return ({'cpython': True}, False)
+    is_true = guards.values()[0]
+    assert guards.values() == [is_true] * len(guards)   # all True or all False
+    return (guards, not is_true)
+
+# Use the following check to guard CPython's implementation-specific tests --
+# or to run them only on the implementation(s) guarded by the arguments.
+def check_impl_detail(**guards):
+    """This function returns True or False depending on the host platform.
+       Examples:
+          if check_impl_detail():               # only on CPython (default)
+          if check_impl_detail(jython=True):    # only on Jython
+          if check_impl_detail(cpython=False):  # everywhere except on CPython
+    """
+    guards, default = _parse_guards(guards)
+    return guards.get(platform.python_implementation().lower(), default)
+
+
 
 def _run_suite(suite):
     """Run tests from a unittest.TestSuite-derived class."""
         elif len(result.failures) == 1 and not result.errors:
             err = result.failures[0][1]
         else:
-            err = "errors occurred; run in verbose mode for details"
+            err = "multiple errors occurred"
+            if not verbose:
+                err += "; run in verbose mode for details"
         raise TestFailed(err)
 
 
 #=======================================================================
 # Threading support to prevent reporting refleaks when running regrtest.py -R
 
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
+
 def threading_setup():
-    import threading
-    return len(threading._active), len(threading._limbo)
+    if thread:
+        return thread._count(),
+    else:
+        return 1,
 
-def threading_cleanup(num_active, num_limbo):
-    import threading
-    import time
+def threading_cleanup(nb_threads):
+    if not thread:
+        return
 
     _MAX_COUNT = 10
-    count = 0
-    while len(threading._active) != num_active and count < _MAX_COUNT:
-        count += 1
+    for count in range(_MAX_COUNT):
+        n = thread._count()
+        if n == nb_threads:
+            break
         time.sleep(0.1)
+    # XXX print a warning in case of failure?
 
-    count = 0
-    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
-        count += 1
-        time.sleep(0.1)
+def reap_threads(func):
+    """Use this function when threads are being used.  This will
+    ensure that the threads are cleaned up even when the test fails.
+    If threading is unavailable this function does nothing.
+    """
+    if not thread:
+        return func
+
+    @functools.wraps(func)
+    def decorator(*args):
+        key = threading_setup()
+        try:
+            return func(*args)
+        finally:
+            threading_cleanup(*key)
+    return decorator
 
 def reap_children():
     """Use this function at the end of test_main() whenever sub-processes
                     break
             except:
                 break
+
+def py3k_bytes(b):
+    """Emulate the py3k bytes() constructor.
+
+    NOTE: This is only a best effort function.
+    """
+    try:
+        # memoryview?
+        return b.tobytes()
+    except AttributeError:
+        try:
+            # iterable of ints?
+            return b"".join(chr(x) for x in b)
+        except TypeError:
+            return bytes(b)
+
+def args_from_interpreter_flags():
+    """Return a list of command-line arguments reproducing the current
+    settings in sys.flags."""
+    flag_opt_map = {
+        'bytes_warning': 'b',
+        'dont_write_bytecode': 'B',
+        'ignore_environment': 'E',
+        'no_user_site': 's',
+        'no_site': 'S',
+        'optimize': 'O',
+        'py3k_warning': '3',
+        'verbose': 'v',
+    }
+    args = []
+    for flag, opt in flag_opt_map.items():
+        v = getattr(sys.flags, flag)
+        if v > 0:
+            args.append('-' + opt * v)
+    return args

tests/test_gzip.py

 import unittest
 from test import test_support
 import os
-import gzip
-
+import io
+import struct
+gzip = test_support.import_module('gzip')
 
 data1 = """  int length=DEFAULTALLOC, err = Z_OK;
   PyObject *RetVal;
 
 
     def test_write(self):
-        f = gzip.GzipFile(self.filename, 'wb') ; f.write(data1 * 50)
+        with gzip.GzipFile(self.filename, 'wb') as f:
+            f.write(data1 * 50)
 
-        # Try flush and fileno.
-        f.flush()
-        f.fileno()
-        if hasattr(os, 'fsync'):
-            os.fsync(f.fileno())
-        f.close()
+            # Try flush and fileno.
+            f.flush()
+            f.fileno()
+            if hasattr(os, 'fsync'):
+                os.fsync(f.fileno())
+            f.close()
 
         # Test multiple close() calls.
         f.close()
     def test_read(self):
         self.test_write()
         # Try reading.
-        f = gzip.GzipFile(self.filename, 'r') ; d = f.read() ; f.close()
+        with gzip.GzipFile(self.filename, 'r') as f:
+            d = f.read()
         self.assertEqual(d, data1*50)
 
+    def test_io_on_closed_object(self):
+        # Test that I/O operations on closed GzipFile objects raise a
+        # ValueError, just like the corresponding functions on file objects.
+
+        # Write to a file, open it for reading, then close it.
+        self.test_write()
+        f = gzip.GzipFile(self.filename, 'r')
+        f.close()
+        with self.assertRaises(ValueError):
+            f.read(1)
+        with self.assertRaises(ValueError):
+            f.seek(0)
+        with self.assertRaises(ValueError):
+            f.tell()
+        # Open the file for writing, then close it.
+        f = gzip.GzipFile(self.filename, 'w')
+        f.close()
+        with self.assertRaises(ValueError):
+            f.write('')
+        with self.assertRaises(ValueError):
+            f.flush()
+
     def test_append(self):
         self.test_write()
         # Append to the previous file
-        f = gzip.GzipFile(self.filename, 'ab') ; f.write(data2 * 15) ; f.close()
+        with gzip.GzipFile(self.filename, 'ab') as f:
+            f.write(data2 * 15)
 
-        f = gzip.GzipFile(self.filename, 'rb') ; d = f.read() ; f.close()
+        with gzip.GzipFile(self.filename, 'rb') as f:
+            d = f.read()
         self.assertEqual(d, (data1*50) + (data2*15))
 
     def test_many_append(self):
         # Bug #1074261 was triggered when reading a file that contained
         # many, many members.  Create such a file and verify that reading it
         # works.
-        f = gzip.open(self.filename, 'wb', 9)
-        f.write('a')
-        f.close()
-        for i in range(0,200):
-            f = gzip.open(self.filename, "ab", 9) # append
+        with gzip.open(self.filename, 'wb', 9) as f:
             f.write('a')
-            f.close()
+        for i in range(0, 200):
+            with gzip.open(self.filename, "ab", 9) as f: # append
+                f.write('a')
 
         # Try reading the file
-        zgfile = gzip.open(self.filename, "rb")
-        contents = ""
-        while 1:
-            ztxt = zgfile.read(8192)
-            contents += ztxt
-            if not ztxt: break
-        zgfile.close()
+        with gzip.open(self.filename, "rb") as zgfile:
+            contents = ""
+            while 1:
+                ztxt = zgfile.read(8192)
+                contents += ztxt
+                if not ztxt: break
         self.assertEquals(contents, 'a'*201)
 
+    def test_buffered_reader(self):
+        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
+        # performance.
+        self.test_write()
+
+        with gzip.GzipFile(self.filename, 'rb') as f:
+            with io.BufferedReader(f) as r:
+                lines = [line for line in r]
+
+        self.assertEqual(lines, 50 * data1.splitlines(True))
 
     def test_readline(self):
         self.test_write()
         # Try .readline() with varying line lengths
 
-        f = gzip.GzipFile(self.filename, 'rb')
-        line_length = 0
-        while 1:
-            L = f.readline(line_length)
-            if L == "" and line_length != 0: break
-            self.assert_(len(L) <= line_length)
-            line_length = (line_length + 1) % 50
-        f.close()
+        with gzip.GzipFile(self.filename, 'rb') as f:
+            line_length = 0
+            while 1:
+                L = f.readline(line_length)
+                if not L and line_length != 0: break
+                self.assertTrue(len(L) <= line_length)
+                line_length = (line_length + 1) % 50
 
     def test_readlines(self):
         self.test_write()
         # Try .readlines()
 
-        f = gzip.GzipFile(self.filename, 'rb')
-        L = f.readlines()
-        f.close()
+        with gzip.GzipFile(self.filename, 'rb') as f:
+            L = f.readlines()
 
-        f = gzip.GzipFile(self.filename, 'rb')
-        while 1:
-            L = f.readlines(150)
-            if L == []: break
-        f.close()
+        with gzip.GzipFile(self.filename, 'rb') as f:
+            while 1:
+                L = f.readlines(150)
+                if L == []: break
 
     def test_seek_read(self):
         self.test_write()
         # Try seek, read test
 
-        f = gzip.GzipFile(self.filename)
-        while 1:
-            oldpos = f.tell()
-            line1 = f.readline()
-            if not line1: break
-            newpos = f.tell()
-            f.seek(oldpos)  # negative seek
-            if len(line1)>10:
-                amount = 10
-            else:
-                amount = len(line1)
-            line2 = f.read(amount)
-            self.assertEqual(line1[:amount], line2)
-            f.seek(newpos)  # positive seek
-        f.close()
+        with gzip.GzipFile(self.filename) as f:
+            while 1:
+                oldpos = f.tell()
+                line1 = f.readline()
+                if not line1: break
+                newpos = f.tell()
+                f.seek(oldpos)  # negative seek
+                if len(line1)>10:
+                    amount = 10
+                else:
+                    amount = len(line1)
+                line2 = f.read(amount)
+                self.assertEqual(line1[:amount], line2)
+                f.seek(newpos)  # positive seek
 
     def test_seek_whence(self):
         self.test_write()
         # Try seek(whence=1), read test
 
-        f = gzip.GzipFile(self.filename)
-        f.read(10)
-        f.seek(10, whence=1)
-        y = f.read(10)
-        f.close()
+        with gzip.GzipFile(self.filename) as f:
+            f.read(10)
+            f.seek(10, whence=1)
+            y = f.read(10)
         self.assertEquals(y, data1[20:30])
 
     def test_seek_write(self):
         # Try seek, write test
-        f = gzip.GzipFile(self.filename, 'w')
-        for pos in range(0, 256, 16):
-            f.seek(pos)
-            f.write('GZ\n')
-        f.close()
+        with gzip.GzipFile(self.filename, 'w') as f:
+            for pos in range(0, 256, 16):
+                f.seek(pos)
+                f.write('GZ\n')
 
     def test_mode(self):
         self.test_write()
-        f = gzip.GzipFile(self.filename, 'r')
-        self.assertEqual(f.myfileobj.mode, 'rb')
-        f.close()
+        with gzip.GzipFile(self.filename, 'r') as f:
+            self.assertEqual(f.myfileobj.mode, 'rb')
 
     def test_1647484(self):
         for mode in ('wb', 'rb'):
-            f = gzip.GzipFile(self.filename, mode)
-            self.assert_(hasattr(f, "name"))
-            self.assertEqual(f.name, self.filename)
-            f.close()
+            with gzip.GzipFile(self.filename, mode) as f:
+                self.assertTrue(hasattr(f, "name"))
+                self.assertEqual(f.name, self.filename)
+
+    def test_mtime(self):
+        mtime = 123456789
+        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
+            fWrite.write(data1)
+        with gzip.GzipFile(self.filename) as fRead:
+            dataRead = fRead.read()
+            self.assertEqual(dataRead, data1)
+            self.assertTrue(hasattr(fRead, 'mtime'))
+            self.assertEqual(fRead.mtime, mtime)
+
+    def test_metadata(self):
+        mtime = 123456789
+
+        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
+            fWrite.write(data1)
+
+        with open(self.filename, 'rb') as fRead:
+            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
+
+            idBytes = fRead.read(2)
+            self.assertEqual(idBytes, '\x1f\x8b') # gzip ID
+
+            cmByte = fRead.read(1)
+            self.assertEqual(cmByte, '\x08') # deflate
+
+            flagsByte = fRead.read(1)
+            self.assertEqual(flagsByte, '\x08') # only the FNAME flag is set
+
+            mtimeBytes = fRead.read(4)
+            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
+
+            xflByte = fRead.read(1)
+            self.assertEqual(xflByte, '\x02') # maximum compression
+
+            osByte = fRead.read(1)
+            self.assertEqual(osByte, '\xff') # OS "unknown" (OS-independent)
+
+            # Since the FNAME flag is set, the zero-terminated filename follows.
+            # RFC 1952 specifies that this is the name of the input file, if any.
+            # However, the gzip module defaults to storing the name of the output
+            # file in this field.
+            expected = self.filename.encode('Latin-1') + '\x00'
+            nameBytes = fRead.read(len(expected))
+            self.assertEqual(nameBytes, expected)
+
+            # Since no other flags were set, the header ends here.
+            # Rather than process the compressed data, let's seek to the trailer.
+            fRead.seek(os.stat(self.filename).st_size - 8)
+
+            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
+            self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')
+
+            isizeBytes = fRead.read(4)
+            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
+
+    def test_with_open(self):
+        # GzipFile supports the context management protocol
+        with gzip.GzipFile(self.filename, "wb") as f:
+            f.write(b"xxx")
+        f = gzip.GzipFile(self.filename, "rb")
+        f.close()
+        try:
+            with f:
+                pass
+        except ValueError:
+            pass
+        else:
+            self.fail("__enter__ on a closed file didn't raise an exception")
+        try:
+            with gzip.GzipFile(self.filename, "wb") as f:
+                1 // 0
+        except ZeroDivisionError:
+            pass
+        else:
+            self.fail("1 // 0 didn't raise an exception")
+
+    def test_zero_padded_file(self):
+        with gzip.GzipFile(self.filename, "wb") as f:
+            f.write(data1 * 50)
+
+        # Pad the file with zeroes
+        with open(self.filename, "ab") as f:
+            f.write("\x00" * 50)
+
+        with gzip.GzipFile(self.filename, "rb") as f:
+            d = f.read()
+            self.assertEqual(d, data1 * 50, "Incorrect data in file")
 
 def test_main(verbose=None):
     test_support.run_unittest(TestGzip)

tests/test_zipfile.py

 except ImportError:
     zlib = None
 
-import zipfile, os, unittest, sys, shutil, struct
+import os
+import io
+import sys
+import time
+import shutil
+import struct
+import zipfile
+import unittest
 
 from StringIO import StringIO
 from tempfile import TemporaryFile
 from random import randint, random
+from unittest import skipUnless
 
-import test.test_support as support
-from test.test_support import TESTFN, run_unittest, findfile
+from test.test_support import TESTFN, run_unittest, findfile, unlink
 
 TESTFN2 = TESTFN + "2"
 TESTFNDIR = TESTFN + "d"
                    ('/ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                    ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
 
+
 class TestsWithSourceFile(unittest.TestCase):
     def setUp(self):
         self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
-                          for i in xrange(FIXEDTEST_SIZE)]
+                         for i in xrange(FIXEDTEST_SIZE)]
         self.data = '\n'.join(self.line_gen) + '\n'
 
         # Make a source file with some lines
-        fp = open(TESTFN, "wb")
-        fp.write(self.data)
-        fp.close()
+        with open(TESTFN, "wb") as fp:
+            fp.write(self.data)
 
-    def makeTestArchive(self, f, compression):
+    def make_test_archive(self, f, compression):
         # Create the ZIP archive
-        zipfp = zipfile.ZipFile(f, "w", compression)
-        zipfp.write(TESTFN, "another"+os.extsep+"name")
-        zipfp.write(TESTFN, TESTFN)
-        zipfp.writestr("strfile", self.data)
+        with zipfile.ZipFile(f, "w", compression) as zipfp:
+            zipfp.write(TESTFN, "another.name")
+            zipfp.write(TESTFN, TESTFN)
+            zipfp.writestr("strfile", self.data)
+
+    def zip_test(self, f, compression):
+        self.make_test_archive(f, compression)
+
+        # Read the ZIP archive
+        with zipfile.ZipFile(f, "r", compression) as zipfp:
+            self.assertEqual(zipfp.read(TESTFN), self.data)
+            self.assertEqual(zipfp.read("another.name"), self.data)
+            self.assertEqual(zipfp.read("strfile"), self.data)
+
+            # Print the ZIP directory
+            fp = StringIO()
+            stdout = sys.stdout
+            try:
+                sys.stdout = fp
+                zipfp.printdir()
+            finally:
+                sys.stdout = stdout
+
+            directory = fp.getvalue()
+            lines = directory.splitlines()
+            self.assertEqual(len(lines), 4) # Number of files + header
+
+            self.assertIn('File Name', lines[0])
+            self.assertIn('Modified', lines[0])
+            self.assertIn('Size', lines[0])
+
+            fn, date, time_, size = lines[1].split()
+            self.assertEqual(fn, 'another.name')
+            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
+            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
+            self.assertEqual(size, str(len(self.data)))
+
+            # Check the namelist
+            names = zipfp.namelist()
+            self.assertEqual(len(names), 3)
+            self.assertIn(TESTFN, names)
+            self.assertIn("another.name", names)
+            self.assertIn("strfile", names)
+
+            # Check infolist
+            infos = zipfp.infolist()
+            names = [i.filename for i in infos]
+            self.assertEqual(len(names), 3)
+            self.assertIn(TESTFN, names)
+            self.assertIn("another.name", names)
+            self.assertIn("strfile", names)
+            for i in infos:
+                self.assertEqual(i.file_size, len(self.data))
+
+            # check getinfo
+            for nm in (TESTFN, "another.name", "strfile"):
+                info = zipfp.getinfo(nm)
+                self.assertEqual(info.filename, nm)
+                self.assertEqual(info.file_size, len(self.data))
+
+            # Check that testzip doesn't raise an exception
+            zipfp.testzip()
+
+    def test_stored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_test(f, zipfile.ZIP_STORED)
+
+    def zip_open_test(self, f, compression):
+        self.make_test_archive(f, compression)
+
+        # Read the ZIP archive
+        with zipfile.ZipFile(f, "r", compression) as zipfp:
+            zipdata1 = []
+            zipopen1 = zipfp.open(TESTFN)
+            while True:
+                read_data = zipopen1.read(256)
+                if not read_data:
+                    break
+                zipdata1.append(read_data)
+
+            zipdata2 = []
+            zipopen2 = zipfp.open("another.name")
+            while True:
+                read_data = zipopen2.read(256)
+                if not read_data:
+                    break
+                zipdata2.append(read_data)
+
+            self.assertEqual(''.join(zipdata1), self.data)
+            self.assertEqual(''.join(zipdata2), self.data)
+
+    def test_open_stored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_open_test(f, zipfile.ZIP_STORED)
+
+    def test_open_via_zip_info(self):
+        # Create the ZIP archive
+        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
+            zipfp.writestr("name", "foo")
+            zipfp.writestr("name", "bar")
+
+        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
+            infos = zipfp.infolist()
+            data = ""
+            for info in infos:
+                data += zipfp.open(info).read()
+            self.assertTrue(data == "foobar" or data == "barfoo")
+            data = ""
+            for info in infos:
+                data += zipfp.read(info)
+            self.assertTrue(data == "foobar" or data == "barfoo")
+
+    def zip_random_open_test(self, f, compression):
+        self.make_test_archive(f, compression)
+
+        # Read the ZIP archive
+        with zipfile.ZipFile(f, "r", compression) as zipfp:
+            zipdata1 = []
+            zipopen1 = zipfp.open(TESTFN)
+            while True:
+                read_data = zipopen1.read(randint(1, 1024))
+                if not read_data:
+                    break
+                zipdata1.append(read_data)
+
+            self.assertEqual(''.join(zipdata1), self.data)
+
+    def test_random_open_stored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_random_open_test(f, zipfile.ZIP_STORED)
+
+    def test_univeral_readaheads(self):
+        f = StringIO()
+
+        data = 'a\r\n' * 16 * 1024
+        zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED)
+        zipfp.writestr(TESTFN, data)
         zipfp.close()
 
-    def zipTest(self, f, compression):
-        self.makeTestArchive(f, compression)
-
-        # Read the ZIP archive
-        zipfp = zipfile.ZipFile(f, "r", compression)
-        self.assertEqual(zipfp.read(TESTFN), self.data)
-        self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
-        self.assertEqual(zipfp.read("strfile"), self.data)
-
-        # Print the ZIP directory
-        fp = StringIO()
-        stdout = sys.stdout
-        try:
-            sys.stdout = fp
-
-            zipfp.printdir()
-        finally:
-            sys.stdout = stdout
-
-        directory = fp.getvalue()
-        lines = directory.splitlines()
-        self.assertEquals(len(lines), 4) # Number of files + header
-
-        self.assert_('File Name' in lines[0])
-        self.assert_('Modified' in lines[0])
-        self.assert_('Size' in lines[0])
-
-        fn, date, time, size = lines[1].split()
-        self.assertEquals(fn, 'another.name')
-        # XXX: timestamp is not tested
-        self.assertEquals(size, str(len(self.data)))
-
-        # Check the namelist
-        names = zipfp.namelist()
-        self.assertEquals(len(names), 3)
-        self.assert_(TESTFN in names)
-        self.assert_("another"+os.extsep+"name" in names)
-        self.assert_("strfile" in names)
-
-        # Check infolist
-        infos = zipfp.infolist()
-        names = [ i.filename for i in infos ]
-        self.assertEquals(len(names), 3)
-        self.assert_(TESTFN in names)
-        self.assert_("another"+os.extsep+"name" in names)
-        self.assert_("strfile" in names)
-        for i in infos:
-            self.assertEquals(i.file_size, len(self.data))
-
-        # check getinfo
-        for nm in (TESTFN, "another"+os.extsep+"name", "strfile"):
-            info = zipfp.getinfo(nm)
-            self.assertEquals(info.filename, nm)
-            self.assertEquals(info.file_size, len(self.data))
-
-        # Check that testzip doesn't raise an exception
-        zipfp.testzip()
+        data2 = ''
+        zipfp = zipfile.ZipFile(f, 'r')
+        zipopen = zipfp.open(TESTFN, 'rU')
+        for line in zipopen:
+            data2 += line
         zipfp.close()
 
-    def testStored(self):
-        for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipTest(f, zipfile.ZIP_STORED)
+        self.assertEqual(data, data2.replace('\n', '\r\n'))
 
-    def zipOpenTest(self, f, compression):
-        self.makeTestArchive(f, compression)
-
-        # Read the ZIP archive
-        zipfp = zipfile.ZipFile(f, "r", compression)
-        zipdata1 = []
-        zipopen1 = zipfp.open(TESTFN)
-        while 1:
-            read_data = zipopen1.read(256)
-            if not read_data:
-                break
-            zipdata1.append(read_data)
-
-        zipdata2 = []
-        zipopen2 = zipfp.open("another"+os.extsep+"name")
-        while 1:
-            read_data = zipopen2.read(256)
-            if not read_data:
-                break
-            zipdata2.append(read_data)
-
-        self.assertEqual(''.join(zipdata1), self.data)
-        self.assertEqual(''.join(zipdata2), self.data)
-        zipfp.close()
-
-    def testOpenStored(self):
-        for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipOpenTest(f, zipfile.ZIP_STORED)
-
-    def testOpenViaZipInfo(self):
-        # Create the ZIP archive
-        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
-        zipfp.writestr("name", "foo")
-        zipfp.writestr("name", "bar")
-        zipfp.close()
-
-        zipfp = zipfile.ZipFile(TESTFN2, "r")
-        infos = zipfp.infolist()
-        data = ""
-        for info in infos:
-            data += zipfp.open(info).read()
-        self.assert_(data == "foobar" or data == "barfoo")
-        data = ""
-        for info in infos:
-            data += zipfp.read(info)
-        self.assert_(data == "foobar" or data == "barfoo")
-        zipfp.close()
-
-    def zipRandomOpenTest(self, f, compression):
-        self.makeTestArchive(f, compression)
-
-        # Read the ZIP archive
-        zipfp = zipfile.ZipFile(f, "r", compression)
-        zipdata1 = []
-        zipopen1 = zipfp.open(TESTFN)
-        while 1:
-            read_data = zipopen1.read(randint(1, 1024))
-            if not read_data:
-                break
-            zipdata1.append(read_data)
-
-        self.assertEqual(''.join(zipdata1), self.data)
-        zipfp.close()
-
-    def testRandomOpenStored(self):
-        for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
-
-    def zipReadlineTest(self, f, compression):
-        self.makeTestArchive(f, compression)
+    def zip_readline_read_test(self, f, compression):
+        self.make_test_archive(f, compression)
 
         # Read the ZIP archive
         zipfp = zipfile.ZipFile(f, "r")
         zipopen = zipfp.open(TESTFN)
-        for line in self.line_gen:
-            linedata = zipopen.readline()
-            self.assertEqual(linedata, line + '\n')
 
+        data = ''
+        while True:
+            read = zipopen.readline()
+            if not read:
+                break
+            data += read
+
+            read = zipopen.read(100)
+            if not read:
+                break
+            data += read
+
+        self.assertEqual(data, self.data)
         zipfp.close()
 
-    def zipReadlinesTest(self, f, compression):
-        self.makeTestArchive(f, compression)
+    def zip_readline_test(self, f, compression):
+        self.make_test_archive(f, compression)
 
         # Read the ZIP archive
-        zipfp = zipfile.ZipFile(f, "r")
-        ziplines = zipfp.open(TESTFN).readlines()
-        for line, zipline in zip(self.line_gen, ziplines):
-            self.assertEqual(zipline, line + '\n')
+        with zipfile.ZipFile(f, "r") as zipfp:
+            zipopen = zipfp.open(TESTFN)
+            for line in self.line_gen:
+                linedata = zipopen.readline()
+                self.assertEqual(linedata, line + '\n')
 
-        zipfp.close()
-
-    def zipIterlinesTest(self, f, compression):
-        self.makeTestArchive(f, compression)
+    def zip_readlines_test(self, f, compression):
+        self.make_test_archive(f, compression)
 
         # Read the ZIP archive
-        zipfp = zipfile.ZipFile(f, "r")
-        for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
-            self.assertEqual(zipline, line + '\n')
+        with zipfile.ZipFile(f, "r") as zipfp:
+            ziplines = zipfp.open(TESTFN).readlines()
+            for line, zipline in zip(self.line_gen, ziplines):
+                self.assertEqual(zipline, line + '\n')
 
-        zipfp.close()
+    def zip_iterlines_test(self, f, compression):
+        self.make_test_archive(f, compression)
 
-    def testReadlineStored(self):
+        # Read the ZIP archive
+        with zipfile.ZipFile(f, "r") as zipfp:
+            for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
+                self.assertEqual(zipline, line + '\n')
+
+    def test_readline_read_stored(self):
+        # Issue #7610: calls to readline() interleaved with calls to read().
         for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipReadlineTest(f, zipfile.ZIP_STORED)
+            self.zip_readline_read_test(f, zipfile.ZIP_STORED)
 
-    def testReadlinesStored(self):
+    def test_readline_stored(self):
         for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipReadlinesTest(f, zipfile.ZIP_STORED)
+            self.zip_readline_test(f, zipfile.ZIP_STORED)
 
-    def testIterlinesStored(self):
+    def test_readlines_stored(self):
         for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipIterlinesTest(f, zipfile.ZIP_STORED)
+            self.zip_readlines_test(f, zipfile.ZIP_STORED)
 
-    if zlib:
-        def testDeflated(self):
-            for f in (TESTFN2, TemporaryFile(), StringIO()):
-                self.zipTest(f, zipfile.ZIP_DEFLATED)
+    def test_iterlines_stored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_iterlines_test(f, zipfile.ZIP_STORED)
 
-        def testOpenDeflated(self):
-            for f in (TESTFN2, TemporaryFile(), StringIO()):
-                self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
+    @skipUnless(zlib, "requires zlib")
+    def test_deflated(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_test(f, zipfile.ZIP_DEFLATED)
 
-        def testRandomOpenDeflated(self):
-            for f in (TESTFN2, TemporaryFile(), StringIO()):
-                self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
+    @skipUnless(zlib, "requires zlib")
+    def test_open_deflated(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_open_test(f, zipfile.ZIP_DEFLATED)
 
-        def testReadlineDeflated(self):
-            for f in (TESTFN2, TemporaryFile(), StringIO()):
-                self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
+    @skipUnless(zlib, "requires zlib")
+    def test_random_open_deflated(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
 
-        def testReadlinesDeflated(self):
-            for f in (TESTFN2, TemporaryFile(), StringIO()):
-                self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
+    @skipUnless(zlib, "requires zlib")
+    def test_readline_read_deflated(self):
+        # Issue #7610: calls to readline() interleaved with calls to read().
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)
 
-        def testIterlinesDeflated(self):
-            for f in (TESTFN2, TemporaryFile(), StringIO()):
-                self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
+    @skipUnless(zlib, "requires zlib")
+    def test_readline_deflated(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)
 
-        def testLowCompression(self):
-            # Checks for cases where compressed data is larger than original
-            # Create the ZIP archive
-            zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+    @skipUnless(zlib, "requires zlib")
+    def test_readlines_deflated(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)
+
+    @skipUnless(zlib, "requires zlib")
+    def test_iterlines_deflated(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)
+
+    @skipUnless(zlib, "requires zlib")
+    def test_low_compression(self):
+        """Check for cases where compressed data is larger than original."""
+        # Create the ZIP archive
+        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
             zipfp.writestr("strfile", '12')
-            zipfp.close()
 
-            # Get an open object for strfile
-            zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
+        # Get an open object for strfile
+        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
             openobj = zipfp.open("strfile")
             self.assertEqual(openobj.read(1), '1')
             self.assertEqual(openobj.read(1), '2')
 
-    def testAbsoluteArcnames(self):
-        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
-        zipfp.write(TESTFN, "/absolute")
-        zipfp.close()
+    def test_absolute_arcnames(self):
+        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
+            zipfp.write(TESTFN, "/absolute")
 
-        zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED)
-        self.assertEqual(zipfp.namelist(), ["absolute"])
-        zipfp.close()
+        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
+            self.assertEqual(zipfp.namelist(), ["absolute"])
 
-    def testAppendToZipFile(self):
-        # Test appending to an existing zipfile
-        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
-        zipfp.write(TESTFN, TESTFN)
-        zipfp.close()
-        zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED)
-        zipfp.writestr("strfile", self.data)
-        self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
-        zipfp.close()
+    def test_append_to_zip_file(self):
+        """Test appending to an existing zipfile."""
+        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
+            zipfp.write(TESTFN, TESTFN)
 
-    def testAppendToNonZipFile(self):
-        # Test appending to an existing file that is not a zipfile
+        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
+            zipfp.writestr("strfile", self.data)
+            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
+
+    def test_append_to_non_zip_file(self):
+        """Test appending to an existing file that is not a zipfile."""
         # NOTE: this test fails if len(d) < 22 because of the first
         # line "fpin.seek(-22, 2)" in _EndRecData
-        d = 'I am not a ZipFile!'*10
-        f = file(TESTFN2, 'wb')
-        f.write(d)
-        f.close()
-        zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED)
-        zipfp.write(TESTFN, TESTFN)
-        zipfp.close()
+        data = 'I am not a ZipFile!'*10
+        with open(TESTFN2, 'wb') as f:
+            f.write(data)
 
-        f = file(TESTFN2, 'rb')
-        f.seek(len(d))
-        zipfp = zipfile.ZipFile(f, "r")
-        self.assertEqual(zipfp.namelist(), [TESTFN])
-        zipfp.close()
-        f.close()
+        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
+            zipfp.write(TESTFN, TESTFN)
 
-    def test_WriteDefaultName(self):
-        # Check that calling ZipFile.write without arcname specified produces the expected result
-        zipfp = zipfile.ZipFile(TESTFN2, "w")
-        zipfp.write(TESTFN)
-        self.assertEqual(zipfp.read(TESTFN), file(TESTFN).read())
-        zipfp.close()
+        with open(TESTFN2, 'rb') as f:
+            f.seek(len(data))
+            with zipfile.ZipFile(f, "r") as zipfp:
+                self.assertEqual(zipfp.namelist(), [TESTFN])
 
-    def test_PerFileCompression(self):
-        # Check that files within a Zip archive can have different compression options
-        zipfp = zipfile.ZipFile(TESTFN2, "w")
-        zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
-        zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
-        sinfo = zipfp.getinfo('storeme')
-        dinfo = zipfp.getinfo('deflateme')
-        self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
-        self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
-        zipfp.close()
+    def test_write_default_name(self):
+        """Check that calling ZipFile.write without arcname specified
+        produces the expected result."""
+        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
+            zipfp.write(TESTFN)
+            self.assertEqual(zipfp.read(TESTFN), open(TESTFN).read())
 
-    def test_WriteToReadonly(self):
-        # Check that trying to call write() on a readonly ZipFile object
-        # raises a RuntimeError
-        zipf = zipfile.ZipFile(TESTFN2, mode="w")
-        zipf.writestr("somefile.txt", "bogus")
-        zipf.close()
-        zipf = zipfile.ZipFile(TESTFN2, mode="r")
-        self.assertRaises(RuntimeError, zipf.write, TESTFN)
-        zipf.close()
+    @skipUnless(zlib, "requires zlib")
+    def test_per_file_compression(self):
+        """Check that files within a Zip archive can have different
+        compression options."""
+        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
+            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
+            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
+            sinfo = zipfp.getinfo('storeme'