Lynn Rees avatar Lynn Rees committed cf91d0c

- more tests

Comments (0)

Files changed (4)

crossroads/constants.py

 POLLOUT = 2
 POLLERR = 4
 
-SOCKET_TYPES = frozenset((
+TYPES = frozenset((
     PAIR, PUB, SUB, REQ, REP, XREQ, XREP, PULL, PUSH, XPUB, XSUB, SURVEYOR,
     RESPONDENT, XSURVEYOR, XRESPONDENT,
 ))
-BINARY_OPTS = frozenset((SUBSCRIBE, UNSUBSCRIBE, IDENTITY))
-INT_OPTS = frozenset((
+BINARY = frozenset((SUBSCRIBE, UNSUBSCRIBE, IDENTITY))
+INT = frozenset((
     SNDHWM, RCVHWM, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, LINGER, RECONNECT_IVL,
     RECONNECT_IVL_MAX, BACKLOG, MULTICAST_HOPS, RCVTIMEO, SNDTIMEO, IPV4ONLY,
     KEEPALIVE, SURVEY_TIMEOUT, RCVMORE,
 ))
-INT64_OPTS = ((AFFINITY, MAXMSGSIZE))
+INT64 = ((AFFINITY, MAXMSGSIZE))
+PROTO = dict(inproc=b'inproc://', ipc=b'ipc://', tcp=b'tcp://')
 
 from errno import (
     EAGAIN, EFAULT, EINVAL, ENETDOWN, ENOBUFS, ENODEV, ENOMEM, ETIMEDOUT,

crossroads/low.py

 # -*- coding: utf-8 -*-
 '''Low level API for ctypes binding to Crossroads.IO.'''
 
-from uuid import uuid4
 from functools import partial
-from operator import methodcaller
 from ctypes import byref, sizeof, c_int, c_int64, c_char_p, c_size_t
 
+from stuf.utils import unique_id
 from stuf.desc import Setter, lazy
-from stuf.six import OrderedDict, items, tobytes, b
+from stuf.six import OrderedDict, items, tobytes
 
 from . import lowest as xs
 from . import constants as XS
 from .message import XSMessage, Message
 
-PREFIX = dict(inproc=b('inproc://'), ipc=b('ipc://'), tcp=b('tcp://'))
-split = methodcaller('rsplit', '_')
 bigxsget = partial(getattr, XS)
 xsget = partial(getattr, xs)
 
         except AttributeError:
             try:
                 this = bigxsget(key.upper())
-                if this in XS.SOCKET_TYPES:
+                if this in XS.TYPES:
                     return self._setter(key.lower(), self._open, this)
             except AttributeError:
                 raise AttributeError(key)
                     if key.startswith(('bind', 'connect')):
                         method, prefix = key.split('_')
                         return self._setter(
-                            key, getattr(self, '_s' + method), PREFIX[prefix]
+                            key, getattr(self, '_s' + method), XS.PROTO[prefix],
                         )
                     raise AttributeError(key)
 
         if any((e is not None, c is not None, b is not None)):
             return False
 
+    def _opt_type(self, key):
+        opt_key = bigxsget(key.upper())
+        if opt_key in XS.INT:
+            opt_type = c_int
+        elif opt_key in XS.INT64:
+            opt_type = c_int64
+        elif opt_key in XS.BINARY:
+            opt_type = c_char_p
+        return opt_key, opt_type
+
     def _get(self, key):
-        const = bigxsget(key.upper())
-        if const in XS.INT_OPTS:
-            value = c_int()
-        elif const in XS.INT64_OPTS:
-            value = c_int64()
-        elif const in XS.BINARY_OPTS:
-            value = c_char_p()
-        self._getsockopt(const, value, c_size_t(sizeof(value)))
+        opt_key, opt_type = self._opt_type(key)
+        value = opt_type()
+        self._getsockopt(opt_key, value, c_size_t(sizeof(value)))
         return value.value
 
     def _set(self, key, value):
-        const = bigxsget(key.upper())
-        if const in XS.INT_OPTS:
-            value = c_int(value)
-        elif const in XS.INT64_OPTS:
-            value = c_int64(value)
-        elif const in XS.BINARY_OPTS:
-            value = c_char_p(value)
+        opt_key, opt_type = self._opt_type(key)
+        value = opt_type(value)
         self.last_rc = self._setsockopt(
-            const, byref(value), 0 if const == XS.SUBSCRIBE else sizeof(value)
+            opt_key,
+            byref(value),
+            0 if opt_key == XS.SUBSCRIBE else sizeof(value),
         )
         return self.last_rc
 
 
     def _sbind(self, prefix , *addresses):
         for address in addresses:
-            # coerce to bytes
             self.last_rc = self._bind(prefix + tobytes(address))
-            self.bindings[uuid4().get_hex().upper()] = self.last_rc
+            self.bindings[unique_id()] = self.last_rc
         return self
 
     def _sconnect(self, prefix, *addresses):
         for address in addresses:
-            # coerce to bytes
             self.last_rc = self._connect(prefix + tobytes(address))
-            self.connections[uuid4().get_hex().upper()] = self.last_rc
+            self.connections[unique_id()] = self.last_rc
         return self
 
     @lazy
     def id(self):
-        return uuid4().get_hex().upper()
+        return unique_id()
 
     def send(self, data, size=None, more=False, nowait=False):
-        msg = Message(size, data)
+        msg = data if isinstance(data, Message) else Message(size, data)
         msg.last_rc = self.last_rc = self._send(
             msg.source,
             len(msg),
         )
         return msg
 
+    def sendmsg(self, data, more=False, nowait=False):
+        msg = data if isinstance(data, XSMessage) else XSMessage(data)
+        msg.last_rc = self.last_rc = self._sendmsg(
+            msg.source,
+            XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
+        )
+        return msg
+
     def recv(self, size, nowait=False):
-        # staging
         msg = Message(size)
         msg.last_rc = self.last_rc = self._recv(
             msg.dest, len(msg), XS.DONTWAIT if nowait else 0
         msg.more = self.rcvmore
         return msg
 
-    def sendmsg(self, data, more=False, nowait=False):
-        msg = XSMessage(data)
-        msg.last_rc = self.last_rc = self._sendmsg(
-            msg.dest, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
-        )
-        return msg
-
     def recvmsg(self, nowait=False):
-        msg = XSMessage()
         try:
+            msg = XSMessage()
             msg.last_rc = self.last_rc = self._recvmsg(
                 msg.dest, XS.DONTWAIT if nowait else 0
             )
-            more = c_int()
-            xs.getmsgopt(msg.dest, XS.MORE, more, c_size_t(sizeof(more)))
-            msg.more = more.value
+            msg.more = self.rcvmore
         finally:
             msg.close()
         return msg

crossroads/message.py

 # -*- coding: utf-8 -*-
 '''Crossroads.IO messages.'''
 
-from stuf.six import PY3, tobytes, tounicode, b
-from ctypes import byref, sizeof, string_at, c_ubyte
+from stuf.six import PY3, tobytes, tounicode, b, isstring
+from ctypes import byref, sizeof, string_at, c_ubyte, c_int, c_size_t
 
 from . import lowest as xs
+from . import constants as XS
 
 
 class BaseMsg(object):
 
     def __init__(self, size, source=None):
-        self.source = source
+        self.parts = []
+        self.raw = source
         self.size = size
-        self.more = self.rc = self.dest = None
+        self.more = self.rc = None
 
-    def __bytes__(self, *args, **kwargs):
-        if self.dest is not None:
-            return tobytes(string_at(self.ref, len(self)), 'latin-1')
-        return b('')
+    def __add__(self, other):
+        if isinstance(other, BaseMsg):
+            self.parts.extend(other.parts)
+        elif isstring(other):
+            self.parts.append(other)
+        return self
 
-    def __unicode__(self, *args, **kwargs):
-        return tounicode(tobytes(self, 'utf-8'))
+    __iadd__ = __add__
 
-    __str__ = __unicode__ if PY3 else __bytes__
+    def __bytes__(self):
+        if self.dest is None:
+            return b('')
+        return b('').join(tobytes(
+            string_at(byref(i), sizeof(i)), 'latin-1'
+        ) for i in self.parts)
 
-    @property
-    def array(self):
-        return bytearray(self.__bytes__())
-
-
-class BaseMultipart(object):
-
-    def __init__(self, size, source=None):
-        self.source = source
-        self.more = self.rc = self.dest = None
-        self.parts = []
-        self.__add__ = self.parts.append
-
-    def __bytes__(self, *args, **kwargs):
-        if self.dest is not None:
-            return b('').join(tobytes(
-                string_at(byref(i), sizeof(i)), 'latin-1') for i in self.parts)
-        return b('')
-
-    def __unicode__(self, *args, **kwargs):
-        return tounicode(tobytes(self, 'utf-8'))
+    def __unicode__(self):
+        return tounicode(self.__bytes__(), 'utf-8')
 
     __str__ = __unicode__ if PY3 else __bytes__
 
 
 class Message(BaseMsg):
 
-    __slots__ = 'source dest last_rc more size'.split()
+    __slots__ = 'raw source dest last_rc more size'.split()
 
     def __init__(self, size, source=None):
         super(Message, self).__init__(size, source)
+        self.source = source
         if source is None:
-            self.dest = xs.array(c_ubyte, size)
+            self.parts.append(xs.array(c_ubyte, size))
 
     def __len__(self):
-        return self.size if self.size is not None else len(self.source) if self.dest is None else sizeof(self.dest)
+        if self.size is not None:
+            return self.size
+        if self.dest is None:
+            return len(self.source)
+        return sizeof(self.dest)
 
     @property
-    def ref(self):
-        return None if self.dest is None else byref(self.dest)
-
-
-class Multipart(BaseMultipart):
-
-    def __init__(self, size, source=None):
-        super(Multipart, self).__init__(size, source)
-
-    def __len__(self):
-
-        return len(self.source) if self.dest is None else sizeof(self.dest)
-
-    @property
-    def ref(self):
-        return None if self.dest is None else byref(self.dest)
+    def msgmore(self):
+        more = c_int()
+        xs.getmsgopt(self.dest, XS.MORE, more, c_size_t(sizeof(more)))
+        return more.value
 
 
 class XSMessage(BaseMsg):
 
-    __slots__ = 'source dest last_rc more ref'.split()
+    __slots__ = 'source dest rc more ref'.split()
 
     def __init__(self, data=None):
         super(XSMessage, self).__init__(32, data)
-        self.dest = self.ref = dest = xs.msg_t()
+        self.dest = self.source = dest = xs.msg_t()
         if data is None:
-            self.last_rc = xs.msg_init(dest)
+            self.rc = xs.msg_init(dest)
         else:
-            self.last_rc = xs.msg_init_size(dest, 32)
+            self.rc = xs.msg_init_size(dest, 32)
             xs.msg_init_size(dest, 32)
             xs.memmove(xs.msg_data(dest), data, 32)
 
         return sizeof(self.dest)
 
     def close(self):
-        self.last_rc = xs.msg_close(self.dest)
+        rc = 0
+        self.rc = [rc | xs.msg_close(i) for i in self.parts][-1]
 
+    __del__ = close
 
-class XSMultipart(BaseMultipart):
 
-    def __init__(self, size, source=None):
-        super(XSMultipart, self).__init__(size, source)
+#class BaseMultipart(object):
+#
+#    def __init__(self, size, source=None):
+#        self.source = source
+#        self.more = self.rc = self.dest = None
+#        self.parts = []
+#        self.__add__ = self.parts.append
 
-    def __len__(self):
-        return sizeof(self.dest)
+#
+#
+#class Multipart(BaseMultipart):
+#
+#    def __init__(self, size, source=None):
+#        super(Multipart, self).__init__(size, source)
+#
+#    def __len__(self):
+#        return len(self.source) if self.dest is None else sizeof(self.dest)
+#
+#
+#class XSMultipart(BaseMultipart):
+#
+#    def __init__(self, size, source=None):
+#        super(XSMultipart, self).__init__(size, source)
+#
+#    def __len__(self):
+#        return sizeof(self.dest)
+#
 
-    def close(self):
-        self.last_rc = sum(xs.msg_close(i) for i in self.parts)

tests/test_low_level.py

         ctx = self.context_class()
         s = ctx.push(linger=100)
         with s.connect_tcp('127.0.0.1:5560'):
-            #  Send a message.
+            # send a message.
             s.send(b'r')
             self.assertEqual(s.last_rc, 1)
-        #  Terminate the context. This should take 0.1 second.
+        # terminate the context. this should take 0.1 second.
         watch = self.xs.stopwatch_start()
         ctx.close()
         self.time_assert(self.xs.stopwatch_stop(watch) / 1000, 100)
 
     def test_max_sockets(self):
         try:
-            # Create context and set MAX_SOCKETS to 1.
+            # create context and set MAX_SOCKETS to 1
             ctx = self.context_class(max_sockets=1)
-            # First socket should be created OK.
+            # first socket should be created OK
             s1 = ctx.push()
-            # Creation of second socket should fail.
+            # creation of second socket should fail
             try:
                 ctx.push()
             except self.xs.XSError as e:
         except:
             raise
         finally:
-            # Clean up.
+            # clean up
             self.assertEqual(s1.close(), 0)
             self.assertEqual(ctx.close(), 0)
 
     def test_shutdown(self):
         from time import sleep
-        # Create infrastructure.
+        # create infrastructure
         ctx = self.context_class()
         push = ctx.push()
         pull = ctx.pull()
         with push.bind_tcp('127.0.0.1:5560'), pull.connect_tcp('127.0.0.1:5560'):
             push_id = push.last_rc
             self.assertNotEqual(push_id, -1)
-            # Pass one message through to ensure the connection is established.
+            # pass one message through to ensure the connection is established
             self.assertEqual(push.send(b'ABC').last_rc, 3)
             pull.recv(3)
             self.assertEqual(pull.last_rc, 3)
-            # Shut down the bound endpoint.
+            # shut down the bound endpoint
             self.assertEqual(push.shutdown(push_id).last_rc, 0)
             sleep(1)
+            # check that sending would block (there's no outbound connection)
             try:
-                # Check that sending would block (there's no outbound
-                # connection).
+
                 push.send(b'ABC', nowait=True)
             except self.xs.XSError as e:
                 self.assertTrue(e.errno, self.XS.EAGAIN)
-        # Now the other way round...Create infrastructure.
+        # now the other way round...Create infrastructure
         ctx = self.context_class()
         pull = ctx.pull()
         push = ctx.push()
         with pull.bind_tcp('127.0.0.1:5560'), push.connect_tcp('127.0.0.1:5560'):
             push_id = push.last_rc
             self.assertNotEqual(push_id, -1)
-            # Pass one message through to ensure the connection is established.
+            # pass one message through to ensure the connection is established
             self.assertEqual(push.send(b'ABC').last_rc, 3)
             pull.recv(3)
             self.assertEqual(pull.last_rc, 3)
-            # Shut down the bound endpoint.
+            # shut down the bound endpoint
             self.assertEqual(push.shutdown(push_id).last_rc, 0)
             sleep(1)
+            # check that sending would block (there's no outbound connection)
             try:
-                # Check that sending would block (there's no outbound
-                # connection).
                 push.send(b'ABC', nowait=True)
             except self.xs.XSError as e:
                 self.assertTrue(e.errno, self.XS.EAGAIN)
 
     def bounce(self, sb, sc):
         content = b'12345678ABCDEFGH12345678abcdefgh'
-        #  Send the message.
+        # send the message
         self.assertEqual(sc.send(content, more=True).last_rc, 32)
         self.assertEqual(sc.send(content).last_rc, 32)
-        #  Bounce the message back.
+        # bounce the message back
         buf1 = sb.recv(32)
         self.assertEqual(len(buf1), 32)
         self.assertEqual(buf1.more, 1)
         self.assertEqual(buf2.more, 0)
         self.assertEqual(sb.send(bytes(buf1), more=True).last_rc, 32)
         self.assertEqual(sb.send(bytes(buf2)).last_rc, 32)
-        #  Receive the bounced message.
+        # receive the bounced message
         buf1 = sc.recv(32)
         self.assertEqual(buf1.last_rc, 32)
         self.assertEqual(buf1.more, 1)
         buf2 = sc.recv(32)
         self.assertEqual(len(buf2), 32)
         self.assertEqual(buf2.more, 0)
-        #  Check whether the message is still the same.
+        # check whether the message is still the same
         self.assertEqual(bytes(buf1), content)
 
     def test_pair_ipc(self):
         sc = self.ctx.pair()
         with sb.bind_tcp('127.0.0.1:5560'), sc.connect_tcp('127.0.0.1:5560'):
             self.bounce(sb, sc)
-            # Now let's try to open one more connection to the bound socket.
-            # The connection should be silently rejected rather than causing
-            # error.
+            # Now let's try to open one more connection to the bound socket. The
+            # connection should be silently rejected rather than causing error.
             sc2 = self.ctx.pair()
             with sc2.connect_tcp('127.0.0.1:5560'):
                 sleep(1)
             self.bounce(sb, sc)
 
     def test_msg_flags(self):
-        # Create the infrastructure
+        # create the infrastructure
         sb = self.ctx.xrep()
         sc = self.ctx.xreq()
         with sb.bind_inproc('a'), sc.connect_inproc('a'):
             self.assertNotEqual(sb.last_rc, -1)
             self.assertNotEqual(sc.last_rc, -1)
-            # Send 2 - part message.
+            # send 2 - part message
             self.assertEqual(sc.send(b'A', more=True).last_rc, 1)
             self.assertEqual(sc.send(b'B').last_rc, 1)
-            # Identity comes first.
+            # identity comes first
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
             self.assertEqual(msg.more, 1)
-            # Then the first part of the message body.
+            # then the first part of the message body
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
             self.assertEqual(msg.more, 1)
-            # And finally, the second part of the message body.
+            # and finally, the second part of the message body
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
             self.assertEqual(msg.more, 0)
 
     def test_wireformat(self):
         import socket
-        # Create the basic infrastructure.
+        # create the basic infrastructure
         push = self.ctx.push()
         pull = self.ctx.pull()
-        # Bind the peer and get the message.
+        # bind the peer and get the message
         with pull.bind_tcp('127.0.0.1:5560'), push.bind_tcp('127.0.0.1:5561'):
-            # Connect to the peer using raw sockets.
+            # connect to the peer using raw sockets
             rpush = socket.socket(
                 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
             )
                 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP,
             )
             rpull.connect(('127.0.0.1', 5561))
-            # Let's send some data and check if it arrived
+            # let's send some data and check if it arrived
             self.assertEqual(rpush.send(b'\x04\0abc', 0), 5)
             buf = pull.recv(3)
             self.assertEqual(pull.last_rc, 3)
             self.assertEqual(bytes(buf), b'abc')
-            # Let's push this data into another socket
+            # let's push this data into another socket
             self.assertEqual(push.send(bytes(buf)).last_rc, 3)
             self.assertNotEqual(rpull.recv(3), b'\x04\0abc')
             rpush.close()
     def test_reconnect(self):
         import sys
         from time import sleep
-        # Create the basic infrastructure.
+        # create the basic infrastructure
         push = self.ctx.push()
         pull = self.ctx.pull()
-        # Connect before bind was done at the peer and send one message.
+        # connect before bind was done at the peer and send one message
         with push.connect_tcp('127.0.0.1:5560'):
             self.assertEqual(push.send(b'ABC').last_rc, 3)
-            # Wait a while for few attempts to reconnect to happen.
+            # wait a while for few attempts to reconnect to happen
             sleep(1)
-            # Bind the peer and get the message.
+            # bind the peer and get the message
             with pull.bind_tcp('127.0.0.1:5560'):
                 msg = pull.recv(3)
                 self.assertEqual(msg.last_rc, 3)
         if sys.platform not in ('win32', 'cygwin'):
-            # Now, let's test the same scenario with IPC.
+            # now, let's test the same scenario with IPC
             push = self.ctx.push()
             pull = self.ctx.pull()
             with push.connect_ipc('/tmp/tester'):
-                # Connect before bind was done at the peer and send one message
+                # connect before bind was done at the peer and send one message
                 self.assertEqual(push.send(b'ABC').last_rc, 3)
-                # Wait a while for few attempts to reconnect to happen.
+                # wait a while for few attempts to reconnect to happen
                 sleep(1)
-                # Bind the peer and get the message.
+                # bind the peer and get the message
                 with pull.bind_ipc('/tmp/tester'):
                     pull.recv(3)
                     self.assertEqual(pull.last_rc, 3)
                         self.assertEqual(sc.last_rc, 0)
                 except self.xs.XSError as e:
                     self.assertTrue(e.errno, self.XS.EAGAIN)
-            # There should be now 4 messages pending, consume them.
+            # there should be now 4 messages pending, consume them.
             for i in lrange(4):
                 sb.recv(0)
                 self.assertEqual(sb.last_rc, 0)
-            # Now it should be possible to send one more.
+            # now it should be possible to send one more.
             self.assertEqual(sc.send(None, 0).last_rc, 0)
-            # Consume the remaining message.
+            # consume the remaining message
             sb.recv(0)
             self.assertEqual(sb.last_rc, 0)
             s1 = self.ctx.pull()
                     self.assertEqual(s1.last_rc, 4)
 
     def test_survey(self):
-        # Create the basic infrastructure.
+        # create the basic infrastructure
         xsurveyor = self.ctx.xsurveyor()
         xrespondent = self.ctx.xrespondent()
         surveyor = self.ctx.surveyor()
         with xsurveyor.bind_inproc('a'), xrespondent.bind_inproc('b'), \
             surveyor.connect_inproc('b'), respondent1.connect_inproc('a'), \
             respondent2.connect_inproc('a'):
-            # Send the survey.
+            # end the survey
             self.assertEqual(surveyor.send(b'ABC').last_rc, 3)
             # Forward the survey through the intermediate device. Survey consist
             # of identity(4 bytes), survey ID(4 bytes) and the body.
             buf = xrespondent.recv(3)
             self.assertEqual(buf.last_rc, 3)
             self.assertEqual(xsurveyor.send(bytes(buf)).last_rc, 3)
-            # Respondent 1 responds to the survey.
+            # respondent 1 responds to the survey
             buf = respondent1.recv(3)
             self.assertEqual(buf.last_rc, 3)
             self.assertEqual(respondent1.send(b'DE').last_rc, 2)
-            # Forward the response through the intermediate device.
+            # forward the response through the intermediate device
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
             self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
             buf = xsurveyor.recv(2)
             self.assertEqual(buf.last_rc, 2)
             self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 2)
-            # Surveyor gets the response.
+            # surveyor gets the response
             buf = surveyor.recv(2)
             self.assertEqual(buf.last_rc, 2)
-            # Respondent 2 responds to the survey.
+            # respondent 2 responds to the survey
             buf = respondent2.recv(3)
             self.assertEqual(buf.last_rc, 3)
             self.assertEqual(respondent2.send(b'FGHI').last_rc, 4)
-            # Forward the response through the intermediate device.
+            # forward the response through the intermediate device
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
             self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
             buf = xsurveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
             self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 4)
-            # Surveyor gets the response.
+            # surveyor gets the response
             buf = surveyor.recv(4)
             self.assertEqual(buf.last_rc, 4)
-            # Now let's test whether survey timeout works as expected.
+            # now let's test whether survey timeout works as expected
             surveyor.survey_timeout = 100
             self.assertEqual(surveyor.last_rc, 0)
             self.assertEqual(surveyor.send(b'ABC', 3, 0).last_rc, 3)
             surveyor.survey_timeout = 100
             self.assertEqual(surveyor.last_rc, 0)
             self.assertEqual(surveyor.send(b'DE').last_rc, 2)
-            # Read, process and reply to the old survey.
+            # read, process and reply to the old survey
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
             self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
             buf = xrespondent.recv(3)
             self.assertEqual(buf.last_rc, 3)
             self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 3)
-            # Read, process and reply to the new survey.
+            # read, process and reply to the new survey
             buf = xrespondent.recv(4)
             self.assertEqual(buf.last_rc, 4)
             self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
 #
     def test_invalid_rep(self):
         from stuf.six import int2byte
-        # Create REQ/XREP wiring.
+        # create REQ/XREP wiring
         xrep_socket = self.ctx.xrep(linger=0)
         req_socket = self.ctx.req(linger=0)
         with xrep_socket.bind_inproc('hi'), req_socket.connect_inproc('hi'):
-            # Initial request.
+            # initial request.
             self.assertEqual(req_socket.send(b'r').last_rc, 1)
-            # Receive the request.
+            # receive the request
             addr = xrep_socket.recv(32)
             addr_size = addr.last_rc
             self.assertEqual(addr_size, 5)
             self.assertEqual(bottom.last_rc, 0)
             body = xrep_socket.recv(1)
             self.assertEqual(body.last_rc, 1)
-            # Send invalid reply.
+            # send invalid reply
             self.assertEqual(xrep_socket.send(addr.dest, 5).last_rc, addr_size)
-            # Send valid reply.
+            # send valid reply
             self.assertEqual(
                 xrep_socket.send(addr.dest, 5, more=True).last_rc, addr_size,
             )
                 xrep_socket.send(bottom.dest, 0, more=True).last_rc, 0
             )
             self.assertEqual(xrep_socket.send(b'b').last_rc, 1)
-            # Check whether we've got the valid reply.
+            # check whether we've got the valid reply
             body = req_socket.recv(1, nowait=True)
             self.assertEqual(body.last_rc, 1)
             self.assertEqual(int2byte(body.dest[0]), b'b')
 
     def test_sub_forward(self):
         from time import sleep
-        # First, create an intermediate device.
+        # first, create an intermediate device
         xpub = self.ctx.xpub()
         xsub = self.ctx.xsub()
-        # Create a publisher.
+        # create a publisher
         pub = self.ctx.pub()
         sub = self.ctx.sub()
         with xpub.bind_tcp('127.0.0.1:5560'), xsub.bind_tcp('127.0.0.1:5561'), \
-          pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
-            # Create a subscriber and subscribe for all messages
+           pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
+            # create a subscriber and subscribe for all messages
             sub.subscribe = ''
             self.assertEqual(sub.last_rc, 0)
-            # Pass the subscription upstream through the device.
+            # pass the subscription upstream through the device
             buf = xpub.recv(32)
             self.assertEqual(xpub.last_rc, 4)
             self.assertEqual(xsub.send(buf.dest, 4).last_rc, 4)
-            # Wait a bit till the subscription gets to the publisher.
+            # wait a bit till the subscription gets to the publisher
             sleep(1)
-            # Send an empty message.
+            # send an empty message
             self.assertEqual(pub.send(None, 0).last_rc, 0)
-            # Pass the message downstream through the device.
-            buf = xsub.recv(32, nowait=True)
+            # pass the message downstream through the device
+            buf = xsub.recv(32)
             self.assertTrue(xsub.last_rc >= 0)
             self.assertTrue(xpub.send(buf.dest, buf.last_rc).last_rc >= 0)
-            # Receive the message in the subscriber.
+            # receive the message in the subscriber
             sub.recv(xpub.last_rc)
             self.assertEqual(sub.last_rc, 0)
 
-#    def test_regrep_device(self):
-#        # Create a req/rep device.
-#        xreq = self.ctx.xreq()
-#        xrep = self.ctx.xrep()
-#        # Create a worker.
-#        rep = self.ctx.rep()
-#        # Create a client.
-#        req = self.ctx.req()
-#        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'), \
-#            rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
-#            # Send a request.
-#            self.assertEqual(req.send(b'ABC', more=True).last_rc, 3)
-#            self.assertEqual(req.send(b'DEF').last_rc, 3)
-#            # Pass the request through the device.
-#            for i in lrange(4):
-#                msg = xrep.recvmsg()
-#                self.assertTrue(msg.last_rc >= 0)
-#                self.assertTrue(xreq.sendmsg(
-#                    bytes(msg.dest), True if msg.more else False
-#                ).last_rc >= 0)
-            # Receive the request.
-#            buff = rep.recv(3)
+    def test_regrep_device(self):
+        # create a req/rep device
+        xreq = self.ctx.xreq()
+        xrep = self.ctx.xrep()
+        # create a worker
+        rep = self.ctx.rep()
+        # create a client
+        req = self.ctx.req()
+        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'), \
+            rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
+            # send a request
+            self.assertEqual(req.send(b'ABC', 3, more=True).last_rc, 3)
+            self.assertEqual(req.send(b'DEF', 3).last_rc, 3)
+            # pass the request through the device
+            for i in lrange(4):
+                msg = xrep.recvmsg()
+                self.assertTrue(msg.last_rc >= 0)
+                self.assertTrue(xreq.sendmsg(
+                    msg.dest, True if msg.more else False
+                ).last_rc >= 0)
+#            # receive the request
+            buff = rep.recv(3)
 #            self.assertEqual(buff.last_rc, 3)
 #            self.assertEqual(bytes(buff), b'ABC')
 #            self.assertEqual(buff.more, 0)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.