Commits

Lynn Rees committed 5b38341

- further

  • Participants
  • Parent commits 90ac222

Comments (0)

Files changed (5)

crossroads/high.py

-# -*- coding: utf-8 -*-
-'''High-level ctypes bindings to Crossroads API functions.'''

crossroads/low.py

         )
         return self.last_rc
 
-    def _setmany(self, **options):
-
-        return self
-
     def _sbind(self, prefix, *addresses):
         for address in addresses:
             self.last_rc = self._bind(prefix + tobytes(address))
         return unique_id()
 
     def send(self, data, size=None, more=False, nowait=False):
-        msg = data if isinstance(data, Message) else Message(size, data)
+        msg = Message(size, data)
         msg.last_rc = self.last_rc = self._send(
             msg.src,
             len(msg),
         return msg
 
     def sendmsg(self, data, more=False, nowait=False):
-        msg = data if isinstance(data, MessageXS) else MessageXS(data)
+        msg = MessageXS(data)
+        src = msg.src
         msg.last_rc = self.last_rc = self._sendmsg(
-            msg.src, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0
+            src, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
         )
         return msg
 
     def recv(self, size, nowait=False):
         msg = Message(size)
         msg.last_rc = self.last_rc = self._recv(
-            msg.dest, len(msg), XS.DONTWAIT if nowait else 0
+            msg.dest, msg.size, XS.DONTWAIT if nowait else 0,
         )
         msg.more = self.rcvmore
         return msg
 
     def recvmsg(self, nowait=False):
+        msg = MessageXS()
         try:
-            msg = MessageXS()
             msg.last_rc = self.last_rc = self._recvmsg(
-                msg.dest, XS.DONTWAIT if nowait else 0
+                msg.dest, XS.DONTWAIT if nowait else 0,
             )
             msg.more = self.rcvmore
         finally:

crossroads/message.py

 # -*- coding: utf-8 -*-
 '''Crossroads.IO messages.'''
 
-from ctypes import sizeof, string_at, c_ubyte
+from ctypes import sizeof, string_at, c_ubyte, byref, c_size_t
 
-from stuf.six import PY3, tobytes, tounicode
+from stuf.six import PY3, tobytes, tounicode, strings
 
+from .constants import MORE
 from . import lowest as xs
+from stuf.deep import getcls
+from test.test_multiprocessing import c_int
 
 
-class BaseMsg(object):
+class BaseMessage(object):
 
-    def __init__(self, size, source=None):
-        self.src = source
-        self.size = size
-        self.more = self.rc = self.dest = None
+    def __init__(self):
+        self.more = self.rc = self.dest = self.last_rc = None
 
-    def __bytes__(self, *args, **kwargs):
-        if self.dest is not None:
-            return tobytes(string_at(self.dest, len(self)), 'latin-1')
-        return b''
+    def __bytes__(self):
+        if self.dest is None:
+            return b''
+        return tobytes(string_at(byref(self.dest), self.size))
 
-    def __unicode__(self, *args, **kwargs):
-        return tounicode(tobytes(self, 'utf-8'))
+    def __unicode__(self):
+        return tounicode(self)
 
     __str__ = __unicode__ if PY3 else __bytes__
 
 
-class Message(BaseMsg):
+class Message(BaseMessage):
 
-    __slots__ = 'src dest rc more size'.split()
+    __slots__ = 'src dest rc more size last_rc'.split()
 
     def __init__(self, size, source=None):
-        super(Message, self).__init__(size, source)
+        super(Message, self).__init__()
+        self.src = source
         if source is None:
             self.dest = xs.array(c_ubyte, size)
+            self.size = size
+        else:
+            if isinstance(source, (getcls(self), strings)):
+                self.src = source = bytes(source)
+            self.size = len(source) if size is None else size
+            self.dest = None
 
     def __len__(self):
-        if self.size is not None:
-            return self.size
-        if self.src is not None:
-            return len(self.src)
-        return sizeof(self.dest)
+        return self.size
 
 
-class MessageXS(BaseMsg):
+class MessageXS(BaseMessage):
 
-    __slots__ = 'src dest last_rc more ref'.split()
+    __slots__ = 'src dest rc more last_rc'.split()
 
-    def __init__(self, data=None):
-        super(MessageXS, self).__init__(32, data)
-        self.dest = self.ref = dest = xs.msg_t()
-        if data is None:
-            self.rc = xs.msg_init(dest)
+    def __init__(self, source=None):
+        super(MessageXS, self).__init__()
+        self.dest = xs.msg_t()
+        if isinstance(source, getcls(self)):
+            self.src = source.dest
+            self.rc = xs.msg_init(self.src)
+            self.size = sizeof(self.src)
+        elif source is None:
+            self.src = self.dest
+            self.rc = xs.msg_init(self.src)
+            self.size = sizeof(self.src)
         else:
-            self.rc = xs.msg_init_size(dest, 32)
-            xs.msg_init_size(dest, 32)
-            xs.memmove(xs.msg_data(dest), data, 32)
+            self.src = bytes(source)
+            length = self.size = len(self.src)
+            self.rc = xs.msg_init_size(self.dest, length)
+            xs.memmove(xs.msg_data(self.dest), self.src, length)
 
     def __len__(self):
-        return sizeof(self.dest)
+        return self.size
+
+    @property
+    def moremsg(self):
+        more = c_int()
+        more_size = c_size_t(sizeof(more))
+        self.rc = xs.getmsgopt(self.dest, MORE, more, more_size)
+        value = more.value
+        del more_size, more
+        return value
 
     def close(self):
         self.rc = xs.msg_close(self.dest)
-
-
-#class BaseMultipart(object):
-#
-#    def __init__(self, size, source=None):
-#        self.src = source
-#        self.more = self.rc = self.dest = None
-#        self.parts = []
-#        self.__add__ = self.parts.append
-#
-#    def __bytes__(self, *args, **kwargs):
-#        if self.dest is not None:
-#            return b''.join(tobytes(
-#                string_at(byref(i), sizeof(i)), 'latin-1') for i in self.parts
-#            )
-#        return b''
-#
-#    def __unicode__(self, *args, **kwargs):
-#        return tounicode(tobytes(self, 'utf-8'))
-#
-#    __str__ = __unicode__ if PY3 else __bytes__
-#
-#    @property
-#    def dest(self):
-#        return self.parts[-1]
-#class Multipart(BaseMultipart):
-#
-#    def __init__(self, size, source=None):
-#        super(Multipart, self).__init__(size, source)
-#
-#    def __len__(self):
-#
-#        return len(self.src) if self.dest is None else sizeof(self.dest)
-#
-#    @property
-#    def ref(self):
-#        return None if self.dest is None else byref(self.dest)
-#
-#
-#class XSMultipart(BaseMultipart):
-#
-#    def __init__(self, size, source=None):
-#        super(XSMultipart, self).__init__(size, source)
-#
-#    def __len__(self):
-#        return sizeof(self.dest)
-#
-#    def close(self):
-#        self.last_rc = sum(xs.msg_close(i) for i in self.parts)

tests/test_low_level.py

         s = ctx.push(linger=100)
         with s.connect_tcp('127.0.0.1:5560'):
             # send a message.
-            s.send(b'r')
+            s.send('r')
             self.assertEqual(s.last_rc, 1)
         # terminate the context. this should take 0.1 second.
         watch = self.xs.stopwatch_start()
             push_id = push.last_rc
             self.assertNotEqual(push_id, -1)
             # pass one message through to ensure the connection is established
-            self.assertEqual(push.send(b'ABC').last_rc, 3)
+            self.assertEqual(push.send('ABC').last_rc, 3)
             pull.recv(3)
             self.assertEqual(pull.last_rc, 3)
             # shut down the bound endpoint
             # check that sending would block (there's no outbound connection)
             try:
 
-                push.send(b'ABC', nowait=True)
+                push.send('ABC', nowait=True)
             except self.xs.XSError as e:
                 self.assertTrue(e.errno, self.XS.EAGAIN)
         # now the other way round...Create infrastructure
             push_id = push.last_rc
             self.assertNotEqual(push_id, -1)
             # pass one message through to ensure the connection is established
-            self.assertEqual(push.send(b'ABC').last_rc, 3)
+            self.assertEqual(push.send('ABC').last_rc, 3)
             pull.recv(3)
             self.assertEqual(pull.last_rc, 3)
             # shut down the bound endpoint
             sleep(1)
             # check that sending would block (there's no outbound connection)
             try:
-                push.send(b'ABC', nowait=True)
+                push.send('ABC', nowait=True)
             except self.xs.XSError as e:
                 self.assertTrue(e.errno, self.XS.EAGAIN)
 
         buf2 = sb.recv(32)
         self.assertEqual(len(buf2), 32)
         self.assertEqual(buf2.more, 0)
-        self.assertEqual(sb.send(bytes(buf1), more=True).last_rc, 32)
-        self.assertEqual(sb.send(bytes(buf2)).last_rc, 32)
+        self.assertEqual(sb.send(buf1, more=True).last_rc, 32)
+        self.assertEqual(sb.send(buf2).last_rc, 32)
         # receive the bounced message
         buf1 = sc.recv(32)
         self.assertEqual(buf1.last_rc, 32)
             self.assertNotEqual(sb.last_rc, -1)
             self.assertNotEqual(sc.last_rc, -1)
             # send 2 - part message
-            self.assertEqual(sc.send(b'A', more=True).last_rc, 1)
-            self.assertEqual(sc.send(b'B').last_rc, 1)
+            self.assertEqual(sc.send('A', more=True).last_rc, 1)
+            self.assertEqual(sc.send('B').last_rc, 1)
             # identity comes first
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
-            self.assertEqual(msg.more, 1)
+            self.assertEqual(msg.moremsg, 1)
             # then the first part of the message body
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
-            self.assertEqual(msg.more, 1)
+            self.assertEqual(msg.moremsg, 1)
             # and finally, the second part of the message body
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
-            self.assertEqual(msg.more, 0)
+            self.assertEqual(msg.moremsg, 0)
 
     def test_wireformat(self):
         import socket
             )
             rpull.connect(('127.0.0.1', 5561))
             # let's send some data and check if it arrived
-            self.assertEqual(rpush.send(b'\x04\0abc', 0), 5)
+            self.assertEqual(rpush.send('\x04\0abc', 0), 5)
             buf = pull.recv(3)
             self.assertEqual(pull.last_rc, 3)
             self.assertEqual(bytes(buf), b'abc')
             # let's push this data into another socket
-            self.assertEqual(push.send(bytes(buf)).last_rc, 3)
+            self.assertEqual(push.send(buf).last_rc, 3)
             self.assertNotEqual(rpull.recv(3), b'\x04\0abc')
             rpush.close()
             rpull.close()
         pull = self.ctx.pull()
         # connect before bind was done at the peer and send one message
         with push.connect_tcp('127.0.0.1:5560'):
-            self.assertEqual(push.send(b'ABC').last_rc, 3)
+            self.assertEqual(push.send('ABC').last_rc, 3)
             # wait a while for few attempts to reconnect to happen
             sleep(1)
             # bind the peer and get the message
             pull = self.ctx.pull()
             with push.connect_ipc('/tmp/tester'):
                 # connect before bind was done at the peer and send one message
-                self.assertEqual(push.send(b'ABC').last_rc, 3)
+                self.assertEqual(push.send('ABC').last_rc, 3)
                 # wait a while for few attempts to reconnect to happen
                 sleep(1)
                 # bind the peer and get the message
                 self.assertTrue(s1.last_rc >= 0)
                 self.assertTrue(s2.last_rc >= 0)
                 for i in lrange(10):
-                    self.assertEqual(s2.send(b'test', nowait=True).last_rc, 4)
+                    self.assertEqual(s2.send('test', nowait=True).last_rc, 4)
                     s1.recv(4)
                     self.assertEqual(s1.last_rc, 4)
 
             surveyor.connect_inproc('b'), respondent1.connect_inproc('a'), \
             respondent2.connect_inproc('a'):
             # end the survey
-            self.assertEqual(surveyor.send(b'ABC').last_rc, 3)
+            self.assertEqual(surveyor.send('ABC').last_rc, 3)
             # Forward the survey through the intermediate device. Survey consist
             # of identity(4 bytes), survey ID(4 bytes) and the body.
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xsurveyor.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xsurveyor.send(buf, more=True).last_rc, 4)
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xsurveyor.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xsurveyor.send(buf, more=True).last_rc, 4)
             buf = xrespondent.recv(3)
             self.assertEqual(buf.last_rc, 3)
-            self.assertEqual(xsurveyor.send(bytes(buf)).last_rc, 3)
+            self.assertEqual(xsurveyor.send(buf).last_rc, 3)
             # respondent 1 responds to the survey
             buf = respondent1.recv(3)
             self.assertEqual(buf.last_rc, 3)
-            self.assertEqual(respondent1.send(b'DE').last_rc, 2)
+            self.assertEqual(respondent1.send('DE').last_rc, 2)
             # forward the response through the intermediate device
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xsurveyor.recv(2)
             self.assertEqual(buf.last_rc, 2)
-            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 2)
+            self.assertEqual(xrespondent.send(buf).last_rc, 2)
             # surveyor gets the response
             buf = surveyor.recv(2)
             self.assertEqual(buf.last_rc, 2)
             # respondent 2 responds to the survey
             buf = respondent2.recv(3)
             self.assertEqual(buf.last_rc, 3)
-            self.assertEqual(respondent2.send(b'FGHI').last_rc, 4)
+            self.assertEqual(respondent2.send('FGHI').last_rc, 4)
             # forward the response through the intermediate device
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf).last_rc, 4)
             # surveyor gets the response
             buf = surveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
             # now let's test whether survey timeout works as expected
             surveyor.survey_timeout = 100
             self.assertEqual(surveyor.last_rc, 0)
-            self.assertEqual(surveyor.send(b'ABC', 3, 0).last_rc, 3)
+            self.assertEqual(surveyor.send('ABC', 3, 0).last_rc, 3)
             watch = self.xs.stopwatch_start()
             try:
                 surveyor.recv(4)
             # initiate new survey.
             surveyor.survey_timeout = 100
             self.assertEqual(surveyor.last_rc, 0)
-            self.assertEqual(surveyor.send(b'DE').last_rc, 2)
+            self.assertEqual(surveyor.send('DE').last_rc, 2)
             # read, process and reply to the old survey
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xrespondent.recv(3)
             self.assertEqual(buf.last_rc, 3)
-            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 3)
+            self.assertEqual(xrespondent.send(buf).last_rc, 3)
             # read, process and reply to the new survey
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            self.assertEqual(xrespondent.send(buf, more=True).last_rc, 4)
             buf = xrespondent.recv(2)
             self.assertEqual(buf.last_rc, 2)
-            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 2)
+            self.assertEqual(xrespondent.send(buf).last_rc, 2)
             # Get the response and check it's the response to the new survey and
             # that response to the old survey was silently discarded.
             self.assertEqual(surveyor.recv(2).last_rc, 2)
         req_socket = self.ctx.req(linger=0)
         with xrep_socket.bind_inproc('hi'), req_socket.connect_inproc('hi'):
             # initial request.
-            self.assertEqual(req_socket.send(b'r').last_rc, 1)
+            self.assertEqual(req_socket.send('r').last_rc, 1)
             # receive the request
             addr = xrep_socket.recv(32)
             addr_size = addr.last_rc
             body = xrep_socket.recv(1)
             self.assertEqual(body.last_rc, 1)
             # send invalid reply
-            self.assertEqual(xrep_socket.send(addr.dest, 5).last_rc, addr_size)
+            self.assertEqual(xrep_socket.send(addr, 5).last_rc, addr_size)
             # send valid reply
             self.assertEqual(
-                xrep_socket.send(addr.dest, 5, more=True).last_rc, addr_size,
+                xrep_socket.send(addr, 5, True).last_rc, addr_size,
             )
             self.assertEqual(
-                xrep_socket.send(bottom.dest, 0, more=True).last_rc, 0
+                xrep_socket.send(bottom, 0, True).last_rc, 0
             )
-            self.assertEqual(xrep_socket.send(b'b').last_rc, 1)
+            self.assertEqual(xrep_socket.send('b').last_rc, 1)
             # check whether we've got the valid reply
             body = req_socket.recv(1, nowait=True)
             self.assertEqual(body.last_rc, 1)
         pub = self.ctx.pub()
         sub = self.ctx.sub()
         with xpub.bind_tcp('127.0.0.1:5560'), xsub.bind_tcp('127.0.0.1:5561'), \
-           pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
+          pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
             # create a subscriber and subscribe for all messages
             sub.subscribe = ''
             self.assertEqual(sub.last_rc, 0)
             sub.recv(xpub.last_rc)
             self.assertEqual(sub.last_rc, 0)
 
-#    def test_regrep_device(self):
-#        # create a req/rep device
-#        xreq = self.ctx.xreq()
-#        xrep = self.ctx.xrep()
-#        # create a worker
-#        rep = self.ctx.rep()
-#        # create a client
-#        req = self.ctx.req()
-#        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'), \
-#            rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
-#            # send a request
-#            self.assertEqual(req.send(b'ABC', 3, more=True).last_rc, 3)
-#            self.assertEqual(req.send(b'DEF', 3).last_rc, 3)
-#            # pass the request through the device
-#            for i in lrange(4):
-#                msg = xrep.recvmsg()
-#                self.assertTrue(msg.last_rc >= 0)
-#                self.assertTrue(xreq.sendmsg(
-#                    msg.dest, True if msg.more else False
-#                ).last_rc >= 0)
-##            # receive the request
+    def test_regrep_device(self):
+        # create a req/rep device
+        xreq = self.ctx.xreq()
+        xrep = self.ctx.xrep()
+        # create a worker
+        rep = self.ctx.rep()
+        # create a client
+        req = self.ctx.req()
+        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'),\
+           rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
+            # send a request
+            self.assertEqual(req.send('ABC', 3, True).last_rc, 3)
+            self.assertEqual(req.send('DEF', 3).last_rc, 3)
+            # pass the request through the device
+            msg = xrep.recvmsg()
+            self.assertEqual(msg.last_rc, 5)
+            sendmore = True if msg.more else False
+            self.assertEqual(xreq.sendmsg(msg, sendmore).last_rc, 5)
+#            msg = xrep.recvmsg()
+#            self.assertEqual(msg.last_rc, 0)
+#            rc = xreq.sendmsg(msg, True if msg.more else False).last_rc
+#            self.assertEqual(rc, 0)
+#            msg = xrep.recvmsg()
+#            self.assertEqual(msg.last_rc, 3)
+#            rc = xreq.sendmsg(msg, True if msg.more else False).last_rc
+#            self.assertEqual(rc, 3)
+#            msg = xrep.recvmsg()
+#            self.assertEqual(msg.last_rc, 3)
+#            rc = xreq.sendmsg(msg, True if msg.more else False).last_rc
+#            self.assertEqual(rc, 3)
+#            # receive the request
 #            buff = rep.recv(3)
+#            self.assertEqual(bytes(buff), b'ABC')
 #            self.assertEqual(buff.last_rc, 3)
-#            self.assertEqual(bytes(buff), b'ABC')
 #            self.assertEqual(buff.more, 0)
 #            buff2 = rep.recv(3)
 #            self.assertEqual(buff2.last_rc, 3)
 #            self.assertEqual(bytes(buff2), b'DEF')
 #            self.assertEqual(buff2.more, 0)
-#            # Send the reply.
-#            self.assertEqual(rep.send(b'GHI', more=True).last_rc, 3)
-#            self.assertEqual(rep.send(b'JKL').last_rc, 3)
-#            # Pass the reply through the device.
+#            # send the reply
+#            self.assertEqual(rep.send('GHI', more=True).last_rc, 3)
+#            self.assertEqual(rep.send('JKL').last_rc, 3)
+#            # pass the reply through the device
 #            for i in lrange(4):
 #                msg = xrep.recvmsg()
 #                self.assertTrue(msg.last_rc >= 0)
-#                self.assertTrue(
-#                    xreq.sendmsg(bytes(msg.dest), more=True if msg.more else False).last_rc >= 0
-#                )
-#            # Receive the reply.
+#                rc = xreq.sendmsg(msg, True if msg.more else False).last_rc
+#                self.assertTrue(rc >= 0)
+#            # receive the reply
 #            buff = req.recv(3)
 #            self.assertEqual(buff.last_rc, 3)
 #            self.assertEqual(bytes(buff), b'GHI')
 #            self.assertEqual(buff2.last_rc, 3)
 #            self.assertEqual(bytes(buff2), b'JKL')
 #            self.assertEqual(buff2.more, 0)
-#
+
 #    def test_resubscribe(self):
 #        from stuf.six import int2byte
 ##        from time import sleep

tests/test_lowest_level.py

         for i in lrange(4):
             msg = xs.msg_t()
             self.assertEqual(xs.msg_init(msg), 0)
-            self.assertTrue(xs.recvmsg(xrep, msg, 0) >= 0)
+            rc = xs.recvmsg(xrep, msg, 0)
+            self.assertTrue(rc >= 0)
             rcvmore = c_int()
             sz = c_size_t(sizeof(rcvmore))
             self.assertEqual(xs.getsockopt(xrep, XS.RCVMORE, rcvmore, sz), 0)
-            self.assertTrue(
-                xs.sendmsg(xreq, msg, XS.SNDMORE if rcvmore else 0) >= 0
-            )
+            rc = xs.sendmsg(xreq, msg, XS.SNDMORE if rcvmore else 0)
+            self.assertTrue(rc >= 0)
         # receive the request.
         buff = array(c_ubyte, 3)
         self.assertEqual(xs.recv(rep, buff, 3, 0), 3)