Source

cpython / Lib / test / test_gzip.py

Diff from to

File Lib/test/test_gzip.py

         raise io.UnsupportedOperation
 
 
-class TestGzip(unittest.TestCase):
+class BaseTest(unittest.TestCase):
     filename = support.TESTFN
 
     def setUp(self):
         support.unlink(self.filename)
 
 
+class TestGzip(BaseTest):
     def test_write(self):
         with gzip.GzipFile(self.filename, 'wb') as f:
             f.write(data1 * 50)
             d = f.read()
         self.assertEqual(d, data1*50)
 
+    def test_read1(self):
+        self.test_write()
+        blocks = []
+        nread = 0
+        with gzip.GzipFile(self.filename, 'r') as f:
+            while True:
+                d = f.read1()
+                if not d:
+                    break
+                blocks.append(d)
+                nread += len(d)
+                # Check that position was updated correctly (see issue10791).
+                self.assertEqual(f.tell(), nread)
+        self.assertEqual(b''.join(blocks), data1 * 50)
+
     def test_io_on_closed_object(self):
         # Test that I/O operations on closed GzipFile objects raise a
         # ValueError, just like the corresponding functions on file objects.
         # Bug #1074261 was triggered when reading a file that contained
         # many, many members.  Create such a file and verify that reading it
         # works.
-        with gzip.open(self.filename, 'wb', 9) as f:
+        with gzip.GzipFile(self.filename, 'wb', 9) as f:
             f.write(b'a')
         for i in range(0, 200):
-            with gzip.open(self.filename, "ab", 9) as f: # append
+            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
                 f.write(b'a')
 
         # Try reading the file
-        with gzip.open(self.filename, "rb") as zgfile:
+        with gzip.GzipFile(self.filename, "rb") as zgfile:
             contents = b""
             while 1:
                 ztxt = zgfile.read(8192)
             with io.BufferedReader(f) as r:
                 lines = [line for line in r]
 
-        self.assertEqual(lines, 50 * data1.splitlines(True))
+        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))
 
     def test_readline(self):
         self.test_write()
             self.assertEqual(f.read(100), b'')
             self.assertEqual(nread, len(uncompressed))
 
+    def test_textio_readlines(self):
+        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
+        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
+        self.test_write()
+        with gzip.GzipFile(self.filename, 'r') as f:
+            with io.TextIOWrapper(f, encoding="ascii") as t:
+                self.assertEqual(t.readlines(), lines)
+
     def test_fileobj_from_fdopen(self):
         # Issue #13781: Opening a GzipFile for writing fails when using a
         # fileobj created with os.fdopen().
                 self.assertRaises(EOFError, f.read, 1)
 
 
+class TestOpen(BaseTest):
+    def test_binary_modes(self):
+        uncompressed = data1 * 50
+        with gzip.open(self.filename, "wb") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read())
+            self.assertEqual(file_data, uncompressed)
+        with gzip.open(self.filename, "rb") as f:
+            self.assertEqual(f.read(), uncompressed)
+        with gzip.open(self.filename, "ab") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read())
+            self.assertEqual(file_data, uncompressed * 2)
+
+    def test_implicit_binary_modes(self):
+        # Test implicit binary modes (no "b" or "t" in mode string).
+        uncompressed = data1 * 50
+        with gzip.open(self.filename, "w") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read())
+            self.assertEqual(file_data, uncompressed)
+        with gzip.open(self.filename, "r") as f:
+            self.assertEqual(f.read(), uncompressed)
+        with gzip.open(self.filename, "a") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read())
+            self.assertEqual(file_data, uncompressed * 2)
+
+    def test_text_modes(self):
+        uncompressed = data1.decode("ascii") * 50
+        uncompressed_raw = uncompressed.replace("\n", os.linesep)
+        with gzip.open(self.filename, "wt") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read()).decode("ascii")
+            self.assertEqual(file_data, uncompressed_raw)
+        with gzip.open(self.filename, "rt") as f:
+            self.assertEqual(f.read(), uncompressed)
+        with gzip.open(self.filename, "at") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read()).decode("ascii")
+            self.assertEqual(file_data, uncompressed_raw * 2)
+
+    def test_fileobj(self):
+        uncompressed_bytes = data1 * 50
+        uncompressed_str = uncompressed_bytes.decode("ascii")
+        compressed = gzip.compress(uncompressed_bytes)
+        with gzip.open(io.BytesIO(compressed), "r") as f:
+            self.assertEqual(f.read(), uncompressed_bytes)
+        with gzip.open(io.BytesIO(compressed), "rb") as f:
+            self.assertEqual(f.read(), uncompressed_bytes)
+        with gzip.open(io.BytesIO(compressed), "rt") as f:
+            self.assertEqual(f.read(), uncompressed_str)
+
+    def test_bad_params(self):
+        # Test invalid parameter combinations.
+        with self.assertRaises(TypeError):
+            gzip.open(123.456)
+        with self.assertRaises(ValueError):
+            gzip.open(self.filename, "wbt")
+        with self.assertRaises(ValueError):
+            gzip.open(self.filename, "rb", encoding="utf-8")
+        with self.assertRaises(ValueError):
+            gzip.open(self.filename, "rb", errors="ignore")
+        with self.assertRaises(ValueError):
+            gzip.open(self.filename, "rb", newline="\n")
+
+    def test_encoding(self):
+        # Test non-default encoding.
+        uncompressed = data1.decode("ascii") * 50
+        uncompressed_raw = uncompressed.replace("\n", os.linesep)
+        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
+            f.write(uncompressed)
+        with open(self.filename, "rb") as f:
+            file_data = gzip.decompress(f.read()).decode("utf-16")
+            self.assertEqual(file_data, uncompressed_raw)
+        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
+            self.assertEqual(f.read(), uncompressed)
+
+    def test_encoding_error_handler(self):
+        # Test with non-default encoding error handler.
+        with gzip.open(self.filename, "wb") as f:
+            f.write(b"foo\xffbar")
+        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
+                as f:
+            self.assertEqual(f.read(), "foobar")
+
+    def test_newline(self):
+        # Test with explicit newline (universal newline mode disabled).
+        uncompressed = data1.decode("ascii") * 50
+        with gzip.open(self.filename, "wt", newline="\n") as f:
+            f.write(uncompressed)
+        with gzip.open(self.filename, "rt", newline="\r") as f:
+            self.assertEqual(f.readlines(), [uncompressed])
+
 def test_main(verbose=None):
-    support.run_unittest(TestGzip)
+    support.run_unittest(TestGzip, TestOpen)
 
 if __name__ == "__main__":
     test_main(verbose=True)