Commits

Anonymous committed efbbc2b Merge

Merge v2.7 test_io fixes

Comments (0)

Files changed (3)

     def raw(self):
         return self._raw
 
-    # Jython difference: @property closed(self) inherited from _IOBase.__closed
+    @property
+    def closed(self):
+        return self.raw.closed
 
     # Jython difference: emulate C implementation CHECK_INITIALIZED. This is for
     # compatibility, to pass test.test_io.CTextIOWrapperTest.test_initialization.
                )[self.seennl]
 
 
+def _check_decoded_chars(chars):
+    """Check decoder output is unicode"""
+    if not isinstance(chars, unicode):
+        raise TypeError("decoder should return a string result, not '%s'" %
+                        type(chars))
+
+def _check_buffered_bytes(b, context="read"):
+    """Check buffer has returned bytes"""
+    if not isinstance(b, str):
+        raise TypeError("underlying %s() should have returned a bytes object, not '%s'" %
+                        (context, type(b)))
+
+
+
 class TextIOWrapper(_TextIOBase):
 
     r"""Character and line based layer over a _BufferedIOBase object, buffer.
             finally:
                 self.buffer.close()
 
-    # Jython difference: @property closed(self) inherited from _IOBase.__closed
+    @property
+    def closed(self):
+        return self.buffer.closed
 
     # Jython difference: emulate C implementation CHECK_INITIALIZED. This is for
     # compatibility, to pass test.test_io.CTextIOWrapperTest.test_initialization.
 
         # Read a chunk, decode it, and put the result in self._decoded_chars.
         input_chunk = self.buffer.read1(self._CHUNK_SIZE)
+        _check_buffered_bytes(input_chunk, "read1")
+
         eof = not input_chunk
-        self._set_decoded_chars(self._decoder.decode(input_chunk, eof))
+        decoded_chunk = self._decoder.decode(input_chunk, eof)
+        _check_decoded_chars(decoded_chunk)
+        self._set_decoded_chars(decoded_chunk)
 
         if self._telling:
             # At the snapshot point, len(dec_buffer) bytes before the read,
         if chars_to_skip:
             # Just like _read_chunk, feed the decoder and save a snapshot.
             input_chunk = self.buffer.read(bytes_to_feed)
-            self._set_decoded_chars(
-                self._decoder.decode(input_chunk, need_eof))
+            _check_buffered_bytes(input_chunk)
+            decoded_chunk = self._decoder.decode(input_chunk, need_eof)
+            _check_decoded_chars(decoded_chunk)
+            self._set_decoded_chars(decoded_chunk)
+
             self._snapshot = (dec_flags, input_chunk)
 
             # Skip chars_to_skip of the decoded characters.
             raise TypeError("an integer is required")
         if n < 0:
             # Read everything.
-            result = (self._get_decoded_chars() +
-                      decoder.decode(self.buffer.read(), final=True))
+            input_chunk = self.buffer.read()
+            # Jython difference: CPython textio.c omits:
+            _check_buffered_bytes(input_chunk)
+            decoded_chunk = decoder.decode(input_chunk, final=True)
+            _check_decoded_chars(decoded_chunk)
+            result = self._get_decoded_chars() + decoded_chunk
             self._set_decoded_chars('')
             self._snapshot = None
             return result

Lib/test/test_io.py

 import errno
 from itertools import cycle, count
 from collections import deque
+from UserList import UserList
 from test import test_support as support
+import contextlib
 
 import codecs
 import io               # subject of test
             raise IOError()
         f.flush = bad_flush
         self.assertRaises(IOError, f.close) # exception not swallowed
+        self.assertTrue(f.closed)
 
     def test_multi_close(self):
         f = self.open(support.TESTFN, "wb", buffering=0)
         self.assertEqual(rawio.read(2), None)
         self.assertEqual(rawio.read(2), b"")
 
+    def test_fileio_closefd(self):
+        # Issue #4841
+        with self.open(__file__, 'rb') as f1, \
+             self.open(__file__, 'rb') as f2:
+            fileio = self.FileIO(f1.fileno(), closefd=False)
+            # .__init__() must not close f1
+            fileio.__init__(f2.fileno(), closefd=False)
+            f1.readline()
+            # .close() must not close f2
+            fileio.close()
+            f2.readline()
+
+
 class CIOTest(IOTest):
 
     @unittest.skipIf(support.is_jython, "GC nondeterministic in Jython")
         raw.flush = bad_flush
         b = self.tp(raw)
         self.assertRaises(IOError, b.close) # exception not swallowed
+        self.assertTrue(b.closed)
+
+    def test_close_error_on_close(self):
+        raw = self.MockRawIO()
+        def bad_flush():
+            raise IOError('flush')
+        def bad_close():
+            raise IOError('close')
+        raw.close = bad_close
+        b = self.tp(raw)
+        b.flush = bad_flush
+        with self.assertRaises(IOError) as err: # exception not swallowed
+            b.close()
+        self.assertEqual(err.exception.args, ('close',))
+        self.assertFalse(b.closed)
 
     def test_multi_close(self):
         raw = self.MockRawIO()
             buf.raw = x
 
 
+class SizeofTest:
+
+    @support.cpython_only
+    def test_sizeof(self):
+        bufsize1 = 4096
+        bufsize2 = 8192
+        rawio = self.MockRawIO()
+        bufio = self.tp(rawio, buffer_size=bufsize1)
+        size = sys.getsizeof(bufio) - bufsize1
+        rawio = self.MockRawIO()
+        bufio = self.tp(rawio, buffer_size=bufsize2)
+        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+
+
 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
     read_mode = "rb"
 
                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
 
 
-class CBufferedReaderTest(BufferedReaderTest):
+class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
     tp = io.BufferedReader
 
     def test_constructor(self):
         support.gc_collect()
         self.assertTrue(wr() is None, wr)
 
+    @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+    # When implemented in Python, the error message is about __init__, even on CPython
+    def test_args_error(self):
+        # Issue #17275
+        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
+            self.tp(io.BytesIO(), 1024, 1024, 1024)
+
+
 class PyBufferedReaderTest(BufferedReaderTest):
     tp = pyio.BufferedReader
 
         bufio.flush()
         self.assertEqual(b"abc", writer._write_stack[0])
 
+    def test_writelines(self):
+        l = [b'ab', b'cd', b'ef']
+        writer = self.MockRawIO()
+        bufio = self.tp(writer, 8)
+        bufio.writelines(l)
+        bufio.flush()
+        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+    def test_writelines_userlist(self):
+        l = UserList([b'ab', b'cd', b'ef'])
+        writer = self.MockRawIO()
+        bufio = self.tp(writer, 8)
+        bufio.writelines(l)
+        bufio.flush()
+        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+    def test_writelines_error(self):
+        writer = self.MockRawIO()
+        bufio = self.tp(writer, 8)
+        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
+        self.assertRaises(TypeError, bufio.writelines, None)
+
     @unittest.skipIf(support.is_jython, "GC nondeterministic in Jython")
     def test_destructor(self):
         writer = self.MockRawIO()
                                      DeprecationWarning)):
             self.tp(self.MockRawIO(), 8, 12)
 
-
-class CBufferedWriterTest(BufferedWriterTest):
+    def test_write_error_on_close(self):
+        raw = self.MockRawIO()
+        def bad_write(b):
+            raise IOError()
+        raw.write = bad_write
+        b = self.tp(raw)
+        b.write(b'spam')
+        self.assertRaises(IOError, b.close) # exception not swallowed
+        self.assertTrue(b.closed)
+
+
+class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
     tp = io.BufferedWriter
 
     def test_constructor(self):
         with self.open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"123xxx")
 
+    @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+    # When implemented in Python, the error message is about __init__, even on CPython
+    def test_args_error(self):
+        # Issue #17275
+        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
+            self.tp(io.BytesIO(), 1024, 1024, 1024)
+
 
 class PyBufferedWriterTest(BufferedWriterTest):
     tp = pyio.BufferedWriter
                 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
 
 
-class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
+class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
+                          BufferedRandomTest, SizeofTest):
     tp = io.BufferedRandom
 
     def test_constructor(self):
         CBufferedReaderTest.test_garbage_collection(self)
         CBufferedWriterTest.test_garbage_collection(self)
 
+    @unittest.skipIf(support.is_jython, "FIXME: in the Java version with ArgParser")
+    # When implemented in Python, the error message is about __init__, even on CPython
+    def test_args_error(self):
+        # Issue #17275
+        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
+            self.tp(io.BytesIO(), 1024, 1024, 1024)
+
+
 class PyBufferedRandomTest(BufferedRandomTest):
     tp = pyio.BufferedRandom
 
             reads += c
         self.assertEqual(reads, "A"*127+"\nB")
 
+    def test_writelines(self):
+        l = ['ab', 'cd', 'ef']
+        buf = self.BytesIO()
+        txt = self.TextIOWrapper(buf)
+        txt.writelines(l)
+        txt.flush()
+        self.assertEqual(buf.getvalue(), b'abcdef')
+
+    def test_writelines_userlist(self):
+        l = UserList(['ab', 'cd', 'ef'])
+        buf = self.BytesIO()
+        txt = self.TextIOWrapper(buf)
+        txt.writelines(l)
+        txt.flush()
+        self.assertEqual(buf.getvalue(), b'abcdef')
+
+    def test_writelines_error(self):
+        txt = self.TextIOWrapper(self.BytesIO())
+        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
+        self.assertRaises(TypeError, txt.writelines, None)
+        self.assertRaises(TypeError, txt.writelines, b'abc')
+
     def test_issue1395_1(self):
         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
 
             raise IOError()
         txt.flush = bad_flush
         self.assertRaises(IOError, txt.close) # exception not swallowed
+        self.assertTrue(txt.closed)
 
     def test_multi_close(self):
         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
         with self.assertRaises((AttributeError, TypeError)):
             txt.buffer = buf
 
+    def test_read_nonbytes(self):
+        # Issue #17106
+        # Crash when underlying read() returns non-bytes
+        class NonbytesStream(self.StringIO):
+            read1 = self.StringIO.read
+        class NonbytesStream(self.StringIO):
+            read1 = self.StringIO.read
+        t = self.TextIOWrapper(NonbytesStream('a'))
+        with self.maybeRaises(TypeError):
+            t.read(1)
+        t = self.TextIOWrapper(NonbytesStream('a'))
+        with self.maybeRaises(TypeError):
+            t.readline()
+        t = self.TextIOWrapper(NonbytesStream('a'))
+        with self.maybeRaises(TypeError):
+            # Jython difference: also detect the error in
+            t.read()
+
+    def test_illegal_decoder(self):
+        # Issue #17106
+        # Crash when decoder returns non-string
+        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+                               encoding='quopri_codec')
+        with self.maybeRaises(TypeError):
+            t.read(1)
+        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+                               encoding='quopri_codec')
+        with self.maybeRaises(TypeError):
+            t.readline()
+        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
+                               encoding='quopri_codec')
+        with self.maybeRaises(TypeError):
+            t.read()
+
+
 class CTextIOWrapperTest(TextIOWrapperTest):
 
     def test_initialization(self):
             t2.buddy = t1
         support.gc_collect()
 
+    maybeRaises = unittest.TestCase.assertRaises
+
 
 class PyTextIOWrapperTest(TextIOWrapperTest):
-    pass
+    @contextlib.contextmanager
+    def maybeRaises(self, *args, **kwds):
+        yield
 
 
 class IncrementalNewlineDecoderTest(unittest.TestCase):

src/org/python/core/PyByteArray.java

 
     @ExposedMethod(doc = BuiltinDocs.bytearray_extend_doc)
     final synchronized void bytearray_extend(PyObject o) {
+        // Raise TypeError if the argument is not iterable
+        o.__iter__();
         // Use the general method, assigning to the crack at the end of the array.
         // Note this deals with all legitimate PyObject types including the case o==this.
         setslice(size, size, 1, o);