Commits

Lynn Rees committed 3e6c695

- yeah, like that

Comments (0)

Files changed (5)

crossroads/constants.py

 INT_OPTS = frozenset((
     SNDHWM, RCVHWM, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, LINGER, RECONNECT_IVL,
     RECONNECT_IVL_MAX, BACKLOG, MULTICAST_HOPS, RCVTIMEO, SNDTIMEO, IPV4ONLY,
-    KEEPALIVE, SURVEY_TIMEOUT,
+    KEEPALIVE, SURVEY_TIMEOUT, RCVMORE,
 ))
 INT64_OPTS = ((AFFINITY, MAXMSGSIZE))
 

crossroads/low.py

 xsget = partial(getattr, xs)
 
 
-class Options(Setter):
-
-    def __init__(self, **options):
-        self._options = options
-
-    def __setattr__(self, key, value, setr=object.__setattr__):
-        try:
-            this = bigxsget(key.upper())
-            self._options[this] = value
-            setr(self, key.lower(), value)
-        except AttributeError:
-            setr(self, key, value)
-
-
-class Context(Options):
+class Context(Setter):
 
     def __init__(
         self, threads=XS._IO_THREADS, max_sockets=XS._MAX_SOCKETS, **options
     ):
-        super(Context, self).__init__(**options)
+        self._options = options
+        super(Context, self).__init__()
         self._ctx = xs.init()
-        self.set(
+        self._set(
             XS._IO_THREADS,
             XS.IO_THREADS,
             threads,
             'must use 1 or more I/O threads',
         )
-        self.set(
+        self._set(
             XS._MAX_SOCKETS,
             XS.MAX_SOCKETS,
             max_sockets,
             try:
                 this = bigxsget(key.upper())
                 if this in XS.SOCKET_TYPES:
-                    return self._setter(key.lower(), self.open, this)
+                    return self._setter(key.lower(), self._open, this)
             except AttributeError:
                 raise AttributeError(key)
 
-    def set(self, default, key, value, msg):
+    def __setattr__(self, key, value, setr=object.__setattr__):
+        try:
+            this = bigxsget(key.upper())
+            self._options[this] = value
+            setr(self, key.lower(), value)
+        except AttributeError:
+            setr(self, key, value)
+
+    def _set(self, default, key, value, msg):
         if value != default:
             if value > 0:
                 value = c_int(value)
             else:
                 raise xs.XSOperationError(msg)
 
-    def open(self, stype, **options):
+    def _open(self, stype, **options):
         return Socket(self, stype, **options)
 
     def close(self):
     __del__ = close
 
 
-class Socket(Options):
+class Socket(Setter):
 
     def __init__(self, context, stype, **options):
         self.context = context
         self._type = stype
         self._socket = xs.socket(context._ctx, stype)
         options.update(context._options)
-        self.set(**options)
+        self._setmany(**options)
         context.sockets[self.id] = self
         self.last_rc = None
         self.closed = False
         self.connections = OrderedDict()
         self.bindings = OrderedDict()
 
+    def __enter__(self):
+        return self
+
     def __getattr__(self, key, getr=object.__getattribute__):
         try:
             return getr(self, key)
         except AttributeError:
             try:
-                if not key.startswith('__'):
-                    if key.startswith('_'):
+                return self._setter(
+                    key, xsget(key.lower().strip('_')), self._socket
+                )
+            except AttributeError:
+                try:
+                    return self._get(key)
+                except AttributeError:
+                    if key.startswith(('bind', 'connect')):
+                        method, prefix = key.split('_')
                         return self._setter(
-                            key, xsget(key.lower().strip('_')), self._socket
+                            key, getattr(self, '_s' + method), PREFIX[prefix]
                         )
-                    elif key.startswith('bind'):
-                        return self._setter(
-                            key, self._ubind, PREFIX[split(key)[-1]]
-                        )
-                    elif key.startswith('connect'):
-                        return self._setter(
-                            key, self._uconnect, PREFIX[split(key)[-1]]
-                        )
-            except AttributeError:
-                raise AttributeError(key)
+                    raise AttributeError(key)
 
-    @lazy
-    def id(self):
-        return uuid4().get_hex().upper()
+    def __setattr__(self, key, value, setr=object.__setattr__):
+        try:
+            self._set(key, value)
+        except AttributeError:
+            setr(self, key, value)
 
-    def set(self, **options):
-        INT, INT64, BINARY = XS.INT_OPTS, XS.INT64_OPTS, XS.BINARY_OPTS
-        setsocket, getr = self._setsockopt, getattr
+    def __exit__(self, e, c, b):
+        self.close()
+        if any((e is not None, c is not None, b is not None)):
+            return False
+
+    def _get(self, key):
+        const = bigxsget(key.upper())
+        if const in XS.INT_OPTS:
+            value = c_int()
+        elif const in XS.INT64_OPTS:
+            value = c_int64()
+        elif const in XS.BINARY_OPTS:
+            value = c_char_p()
+        self._getsockopt(const, value, c_size_t(sizeof(value)))
+        return value.value
+
+    def _set(self, key, value):
+        const = bigxsget(key.upper())
+        if const in XS.INT_OPTS:
+            value = c_int(value)
+        elif const in XS.INT64_OPTS:
+            value = c_int64(value)
+        elif const in XS.BINARY_OPTS:
+            value = c_char_p(value)
+        self.last_rc = self._setsockopt(
+            const, byref(value), 0 if const == XS.SUBSCRIBE else sizeof(value)
+        )
+        return self.last_rc
+
+    def _setmany(self, **options):
+        _set = self._set
         rc = 0
         for k, v in items(options):
-            const = getr(XS, k.upper())
-            if const in INT:
-                value = c_int(v)
-            elif const in INT64:
-                value = c_int64(v)
-            elif const in BINARY:
-                value = c_char_p(v)
-            rc |= setsocket(const, byref(value), sizeof(value))
-            del value
+            rc |= _set(k, v)
         self.last_rc = rc
         return self
 
-    def __enter__(self):
-        return self
-
-    def _ubind(self, prefix , *addresses):
+    def _sbind(self, prefix , *addresses):
         for address in addresses:
             # coerce to bytes
             self.last_rc = self._bind(prefix + tobytes(address))
             self.bindings[uuid4().get_hex().upper()] = self.last_rc
         return self
 
-    def _uconnect(self, prefix, *addresses):
+    def _sconnect(self, prefix, *addresses):
         for address in addresses:
             # coerce to bytes
             self.last_rc = self._connect(prefix + tobytes(address))
             self.connections[uuid4().get_hex().upper()] = self.last_rc
         return self
 
+    @lazy
+    def id(self):
+        return uuid4().get_hex().upper()
+
     def send(self, data, size=None, more=False, nowait=False):
         msg = Message(size, data)
         msg.last_rc = self.last_rc = self._send(
         msg.last_rc = self.last_rc = self._recv(
             msg.dest, len(msg), XS.DONTWAIT if nowait else 0
         )
-        more = c_int()
-        self._getsockopt(XS.RCVMORE, more, c_size_t(sizeof(more)))
-        msg.more = more.value
+        msg.more = self.rcvmore
         return msg
 
     def sendmsg(self, data, more=False, nowait=False):
         return self
 
     def close(self):
-        rc = self._close()
+        rc = xs.close(self._socket)
         del self.context.sockets[self.id]
         self._socket = self.context = self.last_rc = None
         if not rc:
             self.closed = True
         return rc
-
-    def __exit__(self, e, c, b):
-        self.close()
-        if any((e is not None, c is not None, b is not None)):
-            return False

crossroads/message.py

 
     def __init__(self, size, source=None):
         self.source = source
+        self.size = size
         self.more = self.rc = self.dest = None
 
     def __bytes__(self, *args, **kwargs):
     def dest(self):
         return self.parts[-1]
 
-    @property
-    def array(self):
-        return bytearray(self.__bytes__())
-
 
 class Message(BaseMsg):
 
-    __slots__ = 'source dest last_rc more'.split()
+    __slots__ = 'source dest last_rc more size'.split()
 
     def __init__(self, size, source=None):
         super(Message, self).__init__(size, source)
             self.dest = xs.array(c_ubyte, size)
 
     def __len__(self):
-        return len(self.source) if self.dest is None else sizeof(self.dest)
+        return self.size if self.size is not None else len(self.source) if self.dest is None else sizeof(self.dest)
 
     @property
     def ref(self):
         super(Multipart, self).__init__(size, source)
 
     def __len__(self):
+
         return len(self.source) if self.dest is None else sizeof(self.dest)
 
     @property

tests/test_low_level.py

             sleep(1)
             # Bind the peer and get the message.
             with pull.bind_tcp('127.0.0.1:5560'):
-                pull.recv(3)
-                self.assertEqual(pull.last_rc, 3)
+                msg = pull.recv(3)
+                self.assertEqual(msg.last_rc, 3)
         if sys.platform not in ('win32', 'cygwin'):
             # Now, let's test the same scenario with IPC.
             push = self.ctx.push()
                     s1.recv(4)
                     self.assertEqual(s1.last_rc, 4)
 
+    def test_survey(self):
+        # Create the basic infrastructure.
+        xsurveyor = self.ctx.xsurveyor()
+        xrespondent = self.ctx.xrespondent()
+        surveyor = self.ctx.surveyor()
+        respondent1 = self.ctx.respondent()
+        respondent2 = self.ctx.respondent()
+        with xsurveyor.bind_inproc('a'), xrespondent.bind_inproc('b'), \
+            surveyor.connect_inproc('b'), respondent1.connect_inproc('a'), \
+            respondent2.connect_inproc('a'):
+            # Send the survey.
+            self.assertEqual(surveyor.send(b'ABC').last_rc, 3)
+            # Forward the survey through the intermediate device. Survey consist
+            # of identity(4 bytes), survey ID(4 bytes) and the body.
+            buf = xrespondent.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xsurveyor.send(bytes(buf), more=True).last_rc, 4)
+            buf = xrespondent.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xsurveyor.send(bytes(buf), more=True).last_rc, 4)
+            buf = xrespondent.recv(3)
+            self.assertEqual(buf.last_rc, 3)
+            self.assertEqual(xsurveyor.send(bytes(buf)).last_rc, 3)
+            # Respondent 1 responds to the survey.
+            buf = respondent1.recv(3)
+            self.assertEqual(buf.last_rc, 3)
+            self.assertEqual(respondent1.send(b'DE').last_rc, 2)
+            # Forward the response through the intermediate device.
+            buf = xsurveyor.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xsurveyor.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xsurveyor.recv(2)
+            self.assertEqual(buf.last_rc, 2)
+            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 2)
+            # Surveyor gets the response.
+            buf = surveyor.recv(2)
+            self.assertEqual(buf.last_rc, 2)
+            # Respondent 2 responds to the survey.
+            buf = respondent2.recv(3)
+            self.assertEqual(buf.last_rc, 3)
+            self.assertEqual(respondent2.send(b'FGHI').last_rc, 4)
+            # Forward the response through the intermediate device.
+            buf = xsurveyor.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xsurveyor.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xsurveyor.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 4)
+            # Surveyor gets the response.
+            buf = surveyor.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            # Now let's test whether survey timeout works as expected.
+            surveyor.survey_timeout = 100
+            self.assertEqual(surveyor.last_rc, 0)
+            self.assertEqual(surveyor.send(b'ABC', 3, 0).last_rc, 3)
+            watch = self.xs.stopwatch_start()
+            try:
+                surveyor.recv(4)
+            except self.xs.XSError as e:
+                self.assertEqual(e.errno, self.XS.ETIMEDOUT)
+            self.time_assert(self.xs.stopwatch_stop(watch) / 1000, 100)
+            # Test whether responses for old surveys are discarded. First,
+            # initiate new survey.
+            surveyor.survey_timeout = 100
+            self.assertEqual(surveyor.last_rc, 0)
+            self.assertEqual(surveyor.send(b'DE').last_rc, 2)
+            # Read, process and reply to the old survey.
+            buf = xrespondent.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xrespondent.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xrespondent.recv(3)
+            self.assertEqual(buf.last_rc, 3)
+            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 3)
+            # Read, process and reply to the new survey.
+            buf = xrespondent.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xrespondent.recv(4)
+            self.assertEqual(buf.last_rc, 4)
+            self.assertEqual(xrespondent.send(bytes(buf), more=True).last_rc, 4)
+            buf = xrespondent.recv(2)
+            self.assertEqual(buf.last_rc, 2)
+            self.assertEqual(xrespondent.send(bytes(buf)).last_rc, 2)
+            # Get the response and check it's the response to the new survey and
+            # that response to the old survey was silently discarded.
+            self.assertEqual(surveyor.recv(2).last_rc, 2)
+#
+    def test_invalid_rep(self):
+        from stuf.six import int2byte
+        # Create REQ/XREP wiring.
+        xrep_socket = self.ctx.xrep(linger=0)
+        req_socket = self.ctx.req(linger=0)
+        with xrep_socket.bind_inproc('hi'), req_socket.connect_inproc('hi'):
+            # Initial request.
+            self.assertEqual(req_socket.send(b'r').last_rc, 1)
+            # Receive the request.
+            addr = xrep_socket.recv(32)
+            addr_size = addr.last_rc
+            self.assertEqual(addr_size, 5)
+            bottom = xrep_socket.recv(1)
+            self.assertEqual(bottom.last_rc, 0)
+            body = xrep_socket.recv(1)
+            self.assertEqual(body.last_rc, 1)
+            # Send invalid reply.
+            self.assertEqual(xrep_socket.send(addr.dest, 5).last_rc, addr_size)
+            # Send valid reply.
+            self.assertEqual(
+                xrep_socket.send(addr.dest, 5, more=True).last_rc, addr_size,
+            )
+            self.assertEqual(
+                xrep_socket.send(bottom.dest, 0, more=True).last_rc, 0
+            )
+            self.assertEqual(xrep_socket.send(b'b').last_rc, 1)
+            # Check whether we've got the valid reply.
+            body = req_socket.recv(1, nowait=True)
+            self.assertEqual(body.last_rc, 1)
+            self.assertEqual(int2byte(body.dest[0]), b'b')
+
     def test_sub_forward(self):
         from time import sleep
         # First, create an intermediate device.
         pub = self.ctx.pub()
         sub = self.ctx.sub()
         with xpub.bind_tcp('127.0.0.1:5560'), xsub.bind_tcp('127.0.0.1:5561'), \
-                pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
+          pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
             # Create a subscriber and subscribe for all messages
-            self.assertEqual(sub.set(subscribe='').last_rc, 0)
+            sub.subscribe = ''
+            self.assertEqual(sub.last_rc, 0)
             # Pass the subscription upstream through the device.
             buf = xpub.recv(32)
-            self.assertTrue(xpub.last_rc >= 0)
-            self.assertTrue(xsub.send(bytes(buf), 32).last_rc >= 0)
+            self.assertEqual(xpub.last_rc, 4)
+            self.assertEqual(xsub.send(buf.dest, 4).last_rc, 4)
             # Wait a bit till the subscription gets to the publisher.
             sleep(1)
             # Send an empty message.
             self.assertEqual(pub.send(None, 0).last_rc, 0)
             # Pass the message downstream through the device.
-            buf = xsub.recv(pub.last_rc, nowait=True)
+            buf = xsub.recv(32, nowait=True)
             self.assertTrue(xsub.last_rc >= 0)
-            self.assertTrue(xpub.send(bytes(buf)).last_rc >= 0)
+            self.assertTrue(xpub.send(buf.dest, buf.last_rc).last_rc >= 0)
             # Receive the message in the subscriber.
             sub.recv(xpub.last_rc)
             self.assertEqual(sub.last_rc, 0)
 
+#    def test_regrep_device(self):
+#        # Create a req/rep device.
+#        xreq = self.ctx.xreq()
+#        xrep = self.ctx.xrep()
+#        # Create a worker.
+#        rep = self.ctx.rep()
+#        # Create a client.
+#        req = self.ctx.req()
+#        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'), \
+#            rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
+#            # Send a request.
+#            self.assertEqual(req.send(b'ABC', more=True).last_rc, 3)
+#            self.assertEqual(req.send(b'DEF').last_rc, 3)
+#            # Pass the request through the device.
+#            for i in lrange(4):
+#                msg = xrep.recvmsg()
+#                self.assertTrue(msg.last_rc >= 0)
+#                self.assertTrue(xreq.sendmsg(
+#                    bytes(msg.dest), True if msg.more else False
+#                ).last_rc >= 0)
+            # Receive the request.
+#            buff = rep.recv(3)
+#            self.assertEqual(buff.last_rc, 3)
+#            self.assertEqual(bytes(buff), b'ABC')
+#            self.assertEqual(buff.more, 0)
+#            buff2 = rep.recv(3)
+#            self.assertEqual(buff2.last_rc, 3)
+#            self.assertEqual(bytes(buff2), b'DEF')
+#            self.assertEqual(buff2.more, 0)
+#            # Send the reply.
+#            self.assertEqual(rep.send(b'GHI', more=True).last_rc, 3)
+#            self.assertEqual(rep.send(b'JKL').last_rc, 3)
+#            # Pass the reply through the device.
+#            for i in lrange(4):
+#                msg = xrep.recvmsg()
+#                self.assertTrue(msg.last_rc >= 0)
+#                self.assertTrue(
+#                    xreq.sendmsg(bytes(msg.dest), more=True if msg.more else False).last_rc >= 0
+#                )
+#            # Receive the reply.
+#            buff = req.recv(3)
+#            self.assertEqual(buff.last_rc, 3)
+#            self.assertEqual(bytes(buff), b'GHI')
+#            self.assertEqual(buff.more, 0)
+#            buff2 = req.recv(3)
+#            self.assertEqual(buff2.last_rc, 3)
+#            self.assertEqual(bytes(buff2), b'JKL')
+#            self.assertEqual(buff2.more, 0)
+#
 #    def test_resubscribe(self):
 #        from stuf.six import int2byte
-#        from time import sleep
-#        from ctypes import sizeof, c_ubyte
-#        from crossroads.lowest import array
-#        XS, xs = twoget(self)
+##        from time import sleep
 #        # Create the basic infrastructure.
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        xpub = xs.socket(ctx, XS.XPUB)
-#        self.assertTrue(xpub)
-#        sub = xs.socket(ctx, XS.SUB)
-#        self.assertTrue(sub)
+#        xpub = self.ctx.xpub()
+#        sub = self.ctx.sub()
 #        # Send two subscriptions upstream.
-#        self.assertNotEqual(xs.bind(xpub, b'tcp://127.0.0.1:5560'), -1)
-#        self.assertEqual(xs.setsockopt(sub, XS.SUBSCRIBE, b'a', 1), 0)
-#        self.assertEqual(xs.setsockopt(sub, XS.SUBSCRIBE, b'b', 1), 0)
-#        self.assertNotEqual(xs.connect(sub, b'tcp://127.0.0.1:5560'), -1)
-#        # Check whether subscriptions are correctly received.
-#        buf = array(c_ubyte, 5)
-#        self.assertEqual(xs.recv(xpub, buf, sizeof(buf), 0), 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'a')
-#        self.assertEqual(xs.recv(xpub, buf, sizeof(buf), 0), 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'b')
-#        # Tear down the connection.
-#        self.assertEqual(xs.close(xpub), 0)
-#        sleep(1)
-#        # Re-establish the connection.
-#        xpub = xs.socket(ctx, XS.XPUB)
-#        self.assertTrue(xpub)
-#        self.assertNotEqual(xs.bind(xpub, b'tcp://127.0.0.1:5560'), -1)
-#        # We have to give control to the SUB socket here so that it has
-#        # chance to resend the subscriptions.
-#        try:
-#            xs.recv(sub, buf, sizeof(buf), XS.DONTWAIT)
-#        except xs.XSError as e:
-#            self.assertEqual(e.errno, XS.EAGAIN)
-#        # Check whether subscriptions are correctly generated.
-#        self.assertEqual(xs.recv(xpub, buf, sizeof(buf), 0), 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'a')
-#        self.assertEqual(xs.recv(xpub, buf, sizeof(buf), 0), 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'b')
-#        # Clean up.
-#        self.assertEqual(xs.close(sub), 0)
-#        self.assertEqual(xs.close(xpub), 0)
-#        self.assertEqual(xs.term(ctx), 0)
-
-    def test_regrep_device(self):
-        # Create a req/rep device.
-        xreq = self.ctx.xreq()
-        xrep = self.ctx.xrep()
-        # Create a worker.
-        rep = self.ctx.rep()
-        # Create a client.
-        req = self.ctx.req()
-        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'), \
-            rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
-            # Send a request.
-            self.assertEqual(req.send(b'ABC', more=True).last_rc, 3)
-            self.assertEqual(req.send(b'DEF').last_rc, 3)
-            # Pass the request through the device.
-            for i in lrange(4):
-                msg = xrep.recvmsg()
-                self.assertTrue(msg.last_rc >= 0)
-                self.assertEqual(msg.more, 1)
-                self.assertTrue(
-                    xreq.sendmsg(msg.dest, more=True if msg.more else False).last_rc >= 0
-                )
-            # Receive the request.
-            buff = rep.recv(3)
-            self.assertEqual(buff.last_rc, 3)
-            self.assertEqual(bytes(buff), b'ABC')
-            self.assertEqual(buff.more, 0)
-            buff2 = rep.recv(3)
-            self.assertEqual(buff2.last_rc, 3)
-            self.assertEqual(bytearray(buff), b'DEF')
-            self.assertEqual(buff2.more, 0)
-            # Send the reply.
-            self.assertEqual(rep.send(b'GHI', more=True).last_rc, 3)
-            self.assertEqual(rep.send(b'JKL').last_rc, 3)
-            # Pass the reply through the device.
-            for i in lrange(4):
-                msg = xrep.recvmsg()
-                self.assertTrue(msg.last_rc >= 0)
-                self.assertEqual(msg.more, 0)
-                self.assertTrue(
-                    xreq.sendmsg(msg.dest, more=True if msg.more else False).last_rc >= 0
-                )
-            # Receive the reply.
-            buff = req.recv(3)
-            self.assertEqual(buff.last_rc, 3)
-            self.assertEqual(bytes(buff), b'GHI')
-            self.assertEqual(buff.more, 0)
-            buff2 = req.recv(3)
-            self.assertEqual(buff2.last_rc, 3)
-            self.assertEqual(bytes(buff), b'JKL')
-            self.assertEqual(buff2.more, 0)
-
-
-    def test_invalid_rep(self):
-        from stuf.six import int2byte
-        # Create REQ/XREP wiring.
-        xrep_socket = self.ctx.xrep(linger=0)
-        req_socket = self.ctx.req(linger=0)
-        with xrep_socket.bind_inproc('hi'), req_socket.connect_inproc('hi'):
-            self.assertNotEqual(xrep_socket.last_rc, -1)
-            self.assertNotEqual(req_socket.last_rc, -1)
-            # Initial request.
-            self.assertEqual(req_socket.send('r').last_rc, 1)
-            # Receive the request.
-            addr = xrep_socket.recv(32)
-            addr_size = addr.last_rc
-            self.assertTrue(addr_size >= 0)
-            bottom = xrep_socket.recv(1)
-            self.assertEqual(bottom.last_rc, 0)
-            body = xrep_socket.recv(1)
-            self.assertEqual(body.last_rc, 1)
-            # Send invalid reply.
-            self.assertNotEqual(xrep_socket.send(addr.dest).last_rc, addr_size)
-            # Send valid reply.
-            self.assertNotEqual(
-                xrep_socket.send(addr.dest, more=True).last_rc, addr_size,
-            )
-            self.assertNotEqual(
-                xrep_socket.send(bottom.dest, 0, more=True).last_rc, 0
-            )
-            self.assertEqual(xrep_socket.send(b'b').last_rc, 1)
-            # Check whether we've got the valid reply.
-            body = req_socket.recv(1, nowait=True)
-            self.assertEqual(body.last_rc, 1)
-            self.assertEqual(int2byte(body.dest[0]), b'b')
-
-#    def test_survey(self):
-#        from crossroads.lowest import array
-#        from ctypes import sizeof, byref, c_int, c_ubyte
-#        XS, xs = twoget(self)
-#        buf = array(c_ubyte, 32)
-#        # Create the basic infrastructure.
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        xsurveyor = xs.socket(ctx, XS.XSURVEYOR)
-#        self.assertTrue(xsurveyor)
-#        self.assertNotEqual(xs.bind(xsurveyor, b'inproc://a'), -1)
-#        xrespondent = xs.socket(ctx, XS.XRESPONDENT)
-#        self.assertTrue(xrespondent)
-#        self.assertNotEqual(xs.bind(xrespondent, b'inproc://b'), -1)
-#        surveyor = xs.socket(ctx, XS.SURVEYOR)
-#        self.assertTrue(surveyor)
-#        self.assertNotEqual(xs.connect(surveyor, b'inproc://b'), -1)
-#        respondent1 = xs.socket(ctx, XS.RESPONDENT)
-#        self.assertTrue(respondent1)
-#        self.assertNotEqual(xs.connect(respondent1, b'inproc://a'), -1)
-#        respondent2 = xs.socket(ctx, XS.RESPONDENT)
-#        self.assertTrue(respondent2)
-#        self.assertNotEqual(xs.connect(respondent2, b'inproc://a'), -1)
-#        # Send the survey.
-#        self.assertEqual(xs.send(surveyor, b'ABC', 3, 0), 3)
-#        # Forward the survey through the intermediate device.
-#        # Survey consist of identity(4 bytes), survey ID(4 bytes) and the body.
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xsurveyor, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xsurveyor, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 3)
-#        self.assertEqual(xs.send(xsurveyor, buf, 3, 0), 3)
-#        # Respondent 1 responds to the survey.
-#        self.assertEqual(xs.recv(respondent1, buf, sizeof(buf), 0), 3)
-#        self.assertEqual(xs.send(respondent1, b'DE', 2, 0), 2)
-#        # Forward the response through the intermediate device.
-#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 2)
-#        self.assertEqual(xs.send(xrespondent, buf, 2, 0), 2)
-#        # Surveyor gets the response.
-#        self.assertEqual(xs.recv(surveyor, buf, sizeof(buf), 0), 2)
-#        # Respondent 2 responds to the survey.
-#        self.assertEqual(xs.recv(respondent2, buf, sizeof(buf), 0), 3)
-#        self.assertEqual(xs.send(respondent2, b'FGHI', 4, 0), 4)
-#        # Forward the response through the intermediate device.
-#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, 0), 4)
-#        # Surveyor gets the response.
-#        self.assertEqual(xs.recv(surveyor, buf, sizeof(buf), 0), 4)
-#        # Now let's test whether survey timeout works as expected.
-#        timeout = c_int(100)
-#        self.assertEqual(xs.setsockopt(
-#            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
-#        ), 0)
-#        self.assertEqual(xs.send(surveyor, b'ABC', 3, 0), 3)
-#        watch = xs.stopwatch_start()
-#        try:
-#            xs.recv(surveyor, buf, sizeof(buf), 0)
-#        except xs.XSError as e:
-#            self.assertEqual(e.errno, XS.ETIMEDOUT)
-#        self.time_assert(xs.stopwatch_stop(watch) / 1000, timeout.value)
-#        # Test whether responses for old surveys are discarded. First,
-#        # initiate new survey.
-#        self.assertEqual(xs.setsockopt(
-#            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
-#        ), 0)
-#        self.assertEqual(xs.send(surveyor, b'DE', 2, 0), 2)
-#        # Read, process and reply to the old survey.
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 3)
-#        self.assertEqual(xs.send(xrespondent, buf, 3, 0), 3)
-#        # Read, process and reply to the new survey.
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
-#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
-#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 2)
-#        self.assertEqual(xs.send(xrespondent, buf, 2, 0), 2)
-#        # Get the response and check it's the response to the new survey and
-#        # that response to the old survey was silently discarded.
-#        self.assertEqual(xs.recv(surveyor, buf, sizeof(buf), 0), 2)
-#        self.assertEqual(xs.close(respondent2), 0)
-#        self.assertEqual(xs.close(respondent1), 0)
-#        self.assertEqual(xs.close(surveyor), 0)
-#        self.assertEqual(xs.close(xrespondent), 0)
-#        self.assertEqual(xs.close(xsurveyor), 0)
-#        self.assertEqual(xs.term(ctx), 0)
+#        self.assertEqual(sub.set(subscribe = b'a'), 0)
+#        self.assertEqual(sub.set(subscribe = b'b'), 0)
+#        with xpub.bind_tcp('127.0.0.1:5560'), sub.connect_tcp('127.0.0.1:5560'):
+#            # Check whether subscriptions are correctly received.
+#            buf = xpub.recv(5)
+#            self.assertEqual(buf.last_rc, 5)
+#            self.assertEqual(buf.dest[0], 0)
+#            self.assertEqual(buf.dest[1], 1)
+#            self.assertEqual(buf.dest[2], 0)
+#            self.assertEqual(buf.dest[3], 1)
+#            self.assertEqual(int2byte(buf.dest[4]), b'a')
+#            buf = xpub.recv(5)
+#            self.assertEqual(buf.last_rc, 5)
+#            self.assertEqual(buf.dest[0], 0)
+#            self.assertEqual(buf.dest[1], 1)
+#            self.assertEqual(buf.dest[2], 0)
+#            self.assertEqual(buf.dest[3], 1)
+#            self.assertEqual(int2byte(buf.dest[4]), b'b')
+#                # Tear down the connection.
+#                self.assertEqual(xpub.close(), 0)
+#                sleep(1)
+#                # Re-establish the connection.
+#                xpub = self.ctx.xpub()
+#                self.assertNotEqual(xpub.bind_tcp('127.0.0.1:5560'), -1)
+#                # We have to give control to the SUB socket here so that it has
+#                # chance to resend the subscriptions.
+#                try:
+#                    sub.recv(3, self.XS.DONTWAIT)
+#                except self.xs.XSError as e:
+#                    self.assertEqual(e.errno, self.XS.EAGAIN)
+#                # Check whether subscriptions are correctly generated.
+#                buf = xpub.recv(5)
+#                self.assertEqual(buf.last_rc, 5)
+#                self.assertEqual(buf.dest[0], 0)
+#                self.assertEqual(buf.dest[1], 1)
+#                self.assertEqual(buf.dest[2], 0)
+#                self.assertEqual(buf.dest[3], 1)
+#                self.assertEqual(int2byte(buf.dest[4]), b'a')
+#                buf = xpub.recv(5)
+#                self.assertEqual(buf.last_rc, 5)
+#                self.assertEqual(buf.dest[0], 0)
+#                self.assertEqual(buf.dest[1], 1)
+#                self.assertEqual(buf.dest[2], 0)
+#                self.assertEqual(buf.dest[3], 1)
+#                self.assertEqual(int2byte(buf.dest[4]), b'b')

tests/test_lowest_level.py

         import crossroads.constants
         self.XS = crossroads.constants
 
+#        static void *thread_routine (void *arg_)
+#        {
+#            arg_t *arg = (arg_t*) arg_;
+#            arg->fn (arg->arg);
+#            return NULL;
+#        }
+#    }
+#
+#    void *thread_create (void (*fn_) (void *arg_), void *arg_)
+#    {
+#        arg_t *arg = (arg_t*) malloc (sizeof (arg_t));
+#        assert (arg);
+#        arg->fn = fn_;
+#        arg->arg = arg_;
+#        int rc = pthread_create (&arg->handle, NULL, thread_routine, (void*) arg);
+#        assert (rc == 0);
+#        return (void*) arg;
+#    }
+#
+#    void thread_join (void *thread_)
+#    {
+#        arg_t *arg = (arg_t*) thread_;
+#        int rc = pthread_join (arg->handle, NULL);
+#        assert (rc == 0);
+#        free (arg);
+#    }
+
     def bounce(self, sb, sc):
         from ctypes import c_char_p, c_int, sizeof, c_size_t, c_ubyte
         from crossroads.lowest import array
 #            rc = xs.close(s1)
 #            self.assertEqual(rc, 0)
 #        self.assertEqual(xs.term(ctx), 0)
-
-#extern "C"
-#{
-#    static void *thread_routine (void *arg_)
-#    {
-#        arg_t *arg = (arg_t*) arg_;
-#        arg->fn (arg->arg);
-#        return NULL;
-#    }
-#}
 #
-#void *thread_create (void (*fn_) (void *arg_), void *arg_)
-#{
-#    arg_t *arg = (arg_t*) malloc (sizeof (arg_t));
-#    assert (arg);
-#    arg->fn = fn_;
-#    arg->arg = arg_;
-#    int rc = pthread_create (&arg->handle, NULL, thread_routine, (void*) arg);
-#    assert (rc == 0);
-#    return (void*) arg;
-#}
-#
-#void thread_join (void *thread_)
-#{
-#    arg_t *arg = (arg_t*) thread_;
-#    int rc = pthread_join (arg->handle, NULL);
-#    assert (rc == 0);
-#    free (arg);
-#}
-
 #    def test_backlog(self):
 #        BACKLOG = 10
 #        PARALLEL_CONNECTS = 50