Commits

Armin Rigo  committed aac465b Merge

merge heads

  • Participants
  • Parent commits 099b99f, facac5b

Comments (0)

Files changed (9)

File pypy/module/_io/interp_bytesio.py

         self.buf = None
 
     @unwrap_spec('self', ObjSpace, W_Root)
-    def descr_init(self, space, w_initvalue=None):
+    def descr_init(self, space, w_initial_bytes=None):
         # In case __init__ is called multiple times
         self.buf = []
         self.string_size = 0
         self.pos = 0
 
-        if not space.is_w(w_initvalue, space.w_None):
-            self.write_w(space, w_initvalue)
+        if not space.is_w(w_initial_bytes, space.w_None):
+            self.write_w(space, w_initial_bytes)
             self.pos = 0
 
     def _check_closed(self, space, message=None):
     __getstate__ = interp2app(W_BytesIO.getstate_w),
     __setstate__ = interp2app(W_BytesIO.setstate_w),
     )
-

File pypy/module/_io/interp_stringio.py

         if not space.isinstance_w(w_obj, space.w_unicode):
             raise operationerrfmt(space.w_TypeError,
                                   "string argument expected, got '%s'",
-                                  space.type(self).getname(space, '?'))
+                                  space.type(w_obj).getname(space, '?'))
         self._check_closed(space)
         string = space.unicode_w(w_obj)
         size = len(string)
         self.pos = end
         return space.wrap(u''.join(self.buf[start:end]))
 
+    @unwrap_spec('self', ObjSpace, int)
+    def seek_w(self, space, pos):
+        if pos < 0:
+            raise operationerrfmt(space.w_ValueError,
+                "negative seek position: %d", pos
+            )
+        self.pos = pos
+
     @unwrap_spec('self', ObjSpace)
     def getvalue_w(self, space):
         self._check_closed(space)
     __init__ = interp2app(W_StringIO.descr_init),
     write=interp2app(W_StringIO.write_w),
     read=interp2app(W_StringIO.read_w),
+    seek=interp2app(W_StringIO.seek_w),
     getvalue=interp2app(W_StringIO.getvalue_w),
     readable = interp2app(W_StringIO.readable_w),
     writable = interp2app(W_StringIO.writable_w),
     seekable = interp2app(W_StringIO.seekable_w),
     close = interp2app(W_StringIO.close_w),
     closed = GetSetProperty(W_StringIO.closed_get_w),
-    )
-
+)

File pypy/module/_io/interp_textio.py

     readline = interp2app(W_TextIOBase.readline_w),
     detach = interp2app(W_TextIOBase.detach_w),
     encoding = interp_attrproperty_w("w_encoding", W_TextIOBase)
-    )
+)
 
 class PositionCookie:
     def __init__(self, bigint):

File pypy/module/_io/test/test_bytesio.py

         import _io
         raises(TypeError, _io.BytesIO, u"12345")
 
+    def test_init_kwargs(self):
+        import _io
+
+        buf = "1234567890"
+        b = _io.BytesIO(initial_bytes=buf)
+        assert b.read() == buf
+        raises(TypeError, _io.BytesIO, buf, foo=None)
+
     def test_capabilities(self):
         import _io
         f = _io.BytesIO()

File pypy/module/_io/test/test_stringio.py

         assert buf[5:] == sio.read(900)
         assert u"" == sio.read()
 
+    def test_seek(self):
+        import io
+
+        s = u"1234567890"
+        sio = io.StringIO(s)
+
+        sio.read(5)
+        sio.seek(0)
+        r = sio.read()
+        assert r == s
+
+        sio.seek(3)
+        r = sio.read()
+        assert r == s[3:]
+        raises(TypeError, sio.seek, 0.0)
+
+        exc_info = raises(ValueError, sio.seek, -3)
+        assert exc_info.value.args[0] == "negative seek position: -3"
+
+    def test_write_error(self):
+        import io
+
+        exc_info = raises(TypeError, io.StringIO, 3)
+        assert "int" in exc_info.value.args[0]
+
+        sio = io.StringIO(u"")
+        exc_info = raises(TypeError, sio.write, 3)
+        assert "int" in exc_info.value.args[0]

File pypy/module/_ssl/interp_ssl.py

                 raise ssl_error(self.space, "The read operation timed out")
             elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT:
                 raise ssl_error(self.space, "Underlying socket too large for select().")
+            elif sockstate == SOCKET_HAS_BEEN_CLOSED:
+                if libssl_SSL_get_shutdown(self.ssl) == SSL_RECEIVED_SHUTDOWN:
+                    return self.space.wrap('')
+                raise ssl_error(self.space, "Socket closed without SSL shutdown handshake")
 
         raw_buf, gc_buf = rffi.alloc_buffer(num_bytes)
         while True:
             elif err == SSL_ERROR_WANT_WRITE:
                 sockstate = check_socket_and_wait_for_timeout(self.space,
                     self.w_socket, True)
+            elif (err == SSL_ERROR_ZERO_RETURN and
+                  libssl_SSL_get_shutdown(self.ssl) == SSL_RECEIVED_SHUTDOWN):
+                return self.space.wrap("")
             else:
                 sockstate = SOCKET_OPERATION_OK
         

File pypy/objspace/std/complexobject.py

 def repr__Complex(space, w_complex):
     if w_complex.realval == 0 and copysign(1., w_complex.realval) == 1.:
         return space.wrap(repr_format(w_complex.imagval) + 'j')
-    sign = (copysign(1., w_complex.imagval) == 1.) and '+' or ''
+    sign = (copysign(1., w_complex.imagval) == 1. or
+            isnan(w_complex.imagval)) and '+' or ''
     return space.wrap('(' + repr_format(w_complex.realval)
                       + sign + repr_format(w_complex.imagval) + 'j)')
 
 def str__Complex(space, w_complex):
     if w_complex.realval == 0 and copysign(1., w_complex.realval) == 1.:
         return space.wrap(str_format(w_complex.imagval) + 'j')
-    sign = (copysign(1., w_complex.imagval) == 1.) and '+' or ''
+    sign = (copysign(1., w_complex.imagval) == 1. or
+            isnan(w_complex.imagval)) and '+' or ''
     return space.wrap('(' + str_format(w_complex.realval)
                       + sign + str_format(w_complex.imagval) + 'j)')
 

File pypy/objspace/std/test/test_complexobject.py

         assert repr(complex(-0.0, -0.0)) == '(-0-0j)'
         assert repr(complex(1e45)) == "(" + repr(1e45) + "+0j)"
         assert repr(complex(1e200*1e200)) == '(inf+0j)'
+        assert repr(complex(1,-float("nan"))) == '(1+nanj)'
 
     def test_neg(self):
         h = self.helper

File pypy/rlib/ropenssl.py

         "SSL_ERROR_WANT_CONNECT")
     SSL_ERROR_SYSCALL = rffi_platform.ConstantInteger("SSL_ERROR_SYSCALL")
     SSL_ERROR_SSL = rffi_platform.ConstantInteger("SSL_ERROR_SSL")
+    SSL_RECEIVED_SHUTDOWN = rffi_platform.ConstantInteger(
+        "SSL_RECEIVED_SHUTDOWN")
     SSL_CTRL_OPTIONS = rffi_platform.ConstantInteger("SSL_CTRL_OPTIONS")
     SSL_CTRL_MODE = rffi_platform.ConstantInteger("SSL_CTRL_MODE")
     BIO_C_SET_NBIO = rffi_platform.ConstantInteger("BIO_C_SET_NBIO")
 ssl_external('SSL_do_handshake', [SSL], rffi.INT)
 ssl_external('SSL_shutdown', [SSL], rffi.INT)
 ssl_external('SSL_get_error', [SSL, rffi.INT], rffi.INT)
+ssl_external('SSL_get_shutdown', [SSL], rffi.INT)
 ssl_external('SSL_set_read_ahead', [SSL, rffi.INT], lltype.Void)
 
 ssl_external('ERR_get_error', [], rffi.INT)