Commits

Jeff Hardy committed 2208161

Add the test files for zlib, gzip, & zipfile.

Comments (0)

Files changed (8)

IronPython.Zlib/IronPython.Zlib.csproj

   </Target>
   -->
   <PropertyGroup>
-    <PostBuildEvent>copy /Y "$(TargetPath)" "C:\Users\Jeff\Bin\IronPython-2.6B2\DLLs"</PostBuildEvent>
+    <PostBuildEvent>copy /Y "$(TargetPath)" "$(ProjectDir).."</PostBuildEvent>
   </PropertyGroup>
 </Project>
+"""Functions that read and write gzipped files.
+
+The user of the file doesn't have to worry about the compression,
+but random access is not allowed."""
+
+# based on Andrew Kuchling's minigzip.py distributed with the zlib module
+
+import struct, sys, time
+import zlib
+import __builtin__
+
+__all__ = ["GzipFile","open"]
+
+FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16
+
+READ, WRITE = 1, 2
+
+def write32u(output, value):
+    # The L format writes the bit pattern correctly whether signed
+    # or unsigned.
+    output.write(struct.pack("<L", value))
+
+def read32(input):
+    return struct.unpack("<I", input.read(4))[0]
+
+def open(filename, mode="rb", compresslevel=9):
+    """Shorthand for GzipFile(filename, mode, compresslevel).
+
+    The filename argument is required; mode defaults to 'rb'
+    and compresslevel defaults to 9.
+
+    """
+    return GzipFile(filename, mode, compresslevel)
+
+class GzipFile:
+    """The GzipFile class simulates most of the methods of a file object with
+    the exception of the readinto() and truncate() methods.
+
+    """
+
+    myfileobj = None
+    max_read_chunk = 10 * 1024 * 1024   # 10Mb
+
+    def __init__(self, filename=None, mode=None,
+                 compresslevel=9, fileobj=None):
+        """Constructor for the GzipFile class.
+
+        At least one of fileobj and filename must be given a
+        non-trivial value.
+
+        The new class instance is based on fileobj, which can be a regular
+        file, a StringIO object, or any other object which simulates a file.
+        It defaults to None, in which case filename is opened to provide
+        a file object.
+
+        When fileobj is not None, the filename argument is only used to be
+        included in the gzip file header, which may includes the original
+        filename of the uncompressed file.  It defaults to the filename of
+        fileobj, if discernible; otherwise, it defaults to the empty string,
+        and in this case the original filename is not included in the header.
+
+        The mode argument can be any of 'r', 'rb', 'a', 'ab', 'w', or 'wb',
+        depending on whether the file will be read or written.  The default
+        is the mode of fileobj if discernible; otherwise, the default is 'rb'.
+        Be aware that only the 'rb', 'ab', and 'wb' values should be used
+        for cross-platform portability.
+
+        The compresslevel argument is an integer from 1 to 9 controlling the
+        level of compression; 1 is fastest and produces the least compression,
+        and 9 is slowest and produces the most compression.  The default is 9.
+
+        """
+
+        # guarantee the file is opened in binary mode on platforms
+        # that care about that sort of thing
+        if mode and 'b' not in mode:
+            mode += 'b'
+        if fileobj is None:
+            fileobj = self.myfileobj = __builtin__.open(filename, mode or 'rb')
+        if filename is None:
+            if hasattr(fileobj, 'name'): filename = fileobj.name
+            else: filename = ''
+        if mode is None:
+            if hasattr(fileobj, 'mode'): mode = fileobj.mode
+            else: mode = 'rb'
+
+        if mode[0:1] == 'r':
+            self.mode = READ
+            # Set flag indicating start of a new member
+            self._new_member = True
+            self.extrabuf = ""
+            self.extrasize = 0
+            self.name = filename
+            # Starts small, scales exponentially
+            self.min_readsize = 100
+
+        elif mode[0:1] == 'w' or mode[0:1] == 'a':
+            self.mode = WRITE
+            self._init_write(filename)
+            self.compress = zlib.compressobj(compresslevel,
+                                             zlib.DEFLATED,
+                                             -zlib.MAX_WBITS,
+                                             zlib.DEF_MEM_LEVEL,
+                                             0)
+        else:
+            raise IOError, "Mode " + mode + " not supported"
+
+        self.fileobj = fileobj
+        self.offset = 0
+
+        if self.mode == WRITE:
+            self._write_gzip_header()
+
+    @property
+    def filename(self):
+        import warnings
+        warnings.warn("use the name attribute", DeprecationWarning)
+        if self.mode == WRITE and self.name[-3:] != ".gz":
+            return self.name + ".gz"
+        return self.name
+
+    def __repr__(self):
+        s = repr(self.fileobj)
+        return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'
+
+    def _init_write(self, filename):
+        self.name = filename
+        self.crc = zlib.crc32("") & 0xffffffffL
+        self.size = 0
+        self.writebuf = []
+        self.bufsize = 0
+
+    def _write_gzip_header(self):
+        self.fileobj.write('\037\213')             # magic header
+        self.fileobj.write('\010')                 # compression method
+        fname = self.name
+        if fname.endswith(".gz"):
+            fname = fname[:-3]
+        flags = 0
+        if fname:
+            flags = FNAME
+        self.fileobj.write(chr(flags))
+        write32u(self.fileobj, long(time.time()))
+        self.fileobj.write('\002')
+        self.fileobj.write('\377')
+        if fname:
+            self.fileobj.write(fname + '\000')
+
+    def _init_read(self):
+        self.crc = zlib.crc32("") & 0xffffffffL
+        self.size = 0
+
+    def _read_gzip_header(self):
+        magic = self.fileobj.read(2)
+        if magic != '\037\213':
+            raise IOError, 'Not a gzipped file'
+        method = ord( self.fileobj.read(1) )
+        if method != 8:
+            raise IOError, 'Unknown compression method'
+        flag = ord( self.fileobj.read(1) )
+        # modtime = self.fileobj.read(4)
+        # extraflag = self.fileobj.read(1)
+        # os = self.fileobj.read(1)
+        self.fileobj.read(6)
+
+        if flag & FEXTRA:
+            # Read & discard the extra field, if present
+            xlen = ord(self.fileobj.read(1))
+            xlen = xlen + 256*ord(self.fileobj.read(1))
+            self.fileobj.read(xlen)
+        if flag & FNAME:
+            # Read and discard a null-terminated string containing the filename
+            while True:
+                s = self.fileobj.read(1)
+                if not s or s=='\000':
+                    break
+        if flag & FCOMMENT:
+            # Read and discard a null-terminated string containing a comment
+            while True:
+                s = self.fileobj.read(1)
+                if not s or s=='\000':
+                    break
+        if flag & FHCRC:
+            self.fileobj.read(2)     # Read & discard the 16-bit header CRC
+
+
+    def write(self,data):
+        if self.mode != WRITE:
+            import errno
+            raise IOError(errno.EBADF, "write() on read-only GzipFile object")
+
+        if self.fileobj is None:
+            raise ValueError, "write() on closed GzipFile object"
+        if len(data) > 0:
+            self.size = self.size + len(data)
+            self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
+            self.fileobj.write( self.compress.compress(data) )
+            self.offset += len(data)
+
+    def read(self, size=-1):
+        if self.mode != READ:
+            import errno
+            raise IOError(errno.EBADF, "read() on write-only GzipFile object")
+
+        if self.extrasize <= 0 and self.fileobj is None:
+            return ''
+
+        readsize = 1024
+        if size < 0:        # get the whole thing
+            try:
+                while True:
+                    self._read(readsize)
+                    readsize = min(self.max_read_chunk, readsize * 2)
+            except EOFError:
+                size = self.extrasize
+        else:               # just get some more of it
+            try:
+                while size > self.extrasize:
+                    self._read(readsize)
+                    readsize = min(self.max_read_chunk, readsize * 2)
+            except EOFError:
+                if size > self.extrasize:
+                    size = self.extrasize
+
+        chunk = self.extrabuf[:size]
+        self.extrabuf = self.extrabuf[size:]
+        self.extrasize = self.extrasize - size
+
+        self.offset += size
+        return chunk
+
+    def _unread(self, buf):
+        self.extrabuf = buf + self.extrabuf
+        self.extrasize = len(buf) + self.extrasize
+        self.offset -= len(buf)
+
+    def _read(self, size=1024):
+        if self.fileobj is None:
+            raise EOFError, "Reached EOF"
+
+        if self._new_member:
+            # If the _new_member flag is set, we have to
+            # jump to the next member, if there is one.
+            #
+            # First, check if we're at the end of the file;
+            # if so, it's time to stop; no more members to read.
+            pos = self.fileobj.tell()   # Save current position
+            self.fileobj.seek(0, 2)     # Seek to end of file
+            if pos == self.fileobj.tell():
+                raise EOFError, "Reached EOF"
+            else:
+                self.fileobj.seek( pos ) # Return to original position
+
+            self._init_read()
+            self._read_gzip_header()
+            self.decompress = zlib.decompressobj(-zlib.MAX_WBITS)
+            self._new_member = False
+
+        # Read a chunk of data from the file
+        buf = self.fileobj.read(size)
+
+        # If the EOF has been reached, flush the decompression object
+        # and mark this object as finished.
+
+        if buf == "":
+            uncompress = self.decompress.flush()
+            self._read_eof()
+            self._add_read_data( uncompress )
+            raise EOFError, 'Reached EOF'
+
+        uncompress = self.decompress.decompress(buf)
+        self._add_read_data( uncompress )
+
+        if self.decompress.unused_data != "":
+            # Ending case: we've come to the end of a member in the file,
+            # so seek back to the start of the unused data, finish up
+            # this member, and read a new gzip header.
+            # (The number of bytes to seek back is the length of the unused
+            # data, minus 8 because _read_eof() will rewind a further 8 bytes)
+            self.fileobj.seek( -len(self.decompress.unused_data)+8, 1)
+
+            # Check the CRC and file size, and set the flag so we read
+            # a new member on the next call
+            self._read_eof()
+            self._new_member = True
+
+    def _add_read_data(self, data):
+        self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
+        self.extrabuf = self.extrabuf + data
+        self.extrasize = self.extrasize + len(data)
+        self.size = self.size + len(data)
+
+    def _read_eof(self):
+        # We've read to the end of the file, so we have to rewind in order
+        # to reread the 8 bytes containing the CRC and the file size.
+        # We check the that the computed CRC and size of the
+        # uncompressed data matches the stored values.  Note that the size
+        # stored is the true file size mod 2**32.
+        self.fileobj.seek(-8, 1)
+        crc32 = read32(self.fileobj)
+        isize = read32(self.fileobj)  # may exceed 2GB
+        if crc32 != self.crc:
+            raise IOError("CRC check failed %s != %s" % (hex(crc32),
+                                                         hex(self.crc)))
+        elif isize != (self.size & 0xffffffffL):
+            raise IOError, "Incorrect length of data produced"
+
+    def close(self):
+        if self.fileobj is None:
+            return
+        if self.mode == WRITE:
+            self.fileobj.write(self.compress.flush())
+            write32u(self.fileobj, self.crc)
+            # self.size may exceed 2GB, or even 4GB
+            write32u(self.fileobj, self.size & 0xffffffffL)
+            self.fileobj = None
+        elif self.mode == READ:
+            self.fileobj = None
+        if self.myfileobj:
+            self.myfileobj.close()
+            self.myfileobj = None
+
+    def __del__(self):
+        try:
+            if (self.myfileobj is None and
+                self.fileobj is None):
+                return
+        except AttributeError:
+            return
+        self.close()
+
+    def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH):
+        if self.mode == WRITE:
+            # Ensure the compressor's buffer is flushed
+            self.fileobj.write(self.compress.flush(zlib_mode))
+        self.fileobj.flush()
+
+    def fileno(self):
+        """Invoke the underlying file object's fileno() method.
+
+        This will raise AttributeError if the underlying file object
+        doesn't support fileno().
+        """
+        return self.fileobj.fileno()
+
+    def isatty(self):
+        return False
+
+    def tell(self):
+        return self.offset
+
+    def rewind(self):
+        '''Return the uncompressed stream file position indicator to the
+        beginning of the file'''
+        if self.mode != READ:
+            raise IOError("Can't rewind in write mode")
+        self.fileobj.seek(0)
+        self._new_member = True
+        self.extrabuf = ""
+        self.extrasize = 0
+        self.offset = 0
+
+    def seek(self, offset, whence=0):
+        if whence:
+            if whence == 1:
+                offset = self.offset + offset
+            else:
+                raise ValueError('Seek from end not supported')
+        if self.mode == WRITE:
+            if offset < self.offset:
+                raise IOError('Negative seek in write mode')
+            count = offset - self.offset
+            for i in range(count // 1024):
+                self.write(1024 * '\0')
+            self.write((count % 1024) * '\0')
+        elif self.mode == READ:
+            if offset < self.offset:
+                # for negative seek, rewind and do positive seek
+                self.rewind()
+            count = offset - self.offset
+            for i in range(count // 1024):
+                self.read(1024)
+            self.read(count % 1024)
+
+    def readline(self, size=-1):
+        if size < 0:
+            size = sys.maxint
+            readsize = self.min_readsize
+        else:
+            readsize = size
+        bufs = []
+        while size != 0:
+            c = self.read(readsize)
+            i = c.find('\n')
+
+            # We set i=size to break out of the loop under two
+            # conditions: 1) there's no newline, and the chunk is
+            # larger than size, or 2) there is a newline, but the
+            # resulting line would be longer than 'size'.
+            if (size <= i) or (i == -1 and len(c) > size):
+                i = size - 1
+
+            if i >= 0 or c == '':
+                bufs.append(c[:i + 1])    # Add portion of last chunk
+                self._unread(c[i + 1:])   # Push back rest of chunk
+                break
+
+            # Append chunk to list, decrease 'size',
+            bufs.append(c)
+            size = size - len(c)
+            readsize = min(size, readsize * 2)
+        if readsize > self.min_readsize:
+            self.min_readsize = min(readsize, self.min_readsize * 2, 512)
+        return ''.join(bufs) # Return resulting line
+
+    def readlines(self, sizehint=0):
+        # Negative numbers result in reading all the lines
+        if sizehint <= 0:
+            sizehint = sys.maxint
+        L = []
+        while sizehint > 0:
+            line = self.readline()
+            if line == "":
+                break
+            L.append(line)
+            sizehint = sizehint - len(line)
+
+        return L
+
+    def writelines(self, L):
+        for line in L:
+            self.write(line)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        line = self.readline()
+        if line:
+            return line
+        else:
+            raise StopIteration
+
+
+def _test():
+    # Act like gzip; with -d, act like gunzip.
+    # The input file is not deleted, however, nor are any other gzip
+    # options or features supported.
+    args = sys.argv[1:]
+    decompress = args and args[0] == "-d"
+    if decompress:
+        args = args[1:]
+    if not args:
+        args = ["-"]
+    for arg in args:
+        if decompress:
+            if arg == "-":
+                f = GzipFile(filename="", mode="rb", fileobj=sys.stdin)
+                g = sys.stdout
+            else:
+                if arg[-3:] != ".gz":
+                    print "filename doesn't end in .gz:", repr(arg)
+                    continue
+                f = open(arg, "rb")
+                g = __builtin__.open(arg[:-3], "wb")
+        else:
+            if arg == "-":
+                f = sys.stdin
+                g = GzipFile(filename="", mode="wb", fileobj=sys.stdout)
+            else:
+                f = __builtin__.open(arg, "rb")
+                g = open(arg + ".gz", "wb")
+        while True:
+            chunk = f.read(1024)
+            if not chunk:
+                break
+            g.write(chunk)
+        if g is not sys.stdout:
+            g.close()
+        if f is not sys.stdin:
+            f.close()
+
+if __name__ == '__main__':
+    _test()
+# Dummy file to make this directory a package.

test/test_support.py

+"""Supporting definitions for the Python regression tests."""
+
+if __name__ != 'test.test_support':
+    raise ImportError('test_support must be imported from the test package')
+
+import contextlib
+import errno
+import socket
+import sys
+import os
+import shutil
+import warnings
+import unittest
+
+__all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module",
+           "verbose", "use_resources", "max_memuse", "record_original_stdout",
+           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
+           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
+           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
+           "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
+           "open_urlresource", "check_warnings", "CleanImport",
+           "EnvironmentVarGuard", "captured_output",
+           "captured_stdout", "TransientResource", "transient_internet",
+           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
+           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
+           "threading_cleanup", "reap_children"]
+
+class Error(Exception):
+    """Base class for regression test exceptions."""
+
+class TestFailed(Error):
+    """Test failed."""
+
+class TestSkipped(Error):
+    """Test skipped.
+
+    This can be raised to indicate that a test was deliberatly
+    skipped, but not because a feature wasn't available.  For
+    example, if some resource can't be used, such as the network
+    appears to be unavailable, this should be raised instead of
+    TestFailed.
+    """
+
+class ResourceDenied(TestSkipped):
+    """Test skipped because it requested a disallowed resource.
+
+    This is raised when a test calls requires() for a resource that
+    has not be enabled.  It is used to distinguish between expected
+    and unexpected skips.
+    """
+
+def import_module(name, deprecated=False):
+    """Import the module to be tested, raising TestSkipped if it is not
+    available."""
+    with warnings.catch_warnings():
+        if deprecated:
+            warnings.filterwarnings("ignore", ".+ (module|package)",
+                                    DeprecationWarning)
+        try:
+            module = __import__(name, level=0)
+        except ImportError:
+            raise TestSkipped("No module named " + name)
+        else:
+            return module
+
+verbose = 1              # Flag set to 0 by regrtest.py
+use_resources = None     # Flag set to [] by regrtest.py
+max_memuse = 0           # Disable bigmem tests (they will still be run with
+                         # small sizes, to make sure they work.)
+real_max_memuse = 0
+
+# _original_stdout is meant to hold stdout at the time regrtest began.
+# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
+# The point is to have some flavor of stdout the user can actually see.
+_original_stdout = None
+def record_original_stdout(stdout):
+    global _original_stdout
+    _original_stdout = stdout
+
+def get_original_stdout():
+    return _original_stdout or sys.stdout
+
+def unload(name):
+    try:
+        del sys.modules[name]
+    except KeyError:
+        pass
+
+def unlink(filename):
+    try:
+        os.unlink(filename)
+    except OSError:
+        pass
+
+def rmtree(path):
+    try:
+        shutil.rmtree(path)
+    except OSError, e:
+        # Unix returns ENOENT, Windows returns ESRCH.
+        if e.errno not in (errno.ENOENT, errno.ESRCH):
+            raise
+
+def forget(modname):
+    '''"Forget" a module was ever imported by removing it from sys.modules and
+    deleting any .pyc and .pyo files.'''
+    unload(modname)
+    for dirname in sys.path:
+        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
+        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
+        # the chance exists that there is no .pyc (and thus the 'try' statement
+        # is exited) but there is a .pyo file.
+        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
+
+def is_resource_enabled(resource):
+    """Test whether a resource is enabled.  Known resources are set by
+    regrtest.py."""
+    return use_resources is not None and resource in use_resources
+
+def requires(resource, msg=None):
+    """Raise ResourceDenied if the specified resource is not available.
+
+    If the caller's module is __main__ then automatically return True.  The
+    possibility of False being returned occurs when regrtest.py is executing."""
+    # see if the caller's module is __main__ - if so, treat as if
+    # the resource was set
+    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
+        return
+    if not is_resource_enabled(resource):
+        if msg is None:
+            msg = "Use of the `%s' resource not enabled" % resource
+        raise ResourceDenied(msg)
+
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+    """Returns an unused port that should be suitable for binding.  This is
+    achieved by creating a temporary socket with the same family and type as
+    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+    the specified host address (defaults to 0.0.0.0) with the port set to 0,
+    eliciting an unused ephemeral port from the OS.  The temporary socket is
+    then closed and deleted, and the ephemeral port is returned.
+
+    Either this method or bind_port() should be used for any tests where a
+    server socket needs to be bound to a particular port for the duration of
+    the test.  Which one to use depends on whether the calling code is creating
+    a python socket, or if an unused port needs to be provided in a constructor
+    or passed to an external program (i.e. the -accept argument to openssl's
+    s_server mode).  Always prefer bind_port() over find_unused_port() where
+    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
+    socket is bound to a hard coded port, the ability to run multiple instances
+    of the test simultaneously on the same host is compromised, which makes the
+    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+    may simply manifest as a failed test, which can be recovered from without
+    intervention in most cases, but on Windows, the entire python process can
+    completely and utterly wedge, requiring someone to log in to the buildbot
+    and manually kill the affected process.
+
+    (This is easy to reproduce on Windows, unfortunately, and can be traced to
+    the SO_REUSEADDR socket option having different semantics on Windows versus
+    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+    listen and then accept connections on identical host/ports.  An EADDRINUSE
+    socket.error will be raised at some point (depending on the platform and
+    the order bind and listen were called on each socket).
+
+    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+    will ever be raised when attempting to bind two identical host/ports. When
+    accept() is called on each socket, the second caller's process will steal
+    the port from the first caller, leaving them both in an awkwardly wedged
+    state where they'll no longer respond to any signals or graceful kills, and
+    must be forcibly killed via OpenProcess()/TerminateProcess().
+
+    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+    instead of SO_REUSEADDR, which effectively affords the same semantics as
+    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
+    Source world compared to Windows ones, this is a common mistake.  A quick
+    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+    openssl.exe is called with the 's_server' option, for example. See
+    http://bugs.python.org/issue2550 for more info.  The following site also
+    has a very thorough description about the implications of both REUSEADDR
+    and EXCLUSIVEADDRUSE on Windows:
+    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+    XXX: although this approach is a vast improvement on previous attempts to
+    elicit unused ports, it rests heavily on the assumption that the ephemeral
+    port returned to us by the OS won't immediately be dished back out to some
+    other process when we close and delete our temporary socket but before our
+    calling code has a chance to bind the returned port.  We can deal with this
+    issue if/when we come across it."""
+    tempsock = socket.socket(family, socktype)
+    port = bind_port(tempsock)
+    tempsock.close()
+    del tempsock
+    return port
+
+def bind_port(sock, host=HOST):
+    """Bind the socket to a free port and return the port number.  Relies on
+    ephemeral ports in order to ensure we are using an unbound port.  This is
+    important as many tests may be running simultaneously, especially in a
+    buildbot environment.  This method raises an exception if the sock.family
+    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
+    for TCP/IP sockets.  The only case for setting these options is testing
+    multicasting via multiple UDP sockets.
+
+    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+    on Windows), it will be set on the socket.  This will prevent anyone else
+    from bind()'ing to our host/port for the duration of the test.
+    """
+    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+        if hasattr(socket, 'SO_REUSEADDR'):
+            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+                raise TestFailed("tests should never set the SO_REUSEADDR "   \
+                                 "socket option on TCP/IP sockets!")
+        if hasattr(socket, 'SO_REUSEPORT'):
+            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+                raise TestFailed("tests should never set the SO_REUSEPORT "   \
+                                 "socket option on TCP/IP sockets!")
+        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+    sock.bind((host, 0))
+    port = sock.getsockname()[1]
+    return port
+
+FUZZ = 1e-6
+
+def fcmp(x, y): # fuzzy comparison function
+    if isinstance(x, float) or isinstance(y, float):
+        try:
+            fuzz = (abs(x) + abs(y)) * FUZZ
+            if abs(x-y) <= fuzz:
+                return 0
+        except:
+            pass
+    elif type(x) == type(y) and isinstance(x, (tuple, list)):
+        for i in range(min(len(x), len(y))):
+            outcome = fcmp(x[i], y[i])
+            if outcome != 0:
+                return outcome
+        return (len(x) > len(y)) - (len(x) < len(y))
+    return (x > y) - (x < y)
+
+try:
+    unicode
+    have_unicode = True
+except NameError:
+    have_unicode = False
+
+is_jython = sys.platform.startswith('java')
+
+# Filename used for testing
+if os.name == 'java':
+    # Jython disallows @ in module names
+    TESTFN = '$test'
+elif os.name == 'riscos':
+    TESTFN = 'testfile'
+else:
+    TESTFN = '@test'
+    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
+    if have_unicode:
+        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
+        # TESTFN_UNICODE is a filename that can be encoded using the
+        # file system encoding, but *not* with the default (ascii) encoding
+        if isinstance('', unicode):
+            # python -U
+            # XXX perhaps unicode() should accept Unicode strings?
+            TESTFN_UNICODE = "@test-\xe0\xf2"
+        else:
+            # 2 latin characters.
+            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
+        TESTFN_ENCODING = sys.getfilesystemencoding()
+        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
+        # able to be encoded by *either* the default or filesystem encoding.
+        # This test really only makes sense on Windows NT platforms
+        # which have special Unicode support in posixmodule.
+        if (not hasattr(sys, "getwindowsversion") or
+                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
+            TESTFN_UNICODE_UNENCODEABLE = None
+        else:
+            # Japanese characters (I think - from bug 846133)
+            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
+            try:
+                # XXX - Note - should be using TESTFN_ENCODING here - but for
+                # Windows, "mbcs" currently always operates as if in
+                # errors=ignore' mode - hence we get '?' characters rather than
+                # the exception.  'Latin1' operates as we expect - ie, fails.
+                # See [ 850997 ] mbcs encoding ignores errors
+                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
+            except UnicodeEncodeError:
+                pass
+            else:
+                print \
+                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
+                'Unicode filename tests may not be effective' \
+                % TESTFN_UNICODE_UNENCODEABLE
+
+# Make sure we can write to TESTFN, try in /tmp if we can't
+fp = None
+try:
+    fp = open(TESTFN, 'w+')
+except IOError:
+    TMP_TESTFN = os.path.join('/tmp', TESTFN)
+    try:
+        fp = open(TMP_TESTFN, 'w+')
+        TESTFN = TMP_TESTFN
+        del TMP_TESTFN
+    except IOError:
+        print ('WARNING: tests will fail, unable to write to: %s or %s' %
+                (TESTFN, TMP_TESTFN))
+if fp is not None:
+    fp.close()
+    unlink(TESTFN)
+del fp
+
+def findfile(file, here=__file__):
+    """Try to find a file on sys.path and the working directory.  If it is not
+    found the argument passed to the function is returned (this does not
+    necessarily signal failure; could still be the legitimate path)."""
+    if os.path.isabs(file):
+        return file
+    path = sys.path
+    path = [os.path.dirname(here)] + path
+    for dn in path:
+        fn = os.path.join(dn, file)
+        if os.path.exists(fn): return fn
+    return file
+
+def verify(condition, reason='test failed'):
+    """Verify that condition is true. If not, raise TestFailed.
+
+       The optional argument reason can be given to provide
+       a better error text.
+    """
+
+    if not condition:
+        raise TestFailed(reason)
+
+def vereq(a, b):
+    """Raise TestFailed if a == b is false.
+
+    This is better than verify(a == b) because, in case of failure, the
+    error message incorporates repr(a) and repr(b) so you can see the
+    inputs.
+
+    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
+    former is tested.
+    """
+
+    if not (a == b):
+        raise TestFailed("%r == %r" % (a, b))
+
+def sortdict(dict):
+    "Like repr(dict), but in sorted order."
+    items = dict.items()
+    items.sort()
+    reprpairs = ["%r: %r" % pair for pair in items]
+    withcommas = ", ".join(reprpairs)
+    return "{%s}" % withcommas
+
+def make_bad_fd():
+    """
+    Create an invalid file descriptor by opening and closing a file and return
+    its fd.
+    """
+    file = open(TESTFN, "wb")
+    try:
+        return file.fileno()
+    finally:
+        file.close()
+        unlink(TESTFN)
+
+def check_syntax_error(testcase, statement):
+    try:
+        compile(statement, '<test string>', 'exec')
+    except SyntaxError:
+        pass
+    else:
+        testcase.fail('Missing SyntaxError: "%s"' % statement)
+
+def open_urlresource(url):
+    import urllib, urlparse
+
+    requires('urlfetch')
+    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
+
+    for path in [os.path.curdir, os.path.pardir]:
+        fn = os.path.join(path, filename)
+        if os.path.exists(fn):
+            return open(fn)
+
+    print >> get_original_stdout(), '\tfetching %s ...' % url
+    fn, _ = urllib.urlretrieve(url, filename)
+    return open(fn)
+
+
+class WarningsRecorder(object):
+    """Convenience wrapper for the warnings list returned on
+       entry to the warnings.catch_warnings() context manager.
+    """
+    def __init__(self, warnings_list):
+        self.warnings = warnings_list
+
+    def __getattr__(self, attr):
+        if self.warnings:
+            return getattr(self.warnings[-1], attr)
+        elif attr in warnings.WarningMessage._WARNING_DETAILS:
+            return None
+        raise AttributeError("%r has no attribute %r" % (self, attr))
+
+    def reset(self):
+        del self.warnings[:]
+
+@contextlib.contextmanager
+def check_warnings():
+    with warnings.catch_warnings(record=True) as w:
+        yield WarningsRecorder(w)
+
+
+class CleanImport(object):
+    """Context manager to force import to return a new module reference.
+
+    This is useful for testing module-level behaviours, such as
+    the emission of a DeprecationWarning on import.
+
+    Use like this:
+
+        with CleanImport("foo"):
+            __import__("foo") # new reference
+    """
+
+    def __init__(self, *module_names):
+        self.original_modules = sys.modules.copy()
+        for module_name in module_names:
+            if module_name in sys.modules:
+                module = sys.modules[module_name]
+                # It is possible that module_name is just an alias for
+                # another module (e.g. stub for modules renamed in 3.x).
+                # In that case, we also need delete the real module to clear
+                # the import cache.
+                if module.__name__ != module_name:
+                    del sys.modules[module.__name__]
+                del sys.modules[module_name]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        sys.modules.update(self.original_modules)
+
+
+class EnvironmentVarGuard(object):
+
+    """Class to help protect the environment variable properly.  Can be used as
+    a context manager."""
+
+    def __init__(self):
+        self._changed = {}
+
+    def set(self, envvar, value):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = os.environ.get(envvar)
+        os.environ[envvar] = value
+
+    def unset(self, envvar):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = os.environ.get(envvar)
+        if envvar in os.environ:
+            del os.environ[envvar]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        for (k, v) in self._changed.items():
+            if v is None:
+                if k in os.environ:
+                    del os.environ[k]
+            else:
+                os.environ[k] = v
+
+
+class TransientResource(object):
+
+    """Raise ResourceDenied if an exception is raised while the context manager
+    is in effect that matches the specified exception and attributes."""
+
+    def __init__(self, exc, **kwargs):
+        self.exc = exc
+        self.attrs = kwargs
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_=None, value=None, traceback=None):
+        """If type_ is a subclass of self.exc and value has attributes matching
+        self.attrs, raise ResourceDenied.  Otherwise let the exception
+        propagate (if any)."""
+        if type_ is not None and issubclass(self.exc, type_):
+            for attr, attr_value in self.attrs.iteritems():
+                if not hasattr(value, attr):
+                    break
+                if getattr(value, attr) != attr_value:
+                    break
+            else:
+                raise ResourceDenied("an optional resource is not available")
+
+
+def transient_internet():
+    """Return a context manager that raises ResourceDenied when various issues
+    with the Internet connection manifest themselves as exceptions."""
+    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
+    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
+    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
+    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
+
+
+@contextlib.contextmanager
+def captured_output(stream_name):
+    """Run the 'with' statement body using a StringIO object in place of a
+    specific attribute on the sys module.
+    Example use (with 'stream_name=stdout')::
+
+       with captured_stdout() as s:
+           print "hello"
+       assert s.getvalue() == "hello"
+    """
+    import StringIO
+    orig_stdout = getattr(sys, stream_name)
+    setattr(sys, stream_name, StringIO.StringIO())
+    try:
+        yield getattr(sys, stream_name)
+    finally:
+        setattr(sys, stream_name, orig_stdout)
+
+def captured_stdout():
+    return captured_output("stdout")
+
+
+#=======================================================================
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+
+def run_with_locale(catstr, *locales):
+    def decorator(func):
+        def inner(*args, **kwds):
+            try:
+                import locale
+                category = getattr(locale, catstr)
+                orig_locale = locale.setlocale(category)
+            except AttributeError:
+                # if the test author gives us an invalid category string
+                raise
+            except:
+                # cannot retrieve original locale, so do nothing
+                locale = orig_locale = None
+            else:
+                for loc in locales:
+                    try:
+                        locale.setlocale(category, loc)
+                        break
+                    except:
+                        pass
+
+            # now run the function, resetting the locale on exceptions
+            try:
+                return func(*args, **kwds)
+            finally:
+                if locale and orig_locale:
+                    locale.setlocale(category, orig_locale)
+        inner.func_name = func.func_name
+        inner.__doc__ = func.__doc__
+        return inner
+    return decorator
+
+#=======================================================================
+# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
+
+# Some handy shorthands. Note that these are used for byte-limits as well
+# as size-limits, in the various bigmem tests
+_1M = 1024*1024
+_1G = 1024 * _1M
+_2G = 2 * _1G
+_4G = 4 * _1G
+
+MAX_Py_ssize_t = sys.maxsize
+
+def set_memlimit(limit):
+    import re
+    global max_memuse
+    global real_max_memuse
+    sizes = {
+        'k': 1024,
+        'm': _1M,
+        'g': _1G,
+        't': 1024*_1G,
+    }
+    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+                 re.IGNORECASE | re.VERBOSE)
+    if m is None:
+        raise ValueError('Invalid memory limit %r' % (limit,))
+    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
+    real_max_memuse = memlimit
+    if memlimit > MAX_Py_ssize_t:
+        memlimit = MAX_Py_ssize_t
+    if memlimit < _2G - 1:
+        raise ValueError('Memory limit %r too low to be useful' % (limit,))
+    max_memuse = memlimit
+
+def bigmemtest(minsize, memuse, overhead=5*_1M):
+    """Decorator for bigmem tests.
+
+    'minsize' is the minimum useful size for the test (in arbitrary,
+    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
+    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
+    independent of the testsize, and defaults to 5Mb.
+
+    The decorator tries to guess a good value for 'size' and passes it to
+    the decorated test function. If minsize * memuse is more than the
+    allowed memory use (as defined by max_memuse), the test is skipped.
+    Otherwise, minsize is adjusted upward to use up to max_memuse.
+    """
+    def decorator(f):
+        def wrapper(self):
+            if not max_memuse:
+                # If max_memuse is 0 (the default),
+                # we still want to run the tests with size set to a few kb,
+                # to make sure they work. We still want to avoid using
+                # too much memory, though, but we do that noisily.
+                maxsize = 5147
+                self.failIf(maxsize * memuse + overhead > 20 * _1M)
+            else:
+                maxsize = int((max_memuse - overhead) / memuse)
+                if maxsize < minsize:
+                    # Really ought to print 'test skipped' or something
+                    if verbose:
+                        sys.stderr.write("Skipping %s because of memory "
+                                         "constraint\n" % (f.__name__,))
+                    return
+                # Try to keep some breathing room in memory use
+                maxsize = max(maxsize - 50 * _1M, minsize)
+            return f(self, maxsize)
+        wrapper.minsize = minsize
+        wrapper.memuse = memuse
+        wrapper.overhead = overhead
+        return wrapper
+    return decorator
+
+def precisionbigmemtest(size, memuse, overhead=5*_1M):
+    def decorator(f):
+        def wrapper(self):
+            if not real_max_memuse:
+                maxsize = 5147
+            else:
+                maxsize = size
+
+                if real_max_memuse and real_max_memuse < maxsize * memuse:
+                    if verbose:
+                        sys.stderr.write("Skipping %s because of memory "
+                                         "constraint\n" % (f.__name__,))
+                    return
+
+            return f(self, maxsize)
+        wrapper.size = size
+        wrapper.memuse = memuse
+        wrapper.overhead = overhead
+        return wrapper
+    return decorator
+
+def bigaddrspacetest(f):
+    """Decorator for tests that fill the address space."""
+    def wrapper(self):
+        if max_memuse < MAX_Py_ssize_t:
+            if verbose:
+                sys.stderr.write("Skipping %s because of memory "
+                                 "constraint\n" % (f.__name__,))
+        else:
+            return f(self)
+    return wrapper
+
+#=======================================================================
+# unittest integration.
+
+class BasicTestRunner:
+    def run(self, test):
+        result = unittest.TestResult()
+        test(result)
+        return result
+
+
+def _run_suite(suite):
+    """Run tests from a unittest.TestSuite-derived class."""
+    if verbose:
+        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
+    else:
+        runner = BasicTestRunner()
+
+    result = runner.run(suite)
+    if not result.wasSuccessful():
+        if len(result.errors) == 1 and not result.failures:
+            err = result.errors[0][1]
+        elif len(result.failures) == 1 and not result.errors:
+            err = result.failures[0][1]
+        else:
+            err = "errors occurred; run in verbose mode for details"
+        raise TestFailed(err)
+
+
+def run_unittest(*classes):
+    """Run tests from unittest.TestCase-derived classes."""
+    valid_types = (unittest.TestSuite, unittest.TestCase)
+    suite = unittest.TestSuite()
+    for cls in classes:
+        if isinstance(cls, str):
+            if cls in sys.modules:
+                suite.addTest(unittest.findTestCases(sys.modules[cls]))
+            else:
+                raise ValueError("str arguments must be keys in sys.modules")
+        elif isinstance(cls, valid_types):
+            suite.addTest(cls)
+        else:
+            suite.addTest(unittest.makeSuite(cls))
+    _run_suite(suite)
+
+
+#=======================================================================
+# doctest driver.
+
+def run_doctest(module, verbosity=None):
+    """Run doctest on the given module.  Return (#failures, #tests).
+
+    If optional argument verbosity is not specified (or is None), pass
+    test_support's belief about verbosity on to doctest.  Else doctest's
+    usual behavior is used (it searches sys.argv for -v).
+    """
+
+    import doctest
+
+    if verbosity is None:
+        verbosity = verbose
+    else:
+        verbosity = None
+
+    # Direct doctest output (normally just errors) to real stdout; doctest
+    # output shouldn't be compared by regrtest.
+    save_stdout = sys.stdout
+    sys.stdout = get_original_stdout()
+    try:
+        f, t = doctest.testmod(module, verbose=verbosity)
+        if f:
+            raise TestFailed("%d of %d doctests failed" % (f, t))
+    finally:
+        sys.stdout = save_stdout
+    if verbose:
+        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
+    return f, t
+
+#=======================================================================
+# Threading support to prevent reporting refleaks when running regrtest.py -R
+
+def threading_setup():
+    import threading
+    return len(threading._active), len(threading._limbo)
+
+def threading_cleanup(num_active, num_limbo):
+    import threading
+    import time
+
+    _MAX_COUNT = 10
+    count = 0
+    while len(threading._active) != num_active and count < _MAX_COUNT:
+        count += 1
+        time.sleep(0.1)
+
+    count = 0
+    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
+        count += 1
+        time.sleep(0.1)
+
+def reap_children():
+    """Use this function at the end of test_main() whenever sub-processes
+    are started.  This will help ensure that no extra children (zombies)
+    stick around to hog resources and create problems when looking
+    for refleaks.
+    """
+
+    # Reap all our dead child processes so we don't leave zombies around.
+    # These hog resources and might be causing some of the buildbots to die.
+    if hasattr(os, 'waitpid'):
+        any_process = -1
+        while True:
+            try:
+                # This will raise an exception on Windows.  That's ok.
+                pid, status = os.waitpid(any_process, os.WNOHANG)
+                if pid == 0:
+                    break
+            except:
+                break
+#! /usr/bin/env python
+"""Test script for the gzip module.
+"""
+
+import clr
+clr.AddReference("IronPython.Zlib")
+
+import unittest
+from test import test_support
+import os
+import gzip
+
+
+data1 = """  int length=DEFAULTALLOC, err = Z_OK;
+  PyObject *RetVal;
+  int flushmode = Z_FINISH;
+  unsigned long start_total_out;
+
+"""
+
+data2 = """/* zlibmodule.c -- gzip-compatible data compression */
+/* See http://www.gzip.org/zlib/
+/* See http://www.winimage.com/zLibDll for Windows */
+"""
+
+
+class TestGzip(unittest.TestCase):
+    filename = test_support.TESTFN
+
+    def setUp(self):
+        test_support.unlink(self.filename)
+
+    def tearDown(self):
+        test_support.unlink(self.filename)
+
+
+    def test_write(self):
+        f = gzip.GzipFile(self.filename, 'wb') ; f.write(data1 * 50)
+
+        # Try flush and fileno.
+        f.flush()
+        f.fileno()
+        if hasattr(os, 'fsync'):
+            os.fsync(f.fileno())
+        f.close()
+
+        # Test multiple close() calls.
+        f.close()
+
+    def test_read(self):
+        self.test_write()
+        # Try reading.
+        f = gzip.GzipFile(self.filename, 'r') ; d = f.read() ; f.close()
+        self.assertEqual(d, data1*50)
+
+    def test_append(self):
+        self.test_write()
+        # Append to the previous file
+        f = gzip.GzipFile(self.filename, 'ab') ; f.write(data2 * 15) ; f.close()
+
+        f = gzip.GzipFile(self.filename, 'rb') ; d = f.read() ; f.close()
+        self.assertEqual(d, (data1*50) + (data2*15))
+
+    def test_many_append(self):
+        # Bug #1074261 was triggered when reading a file that contained
+        # many, many members.  Create such a file and verify that reading it
+        # works.
+        f = gzip.open(self.filename, 'wb', 9)
+        f.write('a')
+        f.close()
+        for i in range(0,200):
+            f = gzip.open(self.filename, "ab", 9) # append
+            f.write('a')
+            f.close()
+
+        # Try reading the file
+        zgfile = gzip.open(self.filename, "rb")
+        contents = ""
+        while 1:
+            ztxt = zgfile.read(8192)
+            contents += ztxt
+            if not ztxt: break
+        zgfile.close()
+        self.assertEquals(contents, 'a'*201)
+
+
+    def test_readline(self):
+        self.test_write()
+        # Try .readline() with varying line lengths
+
+        f = gzip.GzipFile(self.filename, 'rb')
+        line_length = 0
+        while 1:
+            L = f.readline(line_length)
+            if L == "" and line_length != 0: break
+            self.assert_(len(L) <= line_length)
+            line_length = (line_length + 1) % 50
+        f.close()
+
+    def test_readlines(self):
+        self.test_write()
+        # Try .readlines()
+
+        f = gzip.GzipFile(self.filename, 'rb')
+        L = f.readlines()
+        f.close()
+
+        f = gzip.GzipFile(self.filename, 'rb')
+        while 1:
+            L = f.readlines(150)
+            if L == []: break
+        f.close()
+
+    def test_seek_read(self):
+        self.test_write()
+        # Try seek, read test
+
+        f = gzip.GzipFile(self.filename)
+        while 1:
+            oldpos = f.tell()
+            line1 = f.readline()
+            if not line1: break
+            newpos = f.tell()
+            f.seek(oldpos)  # negative seek
+            if len(line1)>10:
+                amount = 10
+            else:
+                amount = len(line1)
+            line2 = f.read(amount)
+            self.assertEqual(line1[:amount], line2)
+            f.seek(newpos)  # positive seek
+        f.close()
+
+    def test_seek_whence(self):
+        self.test_write()
+        # Try seek(whence=1), read test
+
+        f = gzip.GzipFile(self.filename)
+        f.read(10)
+        f.seek(10, whence=1)
+        y = f.read(10)
+        f.close()
+        self.assertEquals(y, data1[20:30])
+
+    def test_seek_write(self):
+        # Try seek, write test
+        f = gzip.GzipFile(self.filename, 'w')
+        for pos in range(0, 256, 16):
+            f.seek(pos)
+            f.write('GZ\n')
+        f.close()
+
+    def test_mode(self):
+        self.test_write()
+        f = gzip.GzipFile(self.filename, 'r')
+        self.assertEqual(f.myfileobj.mode, 'rb')
+        f.close()
+
+    def test_1647484(self):
+        for mode in ('wb', 'rb'):
+            f = gzip.GzipFile(self.filename, mode)
+            self.assert_(hasattr(f, "name"))
+            self.assertEqual(f.name, self.filename)
+            f.close()
+
+def test_main(verbose=None):
+    test_support.run_unittest(TestGzip)
+
+if __name__ == "__main__":
+    test_main(verbose=True)
+# We can test part of the module without zlib.
+import clr
+clr.AddReference("IronPython.Zlib")
+
+try:
+    import zlib
+except ImportError:
+    zlib = None
+
+import zipfile, os, unittest, sys, shutil, struct
+
+from StringIO import StringIO
+from tempfile import TemporaryFile
+from random import randint, random
+
+import test.test_support as support
+from test.test_support import TESTFN, run_unittest, findfile
+
+TESTFN2 = TESTFN + "2"
+TESTFNDIR = TESTFN + "d"
+FIXEDTEST_SIZE = 1000
+
+SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
+                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
+                   ('/ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
+                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
+
+class TestsWithSourceFile(unittest.TestCase):
+    def setUp(self):
+        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
+                          for i in xrange(FIXEDTEST_SIZE)]
+        self.data = '\n'.join(self.line_gen) + '\n'
+
+        # Make a source file with some lines
+        fp = open(TESTFN, "wb")
+        fp.write(self.data)
+        fp.close()
+
+    def makeTestArchive(self, f, compression):
+        # Create the ZIP archive
+        zipfp = zipfile.ZipFile(f, "w", compression)
+        zipfp.write(TESTFN, "another"+os.extsep+"name")
+        zipfp.write(TESTFN, TESTFN)
+        zipfp.writestr("strfile", self.data)
+        zipfp.close()
+
+    def zipTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        self.assertEqual(zipfp.read(TESTFN), self.data)
+        self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
+        self.assertEqual(zipfp.read("strfile"), self.data)
+
+        # Print the ZIP directory
+        fp = StringIO()
+        stdout = sys.stdout
+        try:
+            sys.stdout = fp
+
+            zipfp.printdir()
+        finally:
+            sys.stdout = stdout
+
+        directory = fp.getvalue()
+        lines = directory.splitlines()
+        self.assertEquals(len(lines), 4) # Number of files + header
+
+        self.assert_('File Name' in lines[0])
+        self.assert_('Modified' in lines[0])
+        self.assert_('Size' in lines[0])
+
+        fn, date, time, size = lines[1].split()
+        self.assertEquals(fn, 'another.name')
+        # XXX: timestamp is not tested
+        self.assertEquals(size, str(len(self.data)))
+
+        # Check the namelist
+        names = zipfp.namelist()
+        self.assertEquals(len(names), 3)
+        self.assert_(TESTFN in names)
+        self.assert_("another"+os.extsep+"name" in names)
+        self.assert_("strfile" in names)
+
+        # Check infolist
+        infos = zipfp.infolist()
+        names = [ i.filename for i in infos ]
+        self.assertEquals(len(names), 3)
+        self.assert_(TESTFN in names)
+        self.assert_("another"+os.extsep+"name" in names)
+        self.assert_("strfile" in names)
+        for i in infos:
+            self.assertEquals(i.file_size, len(self.data))
+
+        # check getinfo
+        for nm in (TESTFN, "another"+os.extsep+"name", "strfile"):
+            info = zipfp.getinfo(nm)
+            self.assertEquals(info.filename, nm)
+            self.assertEquals(info.file_size, len(self.data))
+
+        # Check that testzip doesn't raise an exception
+        zipfp.testzip()
+        zipfp.close()
+
+    def testStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipTest(f, zipfile.ZIP_STORED)
+
+    def zipOpenTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        zipdata1 = []
+        zipopen1 = zipfp.open(TESTFN)
+        while 1:
+            read_data = zipopen1.read(256)
+            if not read_data:
+                break
+            zipdata1.append(read_data)
+
+        zipdata2 = []
+        zipopen2 = zipfp.open("another"+os.extsep+"name")
+        while 1:
+            read_data = zipopen2.read(256)
+            if not read_data:
+                break
+            zipdata2.append(read_data)
+
+        self.assertEqual(''.join(zipdata1), self.data)
+        self.assertEqual(''.join(zipdata2), self.data)
+        zipfp.close()
+
+    def testOpenStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipOpenTest(f, zipfile.ZIP_STORED)
+
+    def testOpenViaZipInfo(self):
+        # Create the ZIP archive
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
+        zipfp.writestr("name", "foo")
+        zipfp.writestr("name", "bar")
+        zipfp.close()
+
+        zipfp = zipfile.ZipFile(TESTFN2, "r")
+        infos = zipfp.infolist()
+        data = ""
+        for info in infos:
+            data += zipfp.open(info).read()
+        self.assert_(data == "foobar" or data == "barfoo")
+        data = ""
+        for info in infos:
+            data += zipfp.read(info)
+        self.assert_(data == "foobar" or data == "barfoo")
+        zipfp.close()
+
+    def zipRandomOpenTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        zipdata1 = []
+        zipopen1 = zipfp.open(TESTFN)
+        while 1:
+            read_data = zipopen1.read(randint(1, 1024))
+            if not read_data:
+                break
+            zipdata1.append(read_data)
+
+        self.assertEqual(''.join(zipdata1), self.data)
+        zipfp.close()
+
+    def testRandomOpenStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+
+    def zipReadlineTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        zipopen = zipfp.open(TESTFN)
+        for line in self.line_gen:
+            linedata = zipopen.readline()
+            self.assertEqual(linedata, line + '\n')
+
+        zipfp.close()
+
+    def zipReadlinesTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        ziplines = zipfp.open(TESTFN).readlines()
+        for line, zipline in zip(self.line_gen, ziplines):
+            self.assertEqual(zipline, line + '\n')
+
+        zipfp.close()
+
+    def zipIterlinesTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
+            self.assertEqual(zipline, line + '\n')
+
+        zipfp.close()
+
+    def testReadlineStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipReadlineTest(f, zipfile.ZIP_STORED)
+
+    def testReadlinesStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipReadlinesTest(f, zipfile.ZIP_STORED)
+
+    def testIterlinesStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipIterlinesTest(f, zipfile.ZIP_STORED)
+
+    if zlib:
+        def testDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipTest(f, zipfile.ZIP_DEFLATED)
+
+        def testOpenDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
+
+        def testRandomOpenDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
+
+        def testReadlineDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
+
+        def testReadlinesDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
+
+        def testIterlinesDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
+
+        def testLowCompression(self):
+            # Checks for cases where compressed data is larger than original
+            # Create the ZIP archive
+            zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+            zipfp.writestr("strfile", '12')
+            zipfp.close()
+
+            # Get an open object for strfile
+            zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
+            openobj = zipfp.open("strfile")
+            self.assertEqual(openobj.read(1), '1')
+            self.assertEqual(openobj.read(1), '2')
+
+    def testAbsoluteArcnames(self):
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
+        zipfp.write(TESTFN, "/absolute")
+        zipfp.close()
+
+        zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED)
+        self.assertEqual(zipfp.namelist(), ["absolute"])
+        zipfp.close()
+
+    def testAppendToZipFile(self):
+        # Test appending to an existing zipfile
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
+        zipfp.write(TESTFN, TESTFN)
+        zipfp.close()
+        zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED)
+        zipfp.writestr("strfile", self.data)
+        self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
+        zipfp.close()
+
+    def testAppendToNonZipFile(self):
+        # Test appending to an existing file that is not a zipfile
+        # NOTE: this test fails if len(d) < 22 because of the first
+        # line "fpin.seek(-22, 2)" in _EndRecData
+        d = 'I am not a ZipFile!'*10
+        f = file(TESTFN2, 'wb')
+        f.write(d)
+        f.close()
+        zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED)
+        zipfp.write(TESTFN, TESTFN)
+        zipfp.close()
+
+        f = file(TESTFN2, 'rb')
+        f.seek(len(d))
+        zipfp = zipfile.ZipFile(f, "r")
+        self.assertEqual(zipfp.namelist(), [TESTFN])
+        zipfp.close()
+        f.close()
+
+    def test_WriteDefaultName(self):
+        # Check that calling ZipFile.write without arcname specified produces the expected result
+        zipfp = zipfile.ZipFile(TESTFN2, "w")
+        zipfp.write(TESTFN)
+        self.assertEqual(zipfp.read(TESTFN), file(TESTFN).read())
+        zipfp.close()
+
+    def test_PerFileCompression(self):
+        # Check that files within a Zip archive can have different compression options
+        zipfp = zipfile.ZipFile(TESTFN2, "w")
+        zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
+        zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
+        sinfo = zipfp.getinfo('storeme')
+        dinfo = zipfp.getinfo('deflateme')
+        self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
+        self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
+        zipfp.close()
+
+    def test_WriteToReadonly(self):
+        # Check that trying to call write() on a readonly ZipFile object
+        # raises a RuntimeError
+        zipf = zipfile.ZipFile(TESTFN2, mode="w")
+        zipf.writestr("somefile.txt", "bogus")
+        zipf.close()
+        zipf = zipfile.ZipFile(TESTFN2, mode="r")
+        self.assertRaises(RuntimeError, zipf.write, TESTFN)
+        zipf.close()
+
+    def testExtract(self):
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
+        for fpath, fdata in SMALL_TEST_DATA:
+            zipfp.writestr(fpath, fdata)
+        zipfp.close()
+
+        zipfp = zipfile.ZipFile(TESTFN2, "r")
+        for fpath, fdata in SMALL_TEST_DATA:
+            writtenfile = zipfp.extract(fpath)
+
+            # make sure it was written to the right place
+            if os.path.isabs(fpath):
+                correctfile = os.path.join(os.getcwd(), fpath[1:])
+            else:
+                correctfile = os.path.join(os.getcwd(), fpath)
+            correctfile = os.path.normpath(correctfile)
+
+            self.assertEqual(writtenfile, correctfile)
+
+            # make sure correct data is in correct file
+            self.assertEqual(fdata, file(writtenfile, "rb").read())
+
+            os.remove(writtenfile)
+
+        zipfp.close()
+
+        # remove the test file subdirectories
+        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
+
+    def testExtractAll(self):
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
+        for fpath, fdata in SMALL_TEST_DATA:
+            zipfp.writestr(fpath, fdata)
+        zipfp.close()
+
+        zipfp = zipfile.ZipFile(TESTFN2, "r")
+        zipfp.extractall()
+        for fpath, fdata in SMALL_TEST_DATA:
+            if os.path.isabs(fpath):
+                outfile = os.path.join(os.getcwd(), fpath[1:])
+            else:
+                outfile = os.path.join(os.getcwd(), fpath)
+
+            self.assertEqual(fdata, file(outfile, "rb").read())
+
+            os.remove(outfile)
+
+        zipfp.close()
+
+        # remove the test file subdirectories
+        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
+
+    def zip_test_writestr_permissions(self, f, compression):
+        # Make sure that writestr creates files with mode 0600,
+        # when it is passed a name rather than a ZipInfo instance.
+
+        self.makeTestArchive(f, compression)
+        zipfp = zipfile.ZipFile(f, "r")
+        zinfo = zipfp.getinfo('strfile')
+        self.assertEqual(zinfo.external_attr, 0600 << 16)
+
+    def test_writestr_permissions(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
+
+    def tearDown(self):
+        os.remove(TESTFN)
+        os.remove(TESTFN2)
+
+class TestZip64InSmallFiles(unittest.TestCase):
+    # These tests test the ZIP64 functionality without using large files,
+    # see test_zipfile64 for proper tests.
+
+    def setUp(self):
+        self._limit = zipfile.ZIP64_LIMIT
+        zipfile.ZIP64_LIMIT = 5
+
+        line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE))
+        self.data = '\n'.join(line_gen)
+
+        # Make a source file with some lines
+        fp = open(TESTFN, "wb")
+        fp.write(self.data)
+        fp.close()
+
+    def largeFileExceptionTest(self, f, compression):
+        zipfp = zipfile.ZipFile(f, "w", compression)
+        self.assertRaises(zipfile.LargeZipFile,
+                zipfp.write, TESTFN, "another"+os.extsep+"name")
+        zipfp.close()
+
+    def largeFileExceptionTest2(self, f, compression):
+        zipfp = zipfile.ZipFile(f, "w", compression)
+        self.assertRaises(zipfile.LargeZipFile,
+                zipfp.writestr, "another"+os.extsep+"name", self.data)
+        zipfp.close()
+
+    def testLargeFileException(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.largeFileExceptionTest(f, zipfile.ZIP_STORED)
+            self.largeFileExceptionTest2(f, zipfile.ZIP_STORED)
+
+    def zipTest(self, f, compression):
+        # Create the ZIP archive
+        zipfp = zipfile.ZipFile(f, "w", compression, allowZip64=True)
+        zipfp.write(TESTFN, "another"+os.extsep+"name")
+        zipfp.write(TESTFN, TESTFN)
+        zipfp.writestr("strfile", self.data)
+        zipfp.close()
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        self.assertEqual(zipfp.read(TESTFN), self.data)
+        self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
+        self.assertEqual(zipfp.read("strfile"), self.data)
+
+        # Print the ZIP directory
+        fp = StringIO()
+        stdout = sys.stdout
+        try:
+            sys.stdout = fp
+
+            zipfp.printdir()
+        finally:
+            sys.stdout = stdout
+
+        directory = fp.getvalue()
+        lines = directory.splitlines()
+        self.assertEquals(len(lines), 4) # Number of files + header
+
+        self.assert_('File Name' in lines[0])
+        self.assert_('Modified' in lines[0])
+        self.assert_('Size' in lines[0])
+
+        fn, date, time, size = lines[1].split()
+        self.assertEquals(fn, 'another.name')
+        # XXX: timestamp is not tested
+        self.assertEquals(size, str(len(self.data)))
+
+        # Check the namelist
+        names = zipfp.namelist()
+        self.assertEquals(len(names), 3)
+        self.assert_(TESTFN in names)
+        self.assert_("another"+os.extsep+"name" in names)
+        self.assert_("strfile" in names)
+
+        # Check infolist
+        infos = zipfp.infolist()
+        names = [ i.filename for i in infos ]
+        self.assertEquals(len(names), 3)
+        self.assert_(TESTFN in names)
+        self.assert_("another"+os.extsep+"name" in names)
+        self.assert_("strfile" in names)
+        for i in infos:
+            self.assertEquals(i.file_size, len(self.data))
+
+        # check getinfo
+        for nm in (TESTFN, "another"+os.extsep+"name", "strfile"):
+            info = zipfp.getinfo(nm)
+            self.assertEquals(info.filename, nm)
+            self.assertEquals(info.file_size, len(self.data))
+
+        # Check that testzip doesn't raise an exception
+        zipfp.testzip()
+
+
+        zipfp.close()
+
+    def testStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipTest(f, zipfile.ZIP_STORED)
+
+
+    if zlib:
+        def testDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipTest(f, zipfile.ZIP_DEFLATED)
+
+    def testAbsoluteArcnames(self):
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, allowZip64=True)
+        zipfp.write(TESTFN, "/absolute")
+        zipfp.close()
+
+        zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED)
+        self.assertEqual(zipfp.namelist(), ["absolute"])
+        zipfp.close()
+
+    def tearDown(self):
+        zipfile.ZIP64_LIMIT = self._limit
+        os.remove(TESTFN)
+        os.remove(TESTFN2)
+
+class PyZipFileTests(unittest.TestCase):
+    def testWritePyfile(self):
+        zipfp  = zipfile.PyZipFile(TemporaryFile(), "w")
+        fn = __file__
+        if fn.endswith('.pyc') or fn.endswith('.pyo'):
+            fn = fn[:-1]
+
+        zipfp.writepy(fn)
+
+        bn = os.path.basename(fn)
+        self.assert_(bn not in zipfp.namelist())
+        self.assert_(bn + 'o' in zipfp.namelist() or bn + 'c' in zipfp.namelist())
+        zipfp.close()
+
+
+        zipfp  = zipfile.PyZipFile(TemporaryFile(), "w")
+        fn = __file__
+        if fn.endswith('.pyc') or fn.endswith('.pyo'):
+            fn = fn[:-1]
+
+        zipfp.writepy(fn, "testpackage")
+
+        bn = "%s/%s"%("testpackage", os.path.basename(fn))
+        self.assert_(bn not in zipfp.namelist())
+        self.assert_(bn + 'o' in zipfp.namelist() or bn + 'c' in zipfp.namelist())
+        zipfp.close()
+
+    def testWritePythonPackage(self):
+        import email
+        packagedir = os.path.dirname(email.__file__)
+
+        zipfp  = zipfile.PyZipFile(TemporaryFile(), "w")
+        zipfp.writepy(packagedir)
+
+        # Check for a couple of modules at different levels of the hieararchy
+        names = zipfp.namelist()
+        self.assert_('email/__init__.pyo' in names or 'email/__init__.pyc' in names)
+        self.assert_('email/mime/text.pyo' in names or 'email/mime/text.pyc' in names)
+
+    def testWritePythonDirectory(self):
+        os.mkdir(TESTFN2)
+        try:
+            fp = open(os.path.join(TESTFN2, "mod1.py"), "w")
+            fp.write("print 42\n")
+            fp.close()
+
+            fp = open(os.path.join(TESTFN2, "mod2.py"), "w")
+            fp.write("print 42 * 42\n")
+            fp.close()
+
+            fp = open(os.path.join(TESTFN2, "mod2.txt"), "w")
+            fp.write("bla bla bla\n")
+            fp.close()
+
+            zipfp  = zipfile.PyZipFile(TemporaryFile(), "w")
+            zipfp.writepy(TESTFN2)
+
+            names = zipfp.namelist()
+            self.assert_('mod1.pyc' in names or 'mod1.pyo' in names)
+            self.assert_('mod2.pyc' in names or 'mod2.pyo' in names)
+            self.assert_('mod2.txt' not in names)
+
+        finally:
+            shutil.rmtree(TESTFN2)
+
+    def testWriteNonPyfile(self):
+        zipfp  = zipfile.PyZipFile(TemporaryFile(), "w")
+        file(TESTFN, 'w').write('most definitely not a python file')
+        self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
+        os.remove(TESTFN)
+
+
+class OtherTests(unittest.TestCase):
+    def testUnicodeFilenames(self):
+        zf = zipfile.ZipFile(TESTFN, "w")
+        zf.writestr(u"foo.txt", "Test for unicode filename")
+        zf.writestr(u"\xf6.txt", "Test for unicode filename")
+        self.assertTrue(isinstance(zf.infolist()[0].filename, unicode))
+        zf.close()
+        zf = zipfile.ZipFile(TESTFN, "r")
+        self.assertEqual(zf.filelist[0].filename, "foo.txt")
+        self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")
+        zf.close()
+
+    def testCreateNonExistentFileForAppend(self):
+        if os.path.exists(TESTFN):
+            os.unlink(TESTFN)
+
+        filename = 'testfile.txt'
+        content = 'hello, world. this is some content.'
+
+        try:
+            zf = zipfile.ZipFile(TESTFN, 'a')
+            zf.writestr(filename, content)
+            zf.close()
+        except IOError, (errno, errmsg):
+            self.fail('Could not append data to a non-existent zip file.')
+
+        self.assert_(os.path.exists(TESTFN))
+
+        zf = zipfile.ZipFile(TESTFN, 'r')
+        self.assertEqual(zf.read(filename), content)
+        zf.close()
+
+    def testCloseErroneousFile(self):
+        # This test checks that the ZipFile constructor closes the file object
+        # it opens if there's an error in the file.  If it doesn't, the traceback
+        # holds a reference to the ZipFile object and, indirectly, the file object.
+        # On Windows, this causes the os.unlink() call to fail because the
+        # underlying file is still open.  This is SF bug #412214.
+        #
+        fp = open(TESTFN, "w")
+        fp.write("this is not a legal zip file\n")
+        fp.close()
+        try:
+            zf = zipfile.ZipFile(TESTFN)
+        except zipfile.BadZipfile:
+            pass
+
+    def testIsZipErroneousFile(self):
+        # This test checks that the is_zipfile function correctly identifies
+        # a file that is not a zip file
+        fp = open(TESTFN, "w")
+        fp.write("this is not a legal zip file\n")
+        fp.close()
+        chk = zipfile.is_zipfile(TESTFN)
+        self.assert_(chk is False)
+
+    def testIsZipValidFile(self):
+        # This test checks that the is_zipfile function correctly identifies
+        # a file that is a zip file
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        chk = zipfile.is_zipfile(TESTFN)
+        self.assert_(chk is True)
+
+    def testNonExistentFileRaisesIOError(self):
+        # make sure we don't raise an AttributeError when a partially-constructed
+        # ZipFile instance is finalized; this tests for regression on SF tracker
+        # bug #403871.
+
+        # The bug we're testing for caused an AttributeError to be raised
+        # when a ZipFile instance was created for a file that did not
+        # exist; the .fp member was not initialized but was needed by the
+        # __del__() method.  Since the AttributeError is in the __del__(),
+        # it is ignored, but the user should be sufficiently annoyed by
+        # the message on the output that regression will be noticed
+        # quickly.
+        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)
+
+    def testClosedZipRaisesRuntimeError(self):
+        # Verify that testzip() doesn't swallow inappropriate exceptions.
+        data = StringIO()
+        zipf = zipfile.ZipFile(data, mode="w")
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+
+        # This is correct; calling .read on a closed ZipFile should throw
+        # a RuntimeError, and so should calling .testzip.  An earlier
+        # version of .testzip would swallow this exception (and any other)
+        # and report that the first file in the archive was corrupt.
+        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
+        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
+        self.assertRaises(RuntimeError, zipf.testzip)
+        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
+        file(TESTFN, 'w').write('zipfile test data')
+        self.assertRaises(RuntimeError, zipf.write, TESTFN)
+
+    def test_BadConstructorMode(self):
+        # Check that bad modes passed to ZipFile constructor are caught
+        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")
+
+    def test_BadOpenMode(self):
+        # Check that bad modes passed to ZipFile.open are caught
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        zipf = zipfile.ZipFile(TESTFN, mode="r")
+        # read the data to make sure the file is there
+        zipf.read("foo.txt")
+        self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")
+        zipf.close()
+
+    def test_Read0(self):
+        # Check that calling read(0) on a ZipExtFile object returns an empty
+        # string and doesn't advance file pointer
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        # read the data to make sure the file is there
+        f = zipf.open("foo.txt")
+        for i in xrange(FIXEDTEST_SIZE):
+            self.assertEqual(f.read(0), '')
+
+        self.assertEqual(f.read(), "O, for a Muse of Fire!")
+        zipf.close()
+
+    def test_OpenNonexistentItem(self):
+        # Check that attempting to call open() for an item that doesn't
+        # exist in the archive raises a RuntimeError
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
+
+    def test_BadCompressionMode(self):
+        # Check that bad compression methods passed to ZipFile.open are caught
+        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)
+
+    def test_NullByteInFilename(self):
+        # Check that a filename containing a null byte is properly terminated
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
+        self.assertEqual(zipf.namelist(), ['foo.txt'])
+
+    def test_StructSizes(self):
+        # check that ZIP internal structure sizes are calculated correctly
+        self.assertEqual(zipfile.sizeEndCentDir, 22)
+        self.assertEqual(zipfile.sizeCentralDir, 46)
+        self.assertEqual(zipfile.sizeEndCentDir64, 56)
+        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
+
+    def testComments(self):
+        # This test checks that comments on the archive are handled properly
+
+        # check default comment is empty
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        self.assertEqual(zipf.comment, '')
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        zipfr = zipfile.ZipFile(TESTFN, mode="r")
+        self.assertEqual(zipfr.comment, '')
+        zipfr.close()
+
+        # check a simple short comment
+        comment = 'Bravely taking to his feet, he beat a very brave retreat.'
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.comment = comment
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        zipfr = zipfile.ZipFile(TESTFN, mode="r")
+        self.assertEqual(zipfr.comment, comment)
+        zipfr.close()
+
+        # check a comment of max length
+        comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.comment = comment2
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        zipfr = zipfile.ZipFile(TESTFN, mode="r")
+        self.assertEqual(zipfr.comment, comment2)
+        zipfr.close()
+
+        # check a comment that is too long is truncated
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.comment = comment2 + 'oops'
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        zipfr = zipfile.ZipFile(TESTFN, mode="r")
+        self.assertEqual(zipfr.comment, comment2)
+        zipfr.close()
+
+    def tearDown(self):
+        support.unlink(TESTFN)
+        support.unlink(TESTFN2)
+
+class DecryptionTests(unittest.TestCase):
+    # This test checks that ZIP decryption works. Since the library does not
+    # support encryption at the moment, we use a pre-generated encrypted
+    # ZIP file
+
+    data = (
+    'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
+    '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
+    '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
+    'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
+    '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
+    '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
+    '\x00\x00L\x00\x00\x00\x00\x00' )
+    data2 = (
+    'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
+    '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
+    '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
+    'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
+    '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
+    '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
+    'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
+    '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
+
+    plain = 'zipfile.py encryption test'
+    plain2 = '\x00'*512
+
+    def setUp(self):
+        fp = open(TESTFN, "wb")
+        fp.write(self.data)
+        fp.close()
+        self.zip = zipfile.ZipFile(TESTFN, "r")
+        fp = open(TESTFN2, "wb")
+        fp.write(self.data2)
+        fp.close()
+        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
+
+    def tearDown(self):
+        self.zip.close()
+        os.unlink(TESTFN)
+        self.zip2.close()
+        os.unlink(TESTFN2)