Commits

Lynn Rees committed 13fd5a0

- further

Comments (0)

Files changed (5)

crossroads/constants.py

 IO_THREADS = 2
 PLUGIN = 3
 
+# implicit constants
+DEFAULT_MAX_SOCKETS = 512
+DEFAULT_IO_THREADS = 1
+
 ###############################################################################
 ## CROSSROADS SOCKET DEFINITION ###############################################
 ###############################################################################

crossroads/low.py

 
 from functools import partial
 from ctypes import (
-    byref, sizeof, c_int, c_int64, c_char_p, c_ubyte, string_at, c_size_t)
+    byref, sizeof, c_int, c_int64, c_char_p, string_at, c_size_t, c_ubyte)
 
 from stuf.deep import setter
-from stuf.six.moves import zip_longest as izip  # @UnresolvedImport
-from stuf.six import items, tobytes, isstring, tounicode
+from stuf.six import items, tobytes, isstring
 
 from . import lowest as xs
 from . import constants as XS
+from collections import namedtuple
+from crossroads.lowest import array
 
 BAD_TYPE = 'address must be bytes, got {r} instead'.format
+Received = namedtuple('Received', 'data result more')
 
 
 class Options(object):
 
 class Context(Options):
 
-    def __init__(self, threads=1, max_sockets=512, **options):
+    def __init__(
+        self,
+        threads=XS.DEFAULT_IO_THREADS,
+        max_sockets=XS.DEFAULT_MAX_SOCKETS,
+        **options
+    ):
         super(Context, self).__init__(**options)
         self._ctx = ctx = xs.init()
-        if threads != 1:
-            if threads != 0:
+        if threads != XS.DEFAULT_IO_THREADS:
+            if threads > 0:
                 iothreads = c_int(threads)
                 xs.setctxopt(
                     ctx, XS.IO_THREADS, byref(iothreads), sizeof(iothreads)
                 )
                 del iothreads
             else:
-                raise xs.XSError('must have 1 or more I/O threads')
-        if max_sockets != 512:
-            if max_sockets != 0:
+                raise xs.XSOperationError('must have 1 or more I/O threads')
+        if max_sockets != XS.DEFAULT_MAX_SOCKETS:
+            if max_sockets > 0:
                 max_sockets = c_int(max_sockets)
                 xs.setctxopt(
                     ctx,
                 )
                 del max_sockets
             else:
-                raise xs.XSError('must have 1 or more sockets')
+                raise xs.XSOperationError('must use 1 or more sockets')
+        self.sockets = []
 
     def __getattr__(self, key, getr=object.__getattribute__):
         try:
         return Socket(self, stype, **options)
 
     def close(self):
-        return xs.term(self._ctx)
+        rc = xs.term(self._ctx)
+        del self.sockets
+        self._ctx = None
+        if rc:
+            raise xs.XSOperationError('socket failed to close')
+        return rc
+
+    __del__ = close
 
 
 class Socket(Options):
 
     def __init__(self, context, stype, **options):
-        self._context = context
+        self.context = context
         self._type = stype
         self._socket = xs.socket(context._ctx, stype)
         options.update(context._options)
         self.set(**options)
+        context.sockets.append(self)
+        self.last_rc = self.more = None
+        self.socket_closed = False
+        self.id = None
 
     def __getattr__(self, key, getr=object.__getattribute__):
         try:
         except AttributeError:
             try:
                 if key.startswith('_') and not key.startswith('__'):
+                    this = partial(
+                        getattr(xs, key.lower().strip('_')), self._socket,
+                    )
                     return setter(
                         self,
                         key,
-                        partial(
-                            getattr(xs, key.lower().strip('_')), self._socket,
-                        )
+                        this
                     )
             except AttributeError:
                 raise AttributeError(key)
     def set(self, **options):
         INT, INT64, BINARY = XS.INT_OPTS, XS.INT64_OPTS, XS.BINARY_OPTS
         setsocket, getr = self._setsockopt, getattr
+        rc = 0
         for k, v in items(options):
             const = getr(XS, k.upper())
             if const in INT:
                 value = c_int64(v)
             elif const in BINARY:
                 value = c_char_p(v)
-            setsocket(const, byref(value), sizeof(value))
+            rc |= setsocket(const, byref(value), sizeof(value))
             del value
+        self.last_rc = rc
+        return self
+
+    def __enter__(self):
+        return self
 
     def bind(self, address):
         if isstring(address):
             # if string, coerce to bytes
-            return self._bind(tobytes(address))
-        raise xs.XSError(BAD_TYPE(type(address)))
+            self.id = self.last_rc = self._bind(tobytes(address))
+            return self
+        raise TypeError(BAD_TYPE(type(address)))
 
     def connect(self, address):
         if isstring(address):
             # if string, coerce to bytes
-            return self._connect(tobytes(address))
-        raise xs.XSError(BAD_TYPE(type(address)))
+            self.id = self.last_rc = self._connect(tobytes(address))
+            return self
+        raise TypeError(BAD_TYPE(type(address)))
+
+    def send(self, data, size=None, nowait=False):
+        self.last_rc = self._send(
+            data,
+            len(data) if size is None else size,
+            XS.DONTWAIT if nowait else 0,
+        )
+        return self
 
     def send_bytes(self, data, size=None, nowait=False):
-#        if size is not None and size > 0 and size > dlength:
-#            FLAGS = XS.DONTWAIT if nowait else 0 | XS.SNDMORE
-#            send = self._send
-#            for datum in izip(*[iter(data)] * size):
-#                length = len(datum)
-#                if length == size:
-#                    send(datum, length, FLAGS)
-#                else:
-#                    return send(data, length, 0)
-        return self._send(data, len(data), XS.DONTWAIT if nowait else 0)
+        return self.send(tobytes(data, 'latin-1'), size, nowait)
 
-    def send_unicode(self, data, size=None, encoding='utf-8', errors='strict'):
-        return self.send_bytes(tobytes(data, encoding, errors), size)
+    def recv(self, size, nowait=False):
+        data = array(c_ubyte, size)
+        self.last_rc = self._recv(
+            data, sizeof(data), XS.DONTWAIT if nowait else 0 | 0,
+        )
+        more = c_int()
+        more_size = c_size_t(sizeof(more))
+        self._getsockopt(XS.RCVMORE, byref(more), byref(more_size))
+        self.more = more.value
+        return data
 
     def recv_bytes(self, size, nowait=False):
-        more = c_int()
-        more_size = c_size_t(sizeof(more))
-        results = []
-        rappend = results.append
-        recv, getsockopt, array = self._recv, self._getsockopt, xs.array
-        FLAGS, RCVMORE = XS.DONTWAIT if nowait else 0 | 0, XS.RCVMORE
-        while True:
-            try:
-                data = array(c_ubyte, size)
-                recv(byref(data), sizeof(data), FLAGS)
-                more_size = c_size_t(sizeof(more))
-                getsockopt(RCVMORE, byref(more), byref(more_size))
-                rappend(string_at(byref(data), size))
-                if not more.value:
-                    break
-            finally:
-                del data
-        return b''.join(results)
+        return string_at(byref(self.recv(size, nowait)), size)
 
-    def recv_unicode(
-        self, size, nowait=False, encoding='utf-8', errors='strict',
-    ):
-        return tounicode(self.recv_bytes(size, nowait), encoding, errors)
-
-    def sendmsg_bytes(self, data, size=None, nowait=False):
-        if size is not None and size > 0:
-            FLAGS = XS.DONTWAIT if nowait else 0 | XS.SNDMORE
-            byref, msg_data = byref, xs.msg_data
-            sendmsg, msg_t = self._sendmsg, self._msg_t(),
-            memmove, init_size = xs.memmove, xs.msg_init_size
-            for datum in izip(None, *[iter(data)] * size):
-                msg = msg_t()
-                length = len(datum)
-                init_size(byref(msg), length)
-                memmove(msg_data(byref(msg)), datum, length)
-                if length == size:
-                    sendmsg(byref(msg), length, FLAGS)
-                else:
-                    return sendmsg(byref(msg), length, 0)
-        msg = xs.msg_t()
-        length = len(data)
-        xs.msg_init_size(byref(msg), length)
-        xs.memmove(xs.msg_data(byref(msg)), data, length)
-        return self._sendmsg(byref(msg), length, XS.DONTWAIT if nowait else 0)
-
-    def sendmsg_unicode(
-        self, data, size=None, encoding='utf-8', errors='strict',
-    ):
-        return self.sendmsg_bytes(tobytes(data, encoding, errors), size)
-
-    def recvmsg_bytes(self, size, nowait=False):
-        more = c_int()
-        more_size = c_size_t(sizeof(more))
-        results = []
-        rappend, string_at, c_size_t = results.append, string_at, c_size_t
-        recvmsg, getsockopt = self._recvmsg, self._getsockopt
-        byref, sizeof, msg_close = byref, sizeof, xs.msg_close
-        FLAGS, MORE = XS.DONTWAIT if nowait else 0 | 0, XS.MORE
-        while more.value:
-            try:
-                memref = byref(self._msg_t())
-                rc = xs.msg_init(byref(memref))
-                assert rc == 0, 'message not initialized'
-                more_size = c_size_t(sizeof(more))
-                rc = recvmsg(byref(memref), sizeof(memref), FLAGS)
-                assert rc == 0, 'nothing received'
-                rc = getsockopt(MORE, byref(more), byref(more_size))
-                if not more:
-                    break
-                rappend(string_at(memref, more.value))
-            finally:
-                msg_close(memref)
-        return b''.join(results)
-
-    def recvmsg_unicode(
-        self, size, nowait=False, encoding='utf-8', errors='strict',
-    ):
-        return tounicode(self.sendmsg_bytes(size, nowait), encoding, errors)
+#    def sendmsg(self, data, size=None, nowait=False):
+#        msg = xs.msg_t()
+#        length = len(data) if size is None else size
+#        xs.msg_init_size(byref(msg), length)
+#        xs.memmove(xs.msg_data(byref(msg)), data, length)
+#        return self._sendmsg(byref(msg), length, XS.DONTWAIT if nowait else 0)
+#
+#    def recvmsg(self, size, nowait=False):
+#        try:
+#            msg = xs.msg_t()
+#            rc = xs.msg_init(byref(msg))
+#            assert rc == 0, 'message not initialized'
+#            length = self._recvmsg(
+#                byref(msg), sizeof(msg), XS.DONTWAIT if nowait else 0,
+#            )
+#            assert rc == 0, 'nothing received'
+#            more = c_int()
+#            more_size = c_size_t(sizeof(more))
+#            rc = self._getsockopt(XS.RCVMORE, byref(more), byref(more_size))
+#            data = string_at(byref(msg), size)
+#        finally:
+#            xs.msg_close(byref(msg))
+#        return Received(data, length, more.value)
 
     def shutdown(self, sid):
-        return self._shutdown(c_int(sid))
+        self.last_rc = self._shutdown(c_int(sid))
+        return self
 
     def close(self):
-        return self._close()
+        rc = self._close()
+        self.context.sockets.remove(self)
+        self._socket = self.context = self.last_rc = self.more = None
+        if not rc:
+            self.socket_closed = True
+        return rc
+
+    def __exit__(self, e, c, b):
+        self.close()
+        if any((e is not None, c is not None, b is not None)):
+            return False

crossroads/lowest.py

 
 
 class XSBaseError(Exception):
+    '''Base exception.'''
 
-    '''Base exception.'''
+
+class XSOperationError(XSBaseError):
+    '''Operational Error.'''
 
 
 class XSError(XSBaseError):
 msg_size = factory(size_t, 'msg_size', POINTER(msg_t))
 
 # int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, size_t *optvallen)
-getmsgopt = intfactory(
-    'getmsgopt', POINTER(msg_t), cint, void, POINTER(size_t)
-)
+getmsgopt = intfactory('getmsgopt', POINTER(msg_t), cint, void, POINTER(size_t))
 
 ###############################################################################
 ## CROSSROADS CONTEXT DEFINITION ##############################################
 
 class pollitem_t(Structure):
 
-    # typedef struct { void *socket; int fd; short events; short revents;
+    # typedef struct {
+    #   void *socket; int fd; short events; short revents;
     # } xs_pollitem_t;
 
     _fields_ = [

tests/test_low_level.py

     def bounce(self, sb, sc):
         content = b'12345678ABCDEFGH12345678abcdefgh'
         #  Send the message.
-        self.assertEqual(sc.send_bytes(content, 32), 32)
+        self.assertEqual(sc.send(content).last_rc, 32)
         #  Bounce the message back.
-        buf1 = sb.recv_bytes(32)
-        rc = sb.send_bytes(buf1, 32)
-        self.assertEqual(rc, 32)
-        #  Receive the bounced message.
-        #  Check whether the message is still the same.
-        self.assertEqual(sc.recv_bytes(32), content)
+        buf1 = sb.recv(32)
+        self.assertEqual(len(buf1), 32)
+        self.assertEqual(sb.more, 0)
+##        rcvmore = c_int()
+##        sz = c_size_t(sizeof(rcvmore))
+##        self.assertEqual(
+##            xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+##        )
+##        self.assertTrue(rcvmore)
+##        self.assertEqual(xs.recv(sb, buf1, 32, 0), 32)
+##        self.assertEqual(
+##            xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz)), 0,
+##        )
+##        self.assertFalse(rcvmore)
+##        self.assertEqual(xs.send(sb, buf1, 32, XS.SNDMORE), 32)
+        self.assertEqual(sb.send(buf1).last_rc, 32)
+##        #  Receive the bounced message.
+##        buf2 = (c_char_p * 32)()
+##        self.assertEqual(xs.recv(sc, buf2, 32, 0), 32)
+##        self.assertEqual(
+##            xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+##        )
+##        self.assertTrue(rcvmore)
+        buf2 = sc.recv(32)
+#        self.assertEqual(len(buf2), 32)
+##        self.assertEqual(
+##            xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+##        )
+##        #  Check whether the message is still the same.
+        self.assertEqual(bytearray(buf2), content)
 
     def time_assert(self, actual, expected):
         '''
         # This is a very simple test to check whether everything works OK when
         # context is terminated even before I/O threads were launched.
         self.assertTrue(self.ctx)
-        rc = self.ctx.close()
-        self.assertEqual(rc, 0)
 
     def test_max_sockets(self):
-        ctx = self.context_class(max_sockets=1)
-        # First socket should be created OK.
-        s1 = ctx.push()
-        # Creation of second socket should fail.
         try:
-            ctx.push()
-        except self.xs.XSError as e:
-            self.assertEqual(e.errno, self.XS.EMFILE)
-        # Clean up.
-        self.assertEqual(s1.close(), 0)
+            # Create context and set MAX_SOCKETS to 1.
+            ctx = self.context_class(max_sockets=1)
+            # First socket should be created OK.
+            s1 = ctx.push()
+            # Creation of second socket should fail.
+            try:
+                ctx.push()
+            except self.xs.XSError as e:
+                self.assertEqual(e.errno, self.XS.EMFILE)
+        except:
+            raise
+        finally:
+            # Clean up.
+            self.assertEqual(s1.close(), 0)
+            self.assertEqual(ctx.close(), 0)
+
+    def test_linger(self):
+        ctx = self.context_class()
+        s = ctx.push(linger=100)
+        with s.connect(b'tcp://127.0.0.1:5560'):
+            #  Send a message.
+            s.send(b'r')
+            self.assertEqual(s.last_rc, 1)
+        #  Terminate the context. This should take 0.1 second.
+        watch = self.xs.stopwatch_start()
+        ctx.close()
+        self.time_assert(self.xs.stopwatch_stop(watch) / 1000, 100)
+
+    def test_pair_ipc(self):
+        sb = self.ctx.pair()
+        sc = self.ctx.pair()
+        with sb.bind(b'ipc:///tmp/tester'), sc.connect(b'ipc:///tmp/tester'):
+            self.bounce(sb, sc)
+
+    def test_pair_inproc(self):
+        sb = self.ctx.pair()
+        sc = self.ctx.pair()
+        with sb.bind('inproc://a'), sc.connect('inproc://a'):
+            self.bounce(sb, sc)
 
     def test_pair_tcp(self):
         from time import sleep
         sb = self.ctx.pair()
-        self.assertNotEqual(sb.bind(b'tcp://127.0.0.1:5560'), -1)
         sc = self.ctx.pair()
-        self.assertNotEqual(sc.connect(b'tcp://127.0.0.1:5560'), -1)
-        self.bounce(sb, sc)
-        # Now let's try to open one more connection to the bound socket. The
-        # connection should be silently rejected rather than causing error.
-        sc2 = self.ctx.pair()
-        sc2.connect(b'tcp://127.0.0.1:5560')
-        sleep(1)
-        self.assertEqual(sc2.close(), 0)
-        self.assertEqual(sc.close(), 0)
-        self.assertEqual(sb.close(), 0)
-
-    def test_pair_ipc(self):
-        sb = self.ctx.pair()
-        self.assertNotEqual(sb.bind(b'ipc:///tmp/tester'), -1)
-        sc = self.ctx.pair()
-        self.assertNotEqual(sc.connect(b'ipc:///tmp/tester'), -1)
-        self.bounce(sb, sc)
-        self.assertEqual(sc.close(), 0)
-        self.assertEqual(sb.close(), 0)
-
-    def test_pair_inproc(self):
-        sb = self.ctx.pair()
-        self.assertNotEqual(sb.bind('inproc://a'), -1)
-        sc = self.ctx.pair()
-        self.assertNotEqual(sc.connect('inproc://a'), -1)
-        self.bounce(sb, sc)
-        self.assertEqual(sc.close(), 0)
-        self.assertEqual(sb.close(), 0)
-
-    def test_linger(self):
-        xs = self.xs
-        #  Create socket.
-        s = self.ctx.push(linger=100)
-        #  Set linger to 0.1 second.
-        #  Connect to non-existent endpoing.
-        self.assertNotEqual(s.connect(b'tcp://127.0.0.1:5560'), -1)
-        #  Send a message.
-        self.assertEqual(s.send_bytes(b'r'), 1)
-        #  Close the socket.
-        self.assertEqual(s.close(), 0)
-        #  Terminate the context. This should take 0.1 second.
-        watch = xs.stopwatch_start()
-        self.assertEqual(self.ctx.close(), 0)
-        self.time_assert(xs.stopwatch_stop(watch) / 1000, 100)
+        with sb.bind(b'tcp://127.0.0.1:5560'), \
+                sc.connect(b'tcp://127.0.0.1:5560'):
+            self.bounce(sb, sc)
+            # Now let's try to open one more connection to the bound socket.
+            # The connection should be silently rejected rather than causing
+            # error.
+            sc2 = self.ctx.pair()
+            with sc2.connect(b'tcp://127.0.0.1:5560'):
+                sleep(1)
 
     def test_regrep_inproc(self):
         sb = self.ctx.rep()
-        self.assertNotEqual(sb.bind(b'inproc://a'), -1)
         sc = self.ctx.req()
-        self.assertNotEqual(sc.connect(b'inproc://a'), -1)
-        self.bounce(sb, sc)
-        self.assertEqual(sc.close(), 0)
-        self.assertEqual(sb.close(), 0)
+        with sb.bind(b'inproc://a'), sc.connect(b'inproc://a'):
+            self.bounce(sb, sc)
 
     def test_regrep_ipc(self):
         sb = self.ctx.rep()
-        self.assertNotEqual(sb.bind(b'ipc:///tmp/tester'), -1)
         sc = self.ctx.req()
-        self.assertNotEqual(sc.connect(b'ipc:///tmp/tester'), -1)
-        self.bounce(sb, sc)
-        self.assertEqual(sc.close(), 0)
-        self.assertEqual(sb.close(), 0)
+        with sb.bind(b'ipc:///tmp/tester'), sc.connect(b'ipc:///tmp/tester'):
+            self.bounce(sb, sc)
 
     def test_regrep_tcp(self):
         sb = self.ctx.rep()
-        self.assertNotEqual(sb.bind(b'tcp://127.0.0.1:5560'), -1)
         sc = self.ctx.req()
-        self.assertNotEqual(sc.connect(b'tcp://127.0.0.1:5560'), -1)
-        self.bounce(sb, sc)
-        self.assertEqual(sc.close(), 0)
-        self.assertEqual(sb.close(), 0)
+        with sb.bind(b'tcp://127.0.0.1:5560'), \
+                sc.connect(b'tcp://127.0.0.1:5560'):
+            self.bounce(sb, sc)
 
-    def test_invalid_rep(self):
-        from stuf.six import int2byte
-        try:
-            xrep_socket = self.ctx.xrep(linger=0)
-            req_socket = self.ctx.req(linger=0)
-            self.assertNotEqual(xrep_socket.bind(b'inproc://hi'), -1)
-            self.assertNotEqual(req_socket.connect(b'inproc://hi'), -1)
-    #        # Initial request.
-            self.assertEqual(req_socket.send_bytes(b'r'), 1)
-    #        # Receive the request.
-            addr = xrep_socket.recv_bytes(1)
-            self.assertTrue(len(addr) >= 0)
-    #        bottom = xrep_socket.recv_bytes(1)
-    #        self.assertEqual(len(bottom), 0)
-    #        body = xrep_socket.recv_bytes(1)
-    #        self.assertEqual(len(body), 1)
-    #        # Send invalid reply.
-#            self.assertEqual(len(addr), xrep_socket.send_bytes(addr))
-    #        # Send valid reply.
-            self.assertEqual(xrep_socket.send_bytes(b'b'), 1)
-    #        # Check whether we've got the valid reply.
-            self.assertEqual(int2byte(req_socket.recv_bytes(1)[0]), b'b')
-        finally:
-            # Tear down the wiring.
-            self.assertEqual(xrep_socket.close(), 0)
-            self.assertEqual(req_socket.close(), 0)
+    def test_wireformat(self):
+        import socket
+        # Create the basic infrastructure.
+        push = self.ctx.push()
+        pull = self.ctx.pull()
+        # Bind the peer and get the message.
+        with pull.bind(b'tcp://127.0.0.1:5560'), \
+                push.bind(b'tcp://127.0.0.1:5561'):
+            # Connect to the peer using raw sockets.
+            rpush = socket.socket(
+                socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
+            )
+            rpush.connect(('127.0.0.1', 5560))
+            rpull = socket.socket(
+                socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP,
+            )
+            rpull.connect(('127.0.0.1', 5561))
+            # Let's send some data and check if it arrived
+            self.assertEqual(rpush.send(b'\x04\0abc', 0), 5)
+            buf = pull.recv_bytes(3)
+            self.assertEqual(pull.last_rc, 3)
+            self.assertEqual(buf, b'abc')
+            # Let's push this data into another socket
+            self.assertEqual(push.send(buf).last_rc, 3)
+            self.assertNotEqual(rpull.recv(3), b'\x04\0abc')
+            rpush.close()
+            rpull.close()
+
+    def test_shutdown(self):
+        from time import sleep
+        # Create infrastructure.
+        push = self.ctx.push()
+        pull = self.ctx.pull()
+        with push.bind(b'tcp://127.0.0.1:5560'), \
+                pull.connect(b'tcp://127.0.0.1:5560'):
+            self.assertNotEqual(push.id, -1)
+            # Pass one message through to ensure the connection is established.
+            self.assertEqual(push.send(b'ABC').last_rc, 3)
+            pull.recv(3)
+            self.assertEqual(pull.last_rc, 3)
+            # Shut down the bound endpoint.
+            self.assertEqual(push.shutdown(push.id).last_rc, 0)
+            sleep(1)
+            try:
+                # Check that sending would block (there's no outbound
+                # connection).
+                push.send(b'ABC', nowait=True)
+            except self.xs.XSError as e:
+                self.assertTrue(e.errno, self.XS.EAGAIN)
+        # Now the other way round...Create infrastructure.
+        ctx = self.context_class()
+        pull = ctx.pull()
+        push = ctx.push()
+        with pull.bind(b'tcp://127.0.0.1:5560'), \
+                push.connect(b'tcp://127.0.0.1:5560'):
+            self.assertNotEqual(push.id, -1)
+            # Pass one message through to ensure the connection is established.
+            self.assertEqual(push.send(b'ABC').last_rc, 3)
+            pull.recv(3)
+            self.assertEqual(pull.last_rc, 3)
+            # Shut down the bound endpoint.
+            self.assertEqual(push.shutdown(push.id).last_rc, 0)
+            sleep(1)
+            try:
+                # Check that sending would block (there's no outbound
+                # connection).
+                push.send(b'ABC', nowait=True)
+            except self.xs.XSError as e:
+                self.assertTrue(e.errno, self.XS.EAGAIN)
 
     def test_reconnect(self):
         import sys
         from time import sleep
-        #  Create the basic infrastructure.
+        # Create the basic infrastructure.
         push = self.ctx.push()
         pull = self.ctx.pull()
         # Connect before bind was done at the peer and send one message.
-#        self.assertNotEqual(push.connect(b'tcp://127.0.0.1:5560'), -1)
-#        self.assertEqual(push.send_bytes(b'ABC'), 3)
-        # Wait a while for few attempts to reconnect to happen.
-#        sleep(1)
-#        # Bind the peer and get the message.
-#        self.assertNotEqual(pull.bind(b'tcp://127.0.0.1:5560'), -1)
-#        self.assertEqual(pull.recv_bytes(3), b'ABC')
-        # Clean up.
-        self.assertEqual(push.close(), 0)
-        self.assertEqual(pull.close(), 0)
-#        if sys.platform not in ('win32', 'cygwin'):
-#            # Now, let's test the same scenario with IPC.
-#            push = self.ctx.push()
-#            pull = self.ctx.pull()
-#            # Connect before bind was done at the peer and send one message.
-#            self.assertNotEqual(push.connect(b'ipc:///tmp/tester'), -1)
-#            self.assertEqual(push.send_bytes(b'ABC'), 3)
-#            # Wait a while for few attempts to reconnect to happen.
-#            sleep(1)
-#            # Bind the peer and get the message.
-#            self.assertNotEqual(pull.bind(b'ipc:///tmp/tester'), -1)
-#            self.assertEqual(pull.recv_bytes(3), 3)
+        with push.connect(b'tcp://127.0.0.1:5560'):
+            self.assertEqual(push.send(b'ABC').last_rc, 3)
+            # Wait a while for few attempts to reconnect to happen.
+            sleep(1)
+            # Bind the peer and get the message.
+            with pull.bind(b'tcp://127.0.0.1:5560'):
+                pull.recv(3)
+                self.assertEqual(pull.last_rc, 3)
+        if sys.platform not in ('win32', 'cygwin'):
+            # Now, let's test the same scenario with IPC.
+            push = self.ctx.push()
+            pull = self.ctx.pull()
+            with push.connect(b'ipc:///tmp/tester'):
+                # Connect before bind was done at the peer and send one message
+                self.assertEqual(push.send(b'ABC').last_rc, 3)
+                # Wait a while for few attempts to reconnect to happen.
+                sleep(1)
+                # Bind the peer and get the message.
+                with pull.bind(b'ipc:///tmp/tester'):
+                    pull.recv(3)
+                    self.assertEqual(pull.last_rc, 3)
+
+    def test_hwn(self):
+        # Create pair of socket, each with high watermark of 2. Thus the
+        # total buffer space should be 4 messages.
+        sb = self.ctx.pull(rcvhwm=2)
+        sc = self.ctx.push(sndhwm=2)
+        with sb.bind(b'inproc://a'), sc.connect(b'inproc://a'):
+            # Try to send 10 messages. Only 4 should succeed.
+            for t in xrange(10):
+                try:
+                    sc.send(None, 0, nowait=True)
+                    if t < 4:
+                        self.assertEqual(sc.last_rc, 0)
+                except self.xs.XSError as e:
+                    self.assertTrue(e.errno, self.XS.EAGAIN)
+            # There should be now 4 messages pending, consume them.
+            for i in xrange(4):
+                sb.recv(0)
+                self.assertEqual(sb.last_rc, 0)
+            # Now it should be possible to send one more.
+            self.assertEqual(sc.send(None, 0).last_rc, 0)
+            # Consume the remaining message.
+            sb.recv(0)
+            self.assertEqual(sb.last_rc, 0)
+            s1 = self.ctx.pull()
+            s2 = self.ctx.push(sndhwm=5)
+            # Following part of the tests checks whether small HWMs don't
+            # interact with command throttling in strange ways.
+            with s1.bind(b'tcp://127.0.0.1:5858'), \
+                    s2.connect(b'tcp://127.0.0.1:5858'):
+                self.assertTrue(s1.last_rc >= 0)
+                self.assertTrue(s2.last_rc >= 0)
+                for i in xrange(10):
+                    self.assertEqual(s2.send(b'test', nowait=True).last_rc, 4)
+                    s1.recv(4)
+                    self.assertEqual(s1.last_rc, 4)
+
+#    def test_resubscribe(self):
+#        from time import sleep
+#        from stuf.six import int2byte
+#        try:
+#            # Create the basic infrastructure.
+#            xpub = self.ctx.xpub()
+#            sub = self.ctx.sub()
+#            # Send two subscriptions upstream.
+#            self.assertNotEqual(xpub.bind(b'tcp://127.0.0.1:5560'), -1)
+#            self.assertEqual(sub.set(subscribe=b'a'), 0)
+#            self.assertEqual(sub.set(subscribe=b'b'), 0)
+#            self.assertNotEqual(sub.connect(b'tcp://127.0.0.1:5560'), -1)
+#            # Check whether subscriptions are correctly received.
+#            buf = xpub.recv(5, nowait=True)
+#            self.assertEqual(buf.length, 12)
+#            self.assertEqual(buf.data[0], 0)
+#            self.assertEqual(buf.data[1], 1)
+#            self.assertEqual(buf.data[2], 0)
+#            self.assertEqual(buf.data[3], 1)
+##            self.assertEqual(int2byte(buf.data[4]), b'a')
+#            buf = xpub.recv(5)
+#            self.assertEqual(buf.length, 12)
+#            self.assertEqual(buf.data[0], 0)
+#            self.assertEqual(buf.data[1], 1)
+#            self.assertEqual(buf.data[2], 0)
+#            self.assertEqual(buf.data[3], 1)
+##            self.assertEqual(int2byte(buf.data[4]), b'b')
+##        except:
+##            raise
+##        finally:
+##            # Tear down the connection.
+##            self.assertEqual(xpub.close(), 0)
+##        sleep(1)
+##        try:
+##            # Re-establish the connection.
+##            xpub = self.ctx.xpub()
+##            self.assertNotEqual(xpub.bind(b'tcp://127.0.0.1:5560'), -1)
+##            # We have to give control to the SUB socket here so that it has
+##            # chance to resend the subscriptions.
+##            try:
+##                sub.recv(5, nowait=True)
+##            except self.xs.XSError as e:
+##                self.assertEqual(e.errno, self.XS.EAGAIN)
+##            # Check whether subscriptions are correctly generated.
+##            buf = xpub.recv(5)
+##            self.assertEqual(buf.length, 5)
+##            self.assertEqual(buf.data[0], 0)
+##            self.assertEqual(buf.data[1], 1)
+##            self.assertEqual(buf.data[2], 0)
+##            self.assertEqual(buf.data[3], 1)
+##            self.assertEqual(int2byte(buf.data[4]), b'a')
+##            self.assertEqual(xpub.recv(5), 5)
+##            self.assertEqual(buf.data[0], 0)
+##            self.assertEqual(buf.data[1], 1)
+##            self.assertEqual(buf.data[2], 0)
+##            self.assertEqual(buf.data[3], 1)
+##            self.assertEqual(int2byte(buf.data[4]), b'b')
+#        except:
+#            raise
+#        finally:
 #            # Clean up.
-#            self.assertEqual(push.close(), 0)
-#            self.assertEqual(pull.close(), 0)
+#            self.assertEqual(sub.close(), 0)
+#            self.assertEqual(xpub.close(), 0)
+#
+    def test_sub_forward(self):
+        from time import sleep
+        # First, create an intermediate device.
+        xpub = self.ctx.xpub()
+        xsub = self.ctx.xsub()
+        with xpub.bind(b'tcp://127.0.0.1:5560'), \
+                xsub.bind(b'tcp://127.0.0.1:5561'):
+            # Create a publisher.
+            pub = self.ctx.pub()
+            sub = self.ctx.sub()
+            with pub.connect(b'tcp://127.0.0.1:5561'), \
+                    sub.connect(b'tcp://127.0.0.1:5560'):
+                # Create a subscriber and subscribe for all messages
+                self.assertEqual(sub.set(subscribe='').last_rc, 0)
+                # Pass the subscription upstream through the device.
+                buf = xpub.recv(32)
+                self.assertTrue(xpub.last_rc >= 0)
+                self.assertTrue(xsub.send(buf, 32).last_rc >= 0)
+                # Wait a bit till the subscription gets to the publisher.
+                sleep(10)
+                # Send an empty message.
+                self.assertEqual(pub.send(None, 0).last_rc, 0)
+                # Pass the message downstream through the device.
+                buf = xsub.recv(pub.last_rc, nowait=True)
+#                self.assertTrue(xsub.last_rc >= 0)
+#                self.assertTrue(xpub.send(buf).last_rc >= 0)
+#                # Receive the message in the subscriber.
+#                sub.recv(xpub.last_rc)
+#                self.assertEqual(sub.last_rc, 0)
 
 #    def test_msg_flags(self):
-#        sb = self.ctx.xrep()
-#        self.assertNotEqual(sb.bind(b'inproc://a'), -1)
-#        sc = self.ctx.xreq()
-#        self.assertNotEqual(sc.connect(b'inproc://a'), -1)
-#         Send 2 - part message.
-#        self.assertEqual(sc.sendmsg_bytes(b'AB', 2), 2)
-#         Identity comes first.
-#        self.assertEqual(sb.recvmsg_bytes(2), b'AB')
-#         Deallocate the infrastructure.
-#        self.assertEqual(sc.close(), 0)
-#        self.assertEqual(sb.close(), 0)
+#        try:
+#            # Create the infrastructure
+#            sb = self.ctx.xrep()
+#            self.assertNotEqual(sb.bind(b'inproc://a'), -1)
+#            sc = self.ctx.xreq()
+#            self.assertNotEqual(sc.connect(b'inproc://a'), -1)
+#            # Send 2 - part message.
+#            self.assertEqual(sc.send(b'A'), 1)
+#            self.assertEqual(sc.send(b'B'), 1)
+#            # Identity comes first.
+#            msg = sb.recvmsg(32)
+#            self.assertTrue(msg.length >= 0)
+#            self.assertEqual(msg.more, 1)
+#            # Then the first part of the message body.
+#            msg2 = sb.recvmsg(32)
+#            self.assertEqual(msg2.length, 1)
+#            self.assertEqual(msg2.more, 0)
+#            # And finally, the second part of the message body.
+##            msg3 = sb.recvmsg(32)
+##            self.assertEqual(msg3.length, 1)
+##            self.assertEqual(msg3.more, 0)
+#        except:
+#            raise
+#        finally:
+#            # Deallocate the infrastructure.
+#            self.assertEqual(sc.close(), 0)
+#            self.assertEqual(sb.close(), 0)
 
-
-#    def test_survey(self):
-#        from ctypes import sizeof, c_ubyte, byref, c_int, get_errno
-#        XS, xs = twoget(self)
-#        buf = (c_ubyte * 32)()
-#        # Create the basic infrastructure.
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        xsurveyor = xs.socket(ctx, XS.XSURVEYOR)
-#        self.assertTrue(xsurveyor)
-#        rc = xs.bind(xsurveyor, b'inproc://a')
-#        self.assertNotEqual(rc, -1)
-#        xrespondent = xs.socket(ctx, XS.XRESPONDENT)
-#        self.assertTrue(xrespondent)
-#        rc = xs.bind(xrespondent, b'inproc://b')
-#        self.assertNotEqual(rc, -1)
-#        surveyor = xs.socket(ctx, XS.SURVEYOR)
-#        self.assertTrue(surveyor)
-#        rc = xs.connect(surveyor, b'inproc://b')
-#        self.assertNotEqual(rc, -1)
-#        respondent1 = xs.socket(ctx, XS.RESPONDENT)
-#        self.assertTrue(respondent1)
-#        rc = xs.connect(respondent1, b'inproc://a')
-#        self.assertNotEqual(rc, -1)
-#        respondent2 = xs.socket(ctx, XS.RESPONDENT)
-#        self.assertTrue(respondent2)
-#        rc = xs.connect(respondent2, b'inproc://a')
-#        self.assertNotEqual(rc, -1)
-#        # Send the survey.
-#        rc = xs.send(surveyor, b'ABC', 3, 0)
-#        self.assertEqual(rc, 3)
-#        # Forward the survey through the intermediate device.
-#        # Survey consist of identity(4 bytes), survey ID(4 bytes) and the body.
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        rc = xs.send(xsurveyor, buf, 3, 0)
-#        self.assertEqual(rc, 3)
-#        # Respondent 1 responds to the survey.
-#        rc = xs.recv(respondent1, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        rc = xs.send(respondent1, b'DE', 2, 0)
-#        self.assertEqual(rc, 2)
-#        # Forward the response through the intermediate device.
-#        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 2)
-#        rc = xs.send(xrespondent, buf, 2, 0)
-#        self.assertEqual(rc, 2)
-#        # Surveyor gets the response.
-#        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 2)
-#        # Respondent 2 responds to the survey.
-#        rc = xs.recv(respondent2, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        rc = xs.send(respondent2, b'FGHI', 4, 0)
-#        self.assertEqual(rc, 4)
-#        # Forward the response through the intermediate device.
-#        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, 0)
-#        self.assertEqual(rc, 4)
-#        # Surveyor gets the response.
-#        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        # Now let's test whether survey timeout works as expected.
-#        timeout = c_int(100)
-#        rc = xs.setsockopt(
-#            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
-#        )
-#        self.assertEqual(rc, 0)
-#        rc = xs.send(surveyor, b'ABC', 3, 0)
-#        self.assertEqual(rc, 3)
-#        watch = xs.stopwatch_start()
-#        try:
-#            rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-#        except xs.XSError:
-#            self.assertEqual(get_errno(), XS.ETIMEDOUT)
-#        elapsed = xs.stopwatch_stop(watch) / 1000
-#        self.time_assert(elapsed, timeout.value)
-#        # Test whether responses for old surveys are discarded. First,
-#        # initiate new survey.
-#        rc = xs.setsockopt(
-#            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
-#        )
-#        self.assertEqual(rc, 0)
-#        rc = xs.send(surveyor, b'DE', 2, 0)
-#        self.assertEqual(rc, 2)
-#        # Read, process and reply to the old survey.
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        rc = xs.send(xrespondent, buf, 3, 0)
-#        self.assertEqual(rc, 3)
-#        # Read, process and reply to the new survey.
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 4)
-#        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-#        self.assertEqual(rc, 4)
-#        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 2)
-#        rc = xs.send(xrespondent, buf, 2, 0)
-#        self.assertEqual(rc, 2)
-#        # Get the response and check it's the response to the new survey and
-#        # that response to the old survey was silently discarded.
-#        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 2)
-#        rc = xs.close(respondent2)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(respondent1)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(surveyor)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(xrespondent)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(xsurveyor)
-#        self.assertEqual(rc, 0)
-#        rc = xs.term(ctx)
-#        self.assertEqual(rc, 0)
-#
-
-#
-
-
-#    def test_hwn(self):
-#        # Create pair of socket, each with high watermark of 2. Thus the total
-#        # buffer space should be 4 messages.
-#        sb = self.ctx.pull(rcvhwm=2)
-#        rc = sb.bind(b'inproc://a')
-#        self.assertNotEqual(rc, -1)
-#        sc = self.ctx.push(sndhwm=2)
-#        rc = sc.connect(b'inproc://a')
-#        self.assertNotEqual(rc, -1)
-#        # Try to send 10 messages. Only 4 should succeed.
-#        for t in xrange(10):
-#            try:
-#                rc = sb.send_bytes(None, nowait=True)
-#                if t < 4:
-#                    self.assertEqual(rc, 0)
-#            except self.xs.XSError as e:
-#                self.assertTrue(e.errno, self.XS.EAGAIN)
-#        # There should be now 4 messages pending, consume them.
-#        for i in xrange(4):
-#            rc = sb.recv_bytes(None)
-#            self.assertEqual(rc, 0)
-#        # Now it should be possible to send one more.
-#        rc = sc.send_bytes(None)
-#        self.assertEqual(rc, 0)
-#        # Consume the remaining message.
-#        rc = sb.recv(None)
-#        self.assertEqual(rc, 0)
-#        rc = sc.close()
-#        self.assertEqual(rc, 0)
-#        rc = sb.close()
-#        self.assertEqual(rc, 0)
-#        rc = self.ctx.close()
-#        self.assertEqual(rc, 0)
-#        # Following part of the tests checks whether small HWMs don't interact
-#        # with command throttling in strange ways.
-#        ctx = self.context_class()
-#        s1 = ctx.pull()
-#        s2 = ctx.push(sndhwm=5)
-#        rc = s1.bind(b'tcp://127.0.0.1:5858')
-#        self.assertTrue(rc >= 0)
-#        rc = s2.connect(b'tcp://127.0.0.1:5858')
-#        self.assertTrue(rc >= 0)
-#        for i in xrange(10):
-#            rc = s2.send_bytes(b'test', 4, nowait=True)
-#            self.assertEqual(rc, 4)
-#            rc = s1.recv_bytes(4)
-#            self.assertEqual(rc, 4)
-#        rc = s2.close()
-#        self.assertEqual(rc, 0)
-#        rc = s1.close()
-#        self.assertEqual(rc, 0)
-#        rc = ctx.close()
-#        self.assertEqual(rc, 0)
-#
-#    def test_sub_forward(self):
-#        from time import sleep
-#        from ctypes import sizeof, c_ubyte
-#        XS, xs = twoget(self)
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        # First, create an intermediate device.
-#        xpub = xs.socket(ctx, XS.XPUB)
-#        self.assertTrue(xpub)
-#        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
-#        xsub = xs.socket(ctx, XS.XSUB)
-#        self.assertTrue(xsub)
-#        rc = xs.bind(xsub, b'tcp://127.0.0.1:5561')
-#        self.assertNotEqual(rc, -1)
-#        # Create a publisher.
-#        pub = xs.socket(ctx, XS.PUB)
-#        self.assertTrue(pub)
-#        rc = xs.connect(pub, b'tcp://127.0.0.1:5561')
-#        self.assertNotEqual(rc, -1)
-#        # Create a subscriber.
-#        sub = xs.socket(ctx, XS.SUB)
-#        self.assertTrue(sub)
-#        rc = xs.connect(sub, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
-#        # Subscribe for all messages.
-#        rc = xs.setsockopt(sub, XS.SUBSCRIBE, '', 0)
-#        self.assertEqual(rc, 0)
-#        # Pass the subscription upstream through the device.
-#        buff = (c_ubyte * 32)()
-#        rc = xs.recv(xpub, buff, sizeof(buff), 0)
-#        self.assertTrue(rc >= 0)
-#        rc = xs.send(xsub, buff, rc, 0)
-#        self.assertTrue(rc >= 0)
-#        # Wait a bit till the subscription gets to the publisher.
-#        sleep(1)
-#        # Send an empty message.
-#        rc = xs.send(pub, None, 0, 0)
-#        self.assertEqual(rc, 0)
-#        # Pass the message downstream through the device.
-#        rc = xs.recv(xsub, buff, sizeof(buff), 0)
-#        self.assertTrue(rc >= 0)
-#        rc = xs.send(xpub, buff, rc, 0)
-#        self.assertTrue(rc >= 0)
-#        # Receive the message in the subscriber.
-#        rc = xs.recv(sub, buff, sizeof(buff), 0)
-#        self.assertEqual(rc, 0)
-#        # Clean up.
-#        rc = xs.close(xpub)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(xsub)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(pub)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(sub)
-#        self.assertEqual(rc, 0)
-#        rc = xs.term(ctx)
-#        self.assertEqual(rc, 0)
-#
-#    def test_shutdown(self):
-#        from time import sleep
-#        from ctypes import sizeof, c_ubyte
-#        XS, xs = twoget(self)
-#        buf = (c_ubyte * 32)()
-#        # Create infrastructure.
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        push = xs.socket(ctx, XS.PUSH)
-#        self.assertTrue(push)
-#        push_id = xs.bind(push, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(push_id, -1)
-#        pull = xs.socket(ctx, XS.PULL)
-#        self.assertTrue(pull)
-#        rc = xs.connect(pull, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
-#        # Pass one message through to ensure the connection is established.
-#        rc = xs.send(push, b'ABC', 3, 0)
-#        self.assertEqual(rc, 3)
-#        rc = xs.recv(pull, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        # Shut down the bound endpoint.
-#        rc = xs.shutdown(push, push_id)
-#        self.assertEqual(rc, 0)
-#        sleep(1)
-#        try:
-#            # Check that sending would block (there's no outbound connection).
-#            rc = xs.send(push, b'ABC', 3, XS.DONTWAIT)
-#        except xs.XSError:
-#            self.assertTrue(xs.errno(), XS.EAGAIN)
-#        # Clean up.
-#        rc = xs.close(pull)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(push)
-#        self.assertEqual(rc, 0)
-#        rc = xs.term(ctx)
-#        self.assertEqual(rc, 0)
-#        # Now the other way round...Create infrastructure.
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        pull = xs.socket(ctx, XS.PULL)
-#        self.assertTrue(pull)
-#        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
-#        push = xs.socket(ctx, XS.PUSH)
-#        self.assertTrue(push)
-#        push_id = xs.connect(push, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(push_id, -1)
-#        # Pass one message through to ensure the connection is established.
-#        rc = xs.send(push, b'ABC', 3, 0)
-#        self.assertEqual(rc, 3)
-#        rc = xs.recv(pull, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        # Shut down the bound endpoint.
-#        rc = xs.shutdown(push, push_id)
-#        self.assertEqual(rc, 0)
-#        sleep(1)
-#        try:
-#            # Check that sending would block (there's no outbound connection).
-#            rc = xs.send(push, b'ABC', 3, XS.DONTWAIT)
-#        except xs.XSError:
-#            self.assertTrue(xs.errno(), XS.EAGAIN)
-#        # Clean up.
-#        rc = xs.close(pull)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(push)
-#        self.assertEqual(rc, 0)
-#        rc = xs.term(ctx)
-#        self.assertEqual(rc, 0)
-
-#    def test_resubscribe(self):
-#        from stuf.six import int2byte
-#        from time import sleep
-#        xpub = self.ctx.pub()
-#        sub = self.ctx.sub()
-#        # Send two subscriptions upstream.
-#        self.assertNotEqual(xpub.bind(b'tcp://127.0.0.1:5560'), -1)
-#        sub.subscribe = b'a'
-#        sub.subscribe = b'a'
-#        self.assertNotEqual(sub.connect(b'tcp://127.0.0.1:5560'), -1)
-#        # Check whether subscriptions are correctly received.
-#        buf = array(c_ubyte, 5)
-#        rc = xs.recv(xpub, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'a')
-#        rc = xs.recv(xpub, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'b')
-#        # Tear down the connection.
-#        self.assertEqual(xpub.close(), 0)
-#        sleep(1)
-#        # Re-establish the connection.
-#        xpub = self.ctx.xpub()
-#        rc = xpub.bind(b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
-#        # We have to give control to the SUB socket here so that it has
-#        # chance to resend the subscriptions.
-#        try:
-#            rc = sub.recv(buf, sizeof(buf), nowait=True)
-#        except self.xs.XSError as e:
-#            self.assertEqual(e.errno, self.XS.EAGAIN)
-#        # Check whether subscriptions are correctly generated.
-#        rc = xpub.recv_bytes(buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'a')
-#        rc = xs.recv(xpub, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 5)
-#        self.assertEqual(buf[0], 0)
-#        self.assertEqual(buf[1], 1)
-#        self.assertEqual(buf[2], 0)
-#        self.assertEqual(buf[3], 1)
-#        self.assertEqual(int2byte(buf[4]), b'b')
-#        # Clean up.
-#        self.assertEqual(sub.close(), 0)
-#        self.assertEqual(xpub.close(), 0)
-
-
-#
 #    def test_regrep_device(self):
 #        from ctypes.util import find_library
 #        from ctypes import CDLL, byref, sizeof, c_int, c_ubyte, c_size_t
 #        # Create a req/rep device.
 #        xreq = xs.socket(ctx, XS.XREQ)
 #        self.assertTrue(xreq)
-#        rc = xs.bind(xreq, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
+#        self.assertNotEqual(xs.bind(xreq, b'tcp://127.0.0.1:5560'), -1)
 #        xrep = xs.socket(ctx, XS.XREP)
 #        self.assertTrue(xrep)
-#        rc = xs.bind(xrep, b'tcp://127.0.0.1:5561')
-#        self.assertNotEqual(rc, -1)
+#        self.assertNotEqual(xs.bind(xrep, b'tcp://127.0.0.1:5561'), -1)
 #        # Create a worker.
 #        rep = xs.socket(ctx, XS.REP)
 #        self.assertTrue(rep)
-#        rc = xs.connect(rep, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
+#        self.assertNotEqual(xs.connect(rep, b'tcp://127.0.0.1:5560'), -1)
 #        # Create a client.
 #        req = xs.socket(ctx, XS.REQ)
 #        self.assertTrue(req)
-#        rc = xs.connect(req, b'tcp://127.0.0.1:5561')
-#        self.assertNotEqual(rc, -1)
+#        self.assertNotEqual(xs.connect(req, b'tcp://127.0.0.1:5561'), -1)
 #        # Send a request.
-#        rc = xs.send(req, b'ABC', 3, XS.SNDMORE)
-#        self.assertEqual(rc, 3)
-#        rc = xs.send(req, b'DEF', 3, 0)
-#        self.assertEqual(rc, 3)
+#        self.assertEqual(xs.send(req, b'ABC', 3, XS.SNDMORE), 3)
+#        self.assertEqual(xs.send(req, b'DEF', 3, 0), 3)
 #        # Pass the request through the device.
 #        for i in xrange(4):
 #            msg = xs.msg_t()
-#            rc = xs.msg_init(byref(msg))
-#            self.assertEqual(rc, 0)
-#            rc = xs.recvmsg(xrep, byref(msg), 0)
-#            self.assertTrue(rc >= 0)
+#            self.assertEqual(xs.msg_init(byref(msg)), 0)
+#            self.assertTrue(xs.recvmsg(xrep, byref(msg), 0) >= 0)
 #            rcvmore = c_int()
 #            sz = c_size_t(sizeof(rcvmore))
-#            rc = xs.getsockopt(xrep, XS.RCVMORE, byref(rcvmore), byref(sz))
-#            self.assertEqual(rc, 0)
-#            rc = xs.sendmsg(xreq, byref(msg), XS.SNDMORE if rcvmore else 0)
-#            self.assertTrue(rc >= 0)
+#            self.assertEqual(
+#                xs.getsockopt(xrep, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+#            )
+#            self.assertTrue(
+#                xs.sendmsg(xreq, byref(msg), XS.SNDMORE if rcvmore else 0) >= 0
+#            )
 #        # Receive the request.
 #        buff = array(c_ubyte, 3)
-#        rc = xs.recv(rep, buff, 3, 0)
-#        self.assertEqual(rc, 3)
+#        self.assertEqual(xs.recv(rep, buff, 3, 0), 3)
 #        self.assertEqual(cpp.memcmp(buff, b'ABC', 3), 0)
 #        rcvmore = c_int()
 #        sz = c_size_t(sizeof(rcvmore))
-#        rc = xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz))
-#        self.assertEqual(rc, 0)
+#        self.assertEqual(
+#            xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+#        )
 #        self.assertTrue(rcvmore)
-#        rc = xs.recv(rep, buff, 3, 0)
-#        self.assertEqual(rc, 3)
+#        self.assertEqual(xs.recv(rep, buff, 3, 0), 3)
 #        self.assertEqual(cpp.memcmp(buff, b'DEF', 3), 0)
-#        rc = xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz))
-#        self.assertEqual(rc, 0)
+#        self.assertEqual(
+#            xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+#        )
 #        self.assertFalse(rcvmore)
 #        # Send the reply.
-#        rc = xs.send(rep, b'GHI', 3, XS.SNDMORE)
-#        self.assertEqual(rc, 3)
-#        rc = xs.send(rep, b'JKL', 3, 0)
-#        self.assertEqual(rc, 3)
+#        self.assertEqual(xs.send(rep, b'GHI', 3, XS.SNDMORE), 3)
+#        self.assertEqual(xs.send(rep, b'JKL', 3, 0), 3)
 #        # Pass the reply through the device.
 #        for i in xrange(4):
 #            msg = xs.msg_t()
-#            rc = xs.msg_init(byref(msg))
-#            self.assertEqual(rc, 0)
-#            rc = xs.recvmsg(xreq, byref(msg), 0)
-#            self.assertTrue(rc >= 0)
-#            rc = xs.getsockopt(xreq, XS.RCVMORE, byref(rcvmore), byref(sz))
-#            self.assertEqual(rc, 0)
-#            rc = xs.sendmsg(xrep, byref(msg), XS.SNDMORE if rcvmore else 0)
-#            self.assertTrue(rc >= 0)
+#            self.assertEqual(xs.msg_init(byref(msg)), 0)
+#            self.assertTrue(xs.recvmsg(xreq, byref(msg), 0) >= 0)
+#            self.assertEqual(
+#                xs.getsockopt(xreq, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+#            )
+#            self.assertTrue(
+#                xs.sendmsg(xrep, byref(msg), XS.SNDMORE if rcvmore else 0) >= 0
+#            )
 #        # Receive the reply.
-#        rc = xs.recv(req, buff, 3, 0)
-#        self.assertEqual(rc, 3)
+#        self.assertEqual(xs.recv(req, buff, 3, 0), 3)
 #        self.assertEqual(cpp.memcmp(buff, b'GHI', 3), 0)
-#        rc = xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz))
-#        self.assertEqual(rc, 0)
+#        self.assertEqual(
+#            xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+#        )
 #        self.assertTrue(rcvmore)
-#        rc = xs.recv(req, buff, 3, 0)
-#        self.assertEqual(rc, 3)
+#        self.assertEqual(xs.recv(req, buff, 3, 0), 3)
 #        self.assertEqual(cpp.memcmp(buff, b'JKL', 3), 0)
-#        rc = xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz))
-#        self.assertEqual(rc, 0)
+#        self.assertEqual(
+#            xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+#        )
 #        self.assertFalse(rcvmore)
 #        # Clean up.
-#        rc = xs.close(req)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(rep)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(xrep)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(xreq)
-#        self.assertEqual(rc, 0)
-#        rc = xs.term(ctx)
-#        self.assertEqual(rc, 0)
+#        self.assertEqual(xs.close(req), 0)
+#        self.assertEqual(xs.close(rep), 0)
+#        self.assertEqual(xs.close(xrep), 0)
+#        self.assertEqual(xs.close(xreq), 0)
 #
-#    def test_wireformat(self):
+#    def test_invalid_rep(self):
+#        from crossroads.lowest import array
+#        from stuf.six import int2byte
+#        from ctypes import byref, sizeof, c_int, c_ubyte
 #        XS, xs = twoget(self)
-#        import socket
-#        from ctypes.util import find_library
-#        from ctypes import CDLL, sizeof, c_ubyte
+#        # Create REQ/XREP wiring.
+#        ctx = xs.init()
+#        self.assertTrue(ctx)
+#        xrep_socket = xs.socket(ctx, XS.XREP)
+#        self.assertTrue(xrep_socket)
+#        req_socket = xs.socket(ctx, XS.REQ)
+#        self.assertTrue(req_socket)
+#        linger = c_int(0)
+#        self.assertEqual(xs.setsockopt(
+#            xrep_socket, XS.LINGER, byref(linger), sizeof(linger)
+#        ), 0)
+#        self.assertEqual(xs.setsockopt(
+#            req_socket, XS.LINGER, byref(linger), sizeof(linger)
+#        ), 0)
+#        self.assertNotEqual(xs.bind(xrep_socket, b'inproc://hi'), -1)
+#        self.assertNotEqual(xs.connect(req_socket, b'inproc://hi'), -1)
+#        # Initial request.
+#        self.assertEqual(xs.send(req_socket, b'r', 1, 0), 1)
+#        # Receive the request.
+#        addr = array(c_ubyte, 32)
+#        addr_size = xs.recv(xrep_socket, addr, sizeof(addr), 0)
+#        self.assertTrue(addr_size >= 0)
+#        bottom = array(c_ubyte, 1)
+#        self.assertEqual(xs.recv(xrep_socket, bottom, sizeof(bottom), 0), 0)
+#        body = array(c_ubyte, 1)
+#        self.assertEqual(xs.recv(xrep_socket, body, sizeof(body), 0), 1)
+#        # Send invalid reply.
+#        self.assertEqual(xs.send(xrep_socket, addr, addr_size, 0), addr_size)
+#        # Send valid reply.
+#        self.assertEqual(
+#            xs.send(xrep_socket, addr, addr_size, XS.SNDMORE), addr_size
+#        )
+#        self.assertEqual(xs.send(xrep_socket, bottom, 0, XS.SNDMORE), 0)
+#        self.assertEqual(xs.send(xrep_socket, b'b', 1, 0), 1)
+#        # Check whether we've got the valid reply.
+#        self.assertEqual(xs.recv(req_socket, body, sizeof(body), 0), 1)
+#        self.assertEqual(int2byte(body[0]), b'b')
+#        # Tear down the wiring.
+#        self.assertEqual(xs.close(xrep_socket), 0)
+#        self.assertEqual(xs.close(req_socket), 0)
+
+#    def test_survey(self):
 #        from crossroads.lowest import array
-#        cpp = CDLL(find_library('libcpp'), use_errno=True)
+#        from ctypes import sizeof, c_ubyte, byref, c_int, get_errno
+#        XS, xs = twoget(self)
+#        buf = array(c_ubyte, 32)
 #        # Create the basic infrastructure.
 #        ctx = xs.init()
 #        self.assertTrue(ctx)
-#        push = xs.socket(ctx, XS.PUSH)
-#        self.assertTrue(push)
-#        pull = xs.socket(ctx, XS.PULL)
-#        self.assertTrue(push)
-#        # Bind the peer and get the message.
-#        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
-#        self.assertNotEqual(rc, -1)
-#        rc = xs.bind(push, b'tcp://127.0.0.1:5561')
-#        self.assertNotEqual(rc, -1)
-#        # Connect to the peer using raw sockets.
-#        rpush = socket.socket(
-#            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
-#        )
-#        rpush.connect(('127.0.0.1', 5560))
-#        rpull = socket.socket(
-#            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP,
-#        )
-#        rpull.connect(('127.0.0.1', 5561))
-#        # Let's send some data and check if it arrived
-#        rc = rpush.send(b'\x04\0abc', 0)
-#        self.assertEqual(rc, 5)
-#        buf = array(c_ubyte, 3)
-#        rc = xs.recv(pull, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        self.assertFalse(cpp.memcmp(buf, b'abc', 3))
-#        # Let's push this data into another socket
-#        rc = xs.send(push, buf, sizeof(buf), 0)
-#        self.assertEqual(rc, 3)
-#        buf2 = rpull.recv(3, 0)
-#        self.assertFalse(cpp.memcmp(buf2, b'\x04\0abc', 3))
-#        rpush.close()
-#        rpull.close()
-#        rc = xs.close(pull)
-#        self.assertEqual(rc, 0)
-#        rc = xs.close(push)
-#        self.assertEqual(rc, 0)
-#        rc = xs.term(ctx)
-#        self.assertEqual(rc, 0)
+#        xsurveyor = xs.socket(ctx, XS.XSURVEYOR)
+#        self.assertTrue(xsurveyor)
+#        self.assertNotEqual(xs.bind(xsurveyor, b'inproc://a'), -1)
+#        xrespondent = xs.socket(ctx, XS.XRESPONDENT)
+#        self.assertTrue(xrespondent)
+#        self.assertNotEqual(xs.bind(xrespondent, b'inproc://b'), -1)
+#        surveyor = xs.socket(ctx, XS.SURVEYOR)
+#        self.assertTrue(surveyor)
+#        self.assertNotEqual(xs.connect(surveyor, b'inproc://b'), -1)
+#        respondent1 = xs.socket(ctx, XS.RESPONDENT)
+#        self.assertTrue(respondent1)
+#        self.assertNotEqual(xs.connect(respondent1, b'inproc://a'), -1)
+#        respondent2 = xs.socket(ctx, XS.RESPONDENT)
+#        self.assertTrue(respondent2)
+#        self.assertNotEqual(xs.connect(respondent2, b'inproc://a'), -1)
+#        # Send the survey.
+#        self.assertEqual(xs.send(surveyor, b'ABC', 3, 0), 3)
+#        # Forward the survey through the intermediate device.
+#        # Survey consist of identity(4 bytes), survey ID(4 bytes) and the body.
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xsurveyor, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xsurveyor, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 3)
+#        self.assertEqual(xs.send(xsurveyor, buf, 3, 0), 3)
+#        # Respondent 1 responds to the survey.
+#        self.assertEqual(xs.recv(respondent1, buf, sizeof(buf), 0), 3)
+#        self.assertEqual(xs.send(respondent1, b'DE', 2, 0), 2)
+#        # Forward the response through the intermediate device.
+#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 2)
+#        self.assertEqual(xs.send(xrespondent, buf, 2, 0), 2)
+#        # Surveyor gets the response.
+#        self.assertEqual(xs.recv(surveyor, buf, sizeof(buf), 0), 2)
+#        # Respondent 2 responds to the survey.
+#        self.assertEqual(xs.recv(respondent2, buf, sizeof(buf), 0), 3)
+#        self.assertEqual(xs.send(respondent2, b'FGHI', 4, 0), 4)
+#        # Forward the response through the intermediate device.
+#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xsurveyor, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, 0), 4)
+#        # Surveyor gets the response.
+#        self.assertEqual(xs.recv(surveyor, buf, sizeof(buf), 0), 4)
+#        # Now let's test whether survey timeout works as expected.
+#        timeout = c_int(100)
+#        self.assertEqual(xs.setsockopt(
+#            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
+#        ), 0)
+#        self.assertEqual(xs.send(surveyor, b'ABC', 3, 0), 3)
+#        watch = xs.stopwatch_start()
+#        try:
+#            xs.recv(surveyor, buf, sizeof(buf), 0)
+#        except xs.XSError:
+#            self.assertEqual(get_errno(), XS.ETIMEDOUT)
+#        self.time_assert(xs.stopwatch_stop(watch) / 1000, timeout.value)
+#        # Test whether responses for old surveys are discarded. First,
+#        # initiate new survey.
+#        self.assertEqual(xs.setsockopt(
+#            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
+#        ), 0)
+#        self.assertEqual(xs.send(surveyor, b'DE', 2, 0), 2)
+#        # Read, process and reply to the old survey.
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 3)
+#        self.assertEqual(xs.send(xrespondent, buf, 3, 0), 3)
+#        # Read, process and reply to the new survey.
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 4)
+#        self.assertEqual(xs.send(xrespondent, buf, 4, XS.SNDMORE), 4)
+#        self.assertEqual(xs.recv(xrespondent, buf, sizeof(buf), 0), 2)
+#        self.assertEqual(xs.send(xrespondent, buf, 2, 0), 2)
+#        # Get the response and check it's the response to the new survey and
+#        # that response to the old survey was silently discarded.
+#        self.assertEqual(xs.recv(surveyor, buf, sizeof(buf), 0), 2)
+#        self.assertEqual(xs.close(respondent2), 0)
+#        self.assertEqual(xs.close(respondent1), 0)
+#        self.assertEqual(xs.close(surveyor), 0)
+#        self.assertEqual(xs.close(xrespondent), 0)
+#        self.assertEqual(xs.close(xsurveyor), 0)

tests/test_lowest_level.py

         self.XS = crossroads.constants
 
     def bounce(self, sb, sc):
-        from ctypes.util import find_library
-        from ctypes import CDLL, c_char_p, c_int, sizeof, byref, c_size_t
+        from ctypes import c_char_p, c_int, sizeof, byref, c_size_t, c_ubyte
+        from crossroads.lowest import array
         XS, xs = twoget(self)
         content = c_char_p(b'12345678ABCDEFGH12345678abcdefgh')
         #  Send the message.
-        rc = xs.send(sc, content, 32, XS.SNDMORE)
-        self.assertEqual(rc, 32)
-        rc = xs.send(sc, content, 32, 0)
-        self.assertEqual(rc, 32)
+        self.assertEqual(xs.send(sc, content, 32, XS.SNDMORE), 32)
+        self.assertEqual(xs.send(sc, content, 32, 0), 32)
         #  Bounce the message back.
-        buf1 = (c_char_p * 32)()
-        rc = xs.recv(sb, buf1, 32, 0)
-        self.assertEqual(rc, 32)
+        buf1 = array(c_ubyte, 32)
+        self.assertEqual(xs.recv(sb, buf1, 32, 0), 32)
         rcvmore = c_int()
         sz = c_size_t(sizeof(rcvmore))
-        rc = xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz))
-        self.assertEqual(rc, 0)
+        self.assertEqual(
+            xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+        )
         self.assertTrue(rcvmore)
-        rc = xs.recv(sb, buf1, 32, 0)
-        self.assertEqual(rc, 32)
-        rc = xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz))
-        self.assertEqual(rc, 0)
+        self.assertEqual(xs.recv(sb, buf1, 32, 0), 32)
+        self.assertEqual(
+            xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz)), 0,
+        )
         self.assertFalse(rcvmore)
-        rc = xs.send(sb, buf1, 32, XS.SNDMORE)
-        self.assertEqual(rc, 32)
-        rc = xs.send(sb, buf1, 32, 0)
-        self.assertEqual(rc, 32)
+        self.assertEqual(xs.send(sb, buf1, 32, XS.SNDMORE), 32)
+        self.assertEqual(xs.send(sb, buf1, 32, 0), 32)
         #  Receive the bounced message.
-        buf2 = (c_char_p * 32)()
-        rc = xs.recv(sc, buf2, 32, 0)
-        self.assertEqual(rc, 32)
-        rc = xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz))
-        self.assertEqual(rc, 0)
+        buf2 = array(c_ubyte, 32)
+        self.assertEqual(xs.recv(sc, buf2, 32, 0), 32)
+        self.assertEqual(
+            xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+        )
         self.assertTrue(rcvmore)
-        rc = xs.recv(sc, buf2, 32, 0)
-        self.assertEqual(rc, 32)
-        rc = xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz))
-        self.assertEqual(rc, 0)
+        self.assertEqual(xs.recv(sc, buf2, 32, 0), 32)
+        self.assertEqual(
+            xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
+        )
         self.assertFalse(rcvmore)
-        cpp = CDLL(find_library('libcpp'), use_errno=True)
         #  Check whether the message is still the same.
-        self.assertEqual(cpp.memcmp(buf2, content, 32), 0)
+        self.assertEqual(bytearray(buf2), content.value)
 
     def time_assert(self, actual, expected):
         '''
             actual > ((expected) - 50) and actual < ((expected) + 500)
         )
 
-#extern "C"
-#{
-#    static void *thread_routine (void *arg_)
-#    {
-#        arg_t *arg = (arg_t*) arg_;
-#        arg->fn (arg->arg);
-#        return NULL;
-#    }
-#}
-#
-#void *thread_create (void (*fn_) (void *arg_), void *arg_)
-#{
-#    arg_t *arg = (arg_t*) malloc (sizeof (arg_t));
-#    assert (arg);
-#    arg->fn = fn_;
-#    arg->arg = arg_;
-#    int rc = pthread_create (&arg->handle, NULL, thread_routine, (void*) arg);
-#    assert (rc == 0);
-#    return (void*) arg;
-#}
-#
-#void thread_join (void *thread_)
-#{
-#    arg_t *arg = (arg_t*) thread_;
-#    int rc = pthread_join (arg->handle, NULL);
-#    assert (rc == 0);
-#    free (arg);
-#}
-
-    def test_survey(self):
-        from ctypes import sizeof, c_ubyte, byref, c_int, get_errno
-        XS, xs = twoget(self)
-        buf = (c_ubyte * 32)()
-        # Create the basic infrastructure.
-        ctx = xs.init()
-        self.assertTrue(ctx)
-        xsurveyor = xs.socket(ctx, XS.XSURVEYOR)
-        self.assertTrue(xsurveyor)
-        rc = xs.bind(xsurveyor, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        xrespondent = xs.socket(ctx, XS.XRESPONDENT)
-        self.assertTrue(xrespondent)
-        rc = xs.bind(xrespondent, b'inproc://b')
-        self.assertNotEqual(rc, -1)
-        surveyor = xs.socket(ctx, XS.SURVEYOR)
-        self.assertTrue(surveyor)
-        rc = xs.connect(surveyor, b'inproc://b')
-        self.assertNotEqual(rc, -1)
-        respondent1 = xs.socket(ctx, XS.RESPONDENT)
-        self.assertTrue(respondent1)
-        rc = xs.connect(respondent1, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        respondent2 = xs.socket(ctx, XS.RESPONDENT)
-        self.assertTrue(respondent2)
-        rc = xs.connect(respondent2, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        # Send the survey.
-        rc = xs.send(surveyor, b'ABC', 3, 0)
-        self.assertEqual(rc, 3)
-        # Forward the survey through the intermediate device.
-        # Survey consist of identity(4 bytes), survey ID(4 bytes) and the body.
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 3)
-        rc = xs.send(xsurveyor, buf, 3, 0)
-        self.assertEqual(rc, 3)
-        # Respondent 1 responds to the survey.
-        rc = xs.recv(respondent1, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 3)
-        rc = xs.send(respondent1, b'DE', 2, 0)
-        self.assertEqual(rc, 2)
-        # Forward the response through the intermediate device.
-        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 2)
-        rc = xs.send(xrespondent, buf, 2, 0)
-        self.assertEqual(rc, 2)
-        # Surveyor gets the response.
-        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 2)
-        # Respondent 2 responds to the survey.
-        rc = xs.recv(respondent2, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 3)
-        rc = xs.send(respondent2, b'FGHI', 4, 0)
-        self.assertEqual(rc, 4)
-        # Forward the response through the intermediate device.
-        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, 0)
-        self.assertEqual(rc, 4)
-        # Surveyor gets the response.
-        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        # Now let's test whether survey timeout works as expected.
-        timeout = c_int(100)
-        rc = xs.setsockopt(
-            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
-        )
-        self.assertEqual(rc, 0)
-        rc = xs.send(surveyor, b'ABC', 3, 0)
-        self.assertEqual(rc, 3)
-        watch = xs.stopwatch_start()
-        try:
-            rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-        except xs.XSError:
-            self.assertEqual(get_errno(), XS.ETIMEDOUT)
-        elapsed = xs.stopwatch_stop(watch) / 1000
-        self.time_assert(elapsed, timeout.value)
-        # Test whether responses for old surveys are discarded. First,
-        # initiate new survey.
-        rc = xs.setsockopt(
-            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
-        )
-        self.assertEqual(rc, 0)
-        rc = xs.send(surveyor, b'DE', 2, 0)
-        self.assertEqual(rc, 2)
-        # Read, process and reply to the old survey.
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 3)
-        rc = xs.send(xrespondent, buf, 3, 0)
-        self.assertEqual(rc, 3)
-        # Read, process and reply to the new survey.
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 4)
-        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
-        self.assertEqual(rc, 4)
-        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 2)
-        rc = xs.send(xrespondent, buf, 2, 0)
-        self.assertEqual(rc, 2)
-        # Get the response and check it's the response to the new survey and
-        # that response to the old survey was silently discarded.
-        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 2)
-        rc = xs.close(respondent2)
-        self.assertEqual(rc, 0)
-        rc = xs.close(respondent1)
-        self.assertEqual(rc, 0)
-        rc = xs.close(surveyor)
-        self.assertEqual(rc, 0)
-        rc = xs.close(xrespondent)
-        self.assertEqual(rc, 0)
-        rc = xs.close(xsurveyor)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
-
     def test_empty(self):
-        _, xs = twoget(self)
+        xs = self.xs
         # This is a very simple test to check whether everything works OK when
         # context is terminated even before I/O threads were launched.
         ctx = xs.init()
         self.assertTrue(ctx)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
+        self.assertEqual(xs.term(ctx), 0)
 
-    def test_reconnect(self):
-        import sys
-        from time import sleep
-        from ctypes import sizeof, c_ubyte
+    def test_max_sockets(self):
+        from ctypes import byref, sizeof, c_int
         XS, xs = twoget(self)
-        #  Create the basic infrastructure.
+        # Create context and set MAX_SOCKETS to 1.
         ctx = xs.init()
         self.assertTrue(ctx)
-        push = xs.socket(ctx, XS.PUSH)
-        self.assertTrue(push)
-        pull = xs.socket(ctx, XS.PULL)
-        self.assertTrue(push)
-        # Connect before bind was done at the peer and send one message.
-        rc = xs.connect(push, b'tcp://127.0.0.1:5560')
-        self.assertNotEqual(rc, -1)
-        rc = xs.send(push, b'ABC', 3, 0)
-        self.assertEqual(rc, 3)
-        # Wait a while for few attempts to reconnect to happen.
-        sleep(1)
-        # Bind the peer and get the message.
-        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
-        self.assertNotEqual(rc, -1)
-        buf = (c_ubyte * 3)()
-        rc = xs.recv(pull, buf, sizeof(buf), 0)
-        self.assertEqual(rc, 3)
+        max_sockets = c_int(1)
+        self.assertEqual(xs.setctxopt(
+            ctx, XS.MAX_SOCKETS, byref(max_sockets), sizeof(max_sockets)
+        ), 0)
+        # First socket should be created OK.
+        s1 = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s1)
+        # Creation of second socket should fail.
+        try:
+            xs.socket(ctx, XS.PUSH)
+        except xs.XSError as e:
+            self.assertEqual(e.errno, XS.EMFILE)
         # Clean up.
-        rc = xs.close(push)
-        self.assertEqual(rc, 0)
-        rc = xs.close(pull)
-        self.assertEqual(rc, 0)
-        if sys.platform not in ('win32', 'cygwin'):
-            # Now, let's test the same scenario with IPC.
-            push = xs.socket(ctx, XS.PUSH)
-            self.assertTrue(push)
-            pull = xs.socket(ctx, XS.PULL)
-            self.assertTrue(push)
-            # Connect before bind was done at the peer and send one message.
-            rc = xs.connect(push, b'ipc:///tmp/tester')
-            self.assertNotEqual(rc, -1)
-            rc = xs.send(push, b'ABC', 3, 0)
-            self.assertEqual(rc, 3)
-            # Wait a while for few attempts to reconnect to happen.
-            sleep(1)
-            # Bind the peer and get the message.
-            rc = xs.bind(pull, b'ipc:///tmp/tester')
-            self.assertNotEqual(rc, -1)
-            rc = xs.recv(pull, buf, sizeof(buf), 0)
-            self.assertEqual(rc, 3)
-            # Clean up.
-            rc = xs.close(push)
-            self.assertEqual(rc, 0)
-            rc = xs.close(pull)
-            self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
+        self.assertEqual(xs.close(s1), 0)
+        self.assertEqual(xs.term(ctx), 0)
 
-    def test_hwn(self):
-        from ctypes import sizeof, byref, c_int, get_errno, c_ubyte
+    def test_pair_ipc(self):
         XS, xs = twoget(self)
         ctx = xs.init()
         self.assertTrue(ctx)
-        # Create pair of socket, each with high watermark of 2. Thus the total
-        # buffer space should be 4 messages.
-        sb = xs.socket(ctx, XS.PULL)
+        sb = xs.socket(ctx, XS.PAIR)
         self.assertTrue(sb)
-        hwm = c_int(2)
-        rc = xs.setsockopt(sb, XS.RCVHWM, byref(hwm), sizeof(hwm))
-        self.assertEqual(rc, 0)
-        rc = xs.bind(sb, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        sc = xs.socket(ctx, XS.PUSH)
+        self.assertNotEqual(xs.bind(sb, b'ipc:///tmp/tester'), -1)
+        sc = xs.socket(ctx, XS.PAIR)
         self.assertTrue(sc)
-        rc = xs.setsockopt(sc, XS.SNDHWM, byref(hwm), sizeof(hwm))
-        self.assertEqual(rc, 0)
-        rc = xs.connect(sc, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        # Try to send 10 messages. Only 4 should succeed.
-        for t in xrange(10):
-            try:
-                rc = xs.send(sc, None, 0, XS.DONTWAIT)
-                if t < 4:
-                    self.assertEqual(rc, 0)
-            except xs.XSError:
-                self.assertTrue(get_errno(), XS.EAGAIN)
-        # There should be now 4 messages pending, consume them.
-        for i in xrange(4):
-            rc = xs.recv(sb, None, 0, 0)
-            self.assertEqual(rc, 0)
-        # Now it should be possible to send one more.
-        rc = xs.send(sc, None, 0, 0)
-        self.assertEqual(rc, 0)
-        # Consume the remaining message.
-        rc = xs.recv(sb, None, 0, 0)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sc)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sb)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
-        # Following part of the tests checks whether small HWMs don't interact
-        # with command throttling in strange ways.
+        self.assertNotEqual(xs.connect(sc, b'ipc:///tmp/tester'), -1)
+        self.bounce(sb, sc)
+        self.assertEqual(xs.close(sc), 0)
+        self.assertEqual(xs.close(sb), 0)
+        self.assertEqual(xs.term(ctx), 0)
+
+    def test_pair_inproc(self):
+        XS, xs = twoget(self)
         ctx = xs.init()
         self.assertTrue(ctx)
-        s1 = xs.socket(ctx, XS.PULL)
-        self.assertTrue(s1)
-        s2 = xs.socket(ctx, XS.PUSH)
-        self.assertTrue(s2)
-        hwm = c_int(5)
-        rc = xs.setsockopt(s2, XS.SNDHWM, byref(hwm), sizeof(hwm))
-        self.assertEqual(rc, 0)
-        rc = xs.bind(s1, b'tcp://127.0.0.1:5858')
-        self.assertTrue(rc >= 0)
-        rc = xs.connect(s2, b'tcp://127.0.0.1:5858')
-        self.assertTrue(rc >= 0)
-        for i in xrange(10):
-            rc = xs.send(s2, b'test', 4, XS.DONTWAIT)
-            self.assertEqual(rc, 4)
-            buf = (c_ubyte * 4)()
-            rc = xs.recv(s1, buf, sizeof(buf), 0)
-            self.assertEqual(rc, 4)
-        rc = xs.close(s2)
-        self.assertEqual(rc, 0)
-        rc = xs.close(s1)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        self.assertNotEqual(xs.bind(sb, 'inproc://a'), -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        self.assertNotEqual(xs.connect(sc, 'inproc://a'), -1)
+        self.bounce(sb, sc)
+        self.assertEqual(xs.close(sc), 0)
+        self.assertEqual(xs.close(sb), 0)
+        self.assertEqual(xs.term(ctx), 0)
 
     def test_pair_tcp(self):
         from time import sleep
         self.assertTrue(ctx)
         sb = xs.socket(ctx, XS.PAIR)
         self.assertTrue(sb)
-        rc = xs.bind(sb, b'tcp://127.0.0.1:5560')
-        self.assertNotEqual(rc, -1)
+        self.assertNotEqual(xs.bind(sb, b'tcp://127.0.0.1:5560'), -1)
         sc = xs.socket(ctx, XS.PAIR)
         self.assertTrue(sc)
-        rc = xs.connect(sc, b'tcp://127.0.0.1:5560')
-        self.assertNotEqual(rc, -1)
+        self.assertNotEqual(xs.connect(sc, b'tcp://127.0.0.1:5560'), -1)
         self.bounce(sb, sc)
         # Now let's try to open one more connection to the bound socket. The
         # connection should be silently rejected rather than causing error.
         sc2 = xs.socket(ctx, XS.PAIR)
         self.assertTrue(sc2)
-        rc = xs.connect(sc2, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(xs.connect(sc2, b'tcp://127.0.0.1:5560'), -1)
         sleep(1)
-        rc = xs.close(sc2)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sc)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sb)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
+        self.assertEqual(xs.close(sc2), 0)
+        self.assertEqual(xs.close(sc), 0)
+        self.assertEqual(xs.close(sb), 0)
+        self.assertEqual(xs.term(ctx), 0)
 
-    def test_pair_ipc(self):
+    def test_regrep_inproc(self):
         XS, xs = twoget(self)
         ctx = xs.init()
         self.assertTrue(ctx)
-        sb = xs.socket(ctx, XS.PAIR)
+        sb = xs.socket(ctx, XS.REP)
         self.assertTrue(sb)
-        rc = xs.bind(sb, b'ipc:///tmp/tester')
-        self.assertNotEqual(rc, -1)
-        sc = xs.socket(ctx, XS.PAIR)
+        self.assertNotEqual(xs.bind(sb, b'inproc://a'), -1)
+        sc = xs.socket(ctx, XS.REQ)
         self.assertTrue(sc)
-        rc = xs.connect(sc, b'ipc:///tmp/tester')
-        self.assertNotEqual(rc, -1)
+        self.assertNotEqual(xs.connect(sc, b'inproc://a'), -1)
         self.bounce(sb, sc)
-        rc = xs.close(sc)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sb)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
+        self.assertEqual(xs.close(s