Commits

Lynn Rees committed f5f8ac8

- more tests

Comments (0)

Files changed (25)

 
 from functools import partial
 from ctypes.util import find_library
-from ctypes import Structure, CDLL, POINTER, memmove, byref, get_errno
+from ctypes import Structure, CDLL, POINTER, memmove, get_errno  # , byref,
 from ctypes import (
     c_void_p as void, c_int as cint, c_char_p as char, c_ubyte as ubyte,
-    c_size_t as size_t, c_short as short)
+    c_size_t as size_t, c_short as short, c_ulong)
 
 
 class XSBaseError(Exception):
 
 
 XS = CDLL(find_library('xs'), use_errno=True)
+array = lambda ctype, length: (ctype * length)()
 factory = partial(_factory, XS, 'xs')
 intfactory = partial(_factory, XS, 'xs', cint)
 voidfactory = partial(_factory, XS, 'xs', void)
 msg_size = factory(size_t, 'msg_size', POINTER(msg_t))
 
 # int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, size_t *optvallen)
-getmsgopt = intfactory('getmsgopt', POINTER(msg_t), cint, void, size_t)
+getmsgopt = intfactory('getmsgopt', POINTER(msg_t), cint, void, POINTER(size_t))
 
 ###############################################################################
 ## CROSSROADS CONTEXT DEFINITION ##############################################
 
 # int xs_poll (xs_pollitem_t *items, int nitems, int timeout);
 poll = intfactory('poll', POINTER(pollitem_t), cint, cint)
+
+# Starts the stopwatch. Returns the handle to the watch.
+# void *xs_stopwatch_start (void);
+stopwatch_start = voidfactory('stopwatch_start')
+
+# Stops the stopwatch. Returns the number of microseconds elapsed since the
+# stopwatch was started.
+# unsigned long xs_stopwatch_stop (void *watch);
+stopwatch_stop = factory(c_ulong, 'stopwatch_stop', void)

crossroads/constants.py

 
 # number random enough not to collide with different errno ranges on different
 # OSes. The assumption is that error_t is at least 32-bit type.
-XS_HAUSNUMERO = 156384712
+HAUSNUMERO = 156384712
 # NOTE: on Windows platforms some of the standard POSIX errnos are not defined
-ENOTSUP = XS_HAUSNUMERO + 1
-EPROTONOSUPPORT = XS_HAUSNUMERO + 2
-ENOBUFS = XS_HAUSNUMERO + 3
-ENETDOWN = XS_HAUSNUMERO + 4
-EADDRINUSE = XS_HAUSNUMERO + 5
-EADDRNOTAVAIL = XS_HAUSNUMERO + 6
-ECONNREFUSED = XS_HAUSNUMERO + 7
-EINPROGRESS = XS_HAUSNUMERO + 8
-ENOTSOCK = XS_HAUSNUMERO + 9
-EAFNOSUPPORT = XS_HAUSNUMERO + 10
+ENOTSUP = HAUSNUMERO + 1
+EPROTONOSUPPORT = HAUSNUMERO + 2
+ENOBUFS = HAUSNUMERO + 3
+ENETDOWN = HAUSNUMERO + 4
+EADDRINUSE = HAUSNUMERO + 5
+EADDRNOTAVAIL = HAUSNUMERO + 6
+ECONNREFUSED = HAUSNUMERO + 7
+EINPROGRESS = HAUSNUMERO + 8
+ENOTSOCK = HAUSNUMERO + 9
+EAFNOSUPPORT = HAUSNUMERO + 10
 
 # native Crossroads error codes
-EFSM = XS_HAUSNUMERO + 51
-ENOCOMPATPROTO = XS_HAUSNUMERO + 52
-ETERM = XS_HAUSNUMERO + 53
+EFSM = HAUSNUMERO + 51
+ENOCOMPATPROTO = HAUSNUMERO + 52
+ETERM = HAUSNUMERO + 53
 
 ###############################################################################
 # CROSSROADS CONTEXT DEFINITION ###############################################
 ###############################################################################
 
-XS_MAX_SOCKETS = 1
-XS_IO_THREADS = 2
-XS_PLUGIN = 3
+MAX_SOCKETS = 1
+IO_THREADS = 2
+PLUGIN = 3
 
 ###############################################################################
 ## CROSSROADS SOCKET DEFINITION ###############################################
 int64_sockopts = [AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RCVMORE]
 int_sockopts = [FD, EVENTS, TYPE, LINGER, RECONNECT_IVL, BACKLOG]
 
-from errno import EAGAIN, EFAULT, EINVAL, ENETDOWN, ENOBUFS, ENODEV, ENOMEM  # @UnusedImport
+from errno import (
+    EAGAIN, EFAULT, EINVAL, ENETDOWN, ENOBUFS, ENODEV, ENOMEM, ETIMEDOUT,
+    EMFILE)  # @UnusedImport

tests/test_backlog.py

-/*
-    Copyright (c) 2012 250bpm s.r.o.
-    Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+from stuf.six import unittest
 
-    This file is part of Crossroads I/O project.
 
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-#define BACKLOG 10
-#define PARALLEL_CONNECTS 50
+BACKLOG = 10
+PARALLEL_CONNECTS = 50
 
 #if 0
 
 }
 #endif
 
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "backlog test running...\n");
+class TestBacklog(unittest.TestCase):
+
 
 #if 0
     int rc;
     int backlog;
     void *threads [PARALLEL_CONNECTS];
 
-    //  Test the exhaustion on IPC backlog.
+    #  Test the exhaustion on IPC backlog.
 #if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
     ctx = xs_init ();
     assert (ctx);

tests/test_emptyctx.py

-
-from stuf.six import unittest
-
-
-class TestEmptyCTX(unittest.TestCase):
-
-    def test_empty(self):
-        from crossroads.api import init, term
-        # This is a very simple test to check whether everything works OK when
-        # context is terminated even before I/O threads were launched.
-        ctx = init()
-        self.assertTrue(ctx)
-        rc = term(ctx)
-        self.assertEqual(rc, 0)

tests/test_hwn.py

-from stuf.six import unittest
-
-
-#include "testutil.hpp"
-
-class TestHWN(unittest.TestCase):
-
-    def test_hwn(self):
-        from ctypes import sizeof, byref, c_int, get_errno, c_ubyte
-        import crossroads.constants as XS
-        import crossroads.api as xs
-        ctx = xs.init()
-        self.assertTrue(ctx)
-        # Create pair of socket, each with high watermark of 2. Thus the total
-        # buffer space should be 4 messages.
-        sb = xs.socket(ctx, XS.PULL)
-        self.assertTrue(sb)
-        hwm = c_int(2)
-        rc = xs.setsockopt(sb, XS.RCVHWM, byref(hwm), sizeof(hwm))
-        self.assertEqual(rc, 0)
-        rc = xs.bind(sb, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        sc = xs.socket(ctx, XS.PUSH)
-        self.assertTrue(sc)
-        rc = xs.setsockopt(sc, XS.SNDHWM, byref(hwm), sizeof(hwm))
-        self.assertEqual(rc, 0)
-        rc = xs.connect(sc, b'inproc://a')
-        self.assertNotEqual(rc, -1)
-        # Try to send 10 messages. Only 4 should succeed.
-        for t in xrange(10):
-            try:
-                rc = xs.send(sc, None, 0, XS.DONTWAIT)
-                if t < 4:
-                    self.assertEqual(rc, 0)
-                else:
-                    self.assertTrue(rc < 0 and get_errno() == XS.EAGAIN)
-            except xs.XSError:
-                pass
-        # There should be now 4 messages pending, consume them.
-        for i in xrange(4):
-            rc = xs.recv(sb, None, 0, 0)
-            self.assertEqual(rc, 0)
-        # Now it should be possible to send one more.
-        rc = xs.send(sc, None, 0, 0)
-        self.assertEqual(rc, 0)
-        # Consume the remaining message.
-        rc = xs.recv(sb, None, 0, 0)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sc)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sb)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)
-        # Following part of the tests checks whether small HWMs don't interact
-        # with command throttling in strange ways.
-        ctx = xs.init()
-        self.assertTrue(ctx)
-        s1 = xs.socket(ctx, XS.PULL)
-        self.assertTrue(s1)
-        s2 = xs.socket(ctx, XS.PUSH)
-        self.assertTrue(s2)
-        hwm = c_int(5)
-        rc = xs.setsockopt(s2, XS.SNDHWM, byref(hwm), sizeof(hwm))
-        self.assertEqual(rc, 0)
-        rc = xs.bind(s1, b'tcp://127.0.0.1:5858')
-        self.assertTrue(rc >= 0)
-        rc = xs.connect(s2, b'tcp://127.0.0.1:5858')
-        self.assertTrue(rc >= 0)
-        for i in xrange(10):
-            rc = xs.send(s2, b'test', 4, XS.DONTWAIT)
-            self.assertEqual(rc, 4)
-            buf = (c_ubyte * 4)()
-            rc = xs.recv(s1, buf, sizeof(buf), 0)
-            self.assertEqual(rc, 4)
-        rc = xs.close(s2)
-        self.assertEqual(rc, 0)
-        rc = xs.close(s1)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)

tests/test_invalid_rep.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2011 VMware, Inc.
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "invalid_rep test running...\n");
-
-    //  Create REQ/XREP wiring.
-    void *ctx = xs_init ();
-    assert (ctx);
-    void *xrep_socket = xs_socket (ctx, XS_XREP);
-    assert (xrep_socket);
-    void *req_socket = xs_socket (ctx, XS_REQ);
-    assert (req_socket);
-    int linger = 0;
-    int rc = xs_setsockopt (xrep_socket, XS_LINGER, &linger, sizeof (int));
-    assert (rc == 0);
-    rc = xs_setsockopt (req_socket, XS_LINGER, &linger, sizeof (int));
-    assert (rc == 0);
-    rc = xs_bind (xrep_socket, "inproc://hi");
-    assert (rc != -1);
-    rc = xs_connect (req_socket, "inproc://hi");
-    assert (rc != -1);
-
-    //  Initial request.
-    rc = xs_send (req_socket, "r", 1, 0);
-    assert (rc == 1);
-
-    //  Receive the request.
-    char addr [32];
-    int addr_size;
-    char bottom [1];
-    char body [1];
-    addr_size = xs_recv (xrep_socket, addr, sizeof (addr), 0);
-    assert (addr_size >= 0);
-    rc = xs_recv (xrep_socket, bottom, sizeof (bottom), 0);
-    assert (rc == 0);
-    rc = xs_recv (xrep_socket, body, sizeof (body), 0);
-    assert (rc == 1);
-
-    //  Send invalid reply.
-    rc = xs_send (xrep_socket, addr, addr_size, 0);
-    assert (rc == addr_size);
-
-    //  Send valid reply.
-    rc = xs_send (xrep_socket, addr, addr_size, XS_SNDMORE);
-    assert (rc == addr_size);
-    rc = xs_send (xrep_socket, bottom, 0, XS_SNDMORE);
-    assert (rc == 0);
-    rc = xs_send (xrep_socket, "b", 1, 0);
-    assert (rc == 1);
-
-    //  Check whether we've got the valid reply.
-    rc = xs_recv (req_socket, body, sizeof (body), 0);
-    assert (rc == 1);
-	assert (body [0] == 'b');
-
-    //  Tear down the wiring.
-    rc = xs_close (xrep_socket);
-    assert (rc == 0);
-    rc = xs_close (req_socket);
-    assert (rc == 0);
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0;
-}
-

tests/test_linger.py

-/*
-    Copyright (c) 2012 250bpm s.r.o.
-    Copyright (c) 2012 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "linger test running...\n");
-
-    //  Create socket.
-    void *ctx = xs_init ();
-    assert (ctx);
-    void *s = xs_socket (ctx, XS_PUSH);
-    assert (s);
-
-    //  Set linger to 0.1 second.
-    int linger = 100;
-    int rc = xs_setsockopt (s, XS_LINGER, &linger, sizeof (int));
-
-    //  Connect to non-existent endpoing.
-    assert (rc == 0);
-    rc = xs_connect (s, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-
-    //  Send a message.
-    rc = xs_send (s, "r", 1, 0);
-    assert (rc == 1);
-
-    //  Close the socket.
-    rc = xs_close (s);
-    assert (rc == 0);
-
-    //  Terminate the context. This should take 0.1 second.
-    void *watch = xs_stopwatch_start ();
-    rc = xs_term (ctx);
-    assert (rc == 0);
-    int ms = (int) xs_stopwatch_stop (watch) / 1000;
-    time_assert (ms, linger);
-
-    return 0;
-}

tests/test_lowest_level.py

+from stuf.six import unittest
+
+from operator import attrgetter
+
+twoget = attrgetter(*'XS xs'.split())
+
+
+class TestLowestLevel(unittest.TestCase):
+
+    def setUp(self):
+        import crossroads.api
+        self.xs = crossroads.api
+        import crossroads.constants
+        self.XS = crossroads.constants
+
+    def bounce(self, sb, sc):
+        from ctypes.util import find_library
+        from ctypes import CDLL, c_char_p, c_int, sizeof, byref, c_size_t
+        XS, xs = twoget(self)
+        content = c_char_p(b'12345678ABCDEFGH12345678abcdefgh')
+        #  Send the message.
+        rc = xs.send(sc, content, 32, XS.SNDMORE)
+        self.assertEqual(rc, 32)
+        rc = xs.send(sc, content, 32, 0)
+        self.assertEqual(rc, 32)
+        #  Bounce the message back.
+        buf1 = (c_char_p * 32)()
+        rc = xs.recv(sb, buf1, 32, 0)
+        self.assertEqual(rc, 32)
+        rcvmore = c_int()
+        sz = c_size_t(sizeof(rcvmore))
+        rc = xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(sb, buf1, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        rc = xs.send(sb, buf1, 32, XS.SNDMORE)
+        self.assertEqual(rc, 32)
+        rc = xs.send(sb, buf1, 32, 0)
+        self.assertEqual(rc, 32)
+        #  Receive the bounced message.
+        buf2 = (c_char_p * 32)()
+        rc = xs.recv(sc, buf2, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(sc, buf2, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        cpp = CDLL(find_library('libcpp'), use_errno=True)
+        #  Check whether the message is still the same.
+        self.assertEqual(cpp.memcmp(buf2, content, 32), 0)
+
+    def time_assert(self, actual, expected):
+        '''
+        Check whether measured time is the expected time (in milliseconds). The
+        upper tolerance is 1/2 sec so that the test doesn't fail even on very
+        slow or very loaded systems.
+        '''
+        self.assertTrue(
+            actual > ((expected) - 50) and actual < ((expected) + 500)
+        )
+
+    def test_survey(self):
+        from ctypes import sizeof, c_ubyte, byref, c_int, get_errno
+        XS, xs = twoget(self)
+        buf = (c_ubyte * 32)()
+        # Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        xsurveyor = xs.socket(ctx, XS.XSURVEYOR)
+        self.assertTrue(xsurveyor)
+        rc = xs.bind(xsurveyor, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        xrespondent = xs.socket(ctx, XS.XRESPONDENT)
+        self.assertTrue(xrespondent)
+        rc = xs.bind(xrespondent, b'inproc://b')
+        self.assertNotEqual(rc, -1)
+        surveyor = xs.socket(ctx, XS.SURVEYOR)
+        self.assertTrue(surveyor)
+        rc = xs.connect(surveyor, b'inproc://b')
+        self.assertNotEqual(rc, -1)
+        respondent1 = xs.socket(ctx, XS.RESPONDENT)
+        self.assertTrue(respondent1)
+        rc = xs.connect(respondent1, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        respondent2 = xs.socket(ctx, XS.RESPONDENT)
+        self.assertTrue(respondent2)
+        rc = xs.connect(respondent2, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        # Send the survey.
+        rc = xs.send(surveyor, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        # Forward the survey through the intermediate device.
+        # Survey consist of identity(4 bytes), survey ID(4 bytes) and the body.
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(xsurveyor, buf, 3, 0)
+        self.assertEqual(rc, 3)
+        # Respondent 1 responds to the survey.
+        rc = xs.recv(respondent1, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(respondent1, b'DE', 2, 0)
+        self.assertEqual(rc, 2)
+        # Forward the response through the intermediate device.
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        rc = xs.send(xrespondent, buf, 2, 0)
+        self.assertEqual(rc, 2)
+        # Surveyor gets the response.
+        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        # Respondent 2 responds to the survey.
+        rc = xs.recv(respondent2, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(respondent2, b'FGHI', 4, 0)
+        self.assertEqual(rc, 4)
+        # Forward the response through the intermediate device.
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, 0)
+        self.assertEqual(rc, 4)
+        # Surveyor gets the response.
+        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        # Now let's test whether survey timeout works as expected.
+        timeout = c_int(100)
+        rc = xs.setsockopt(
+            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.send(surveyor, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        watch = xs.stopwatch_start()
+        try:
+            rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        except xs.XSError:
+            self.assertEqual(get_errno(), XS.ETIMEDOUT)
+        elapsed = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(elapsed, timeout.value)
+        # Test whether responses for old surveys are discarded. First,
+        # initiate new survey.
+        rc = xs.setsockopt(
+            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.send(surveyor, b'DE', 2, 0)
+        self.assertEqual(rc, 2)
+        # Read, process and reply to the old survey.
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(xrespondent, buf, 3, 0)
+        self.assertEqual(rc, 3)
+        # Read, process and reply to the new survey.
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        rc = xs.send(xrespondent, buf, 2, 0)
+        self.assertEqual(rc, 2)
+        # Get the response and check it's the response to the new survey and
+        # that response to the old survey was silently discarded.
+        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        rc = xs.close(respondent2)
+        self.assertEqual(rc, 0)
+        rc = xs.close(respondent1)
+        self.assertEqual(rc, 0)
+        rc = xs.close(surveyor)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xrespondent)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xsurveyor)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_empty(self):
+        _, xs = twoget(self)
+        # This is a very simple test to check whether everything works OK when
+        # context is terminated even before I/O threads were launched.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_reconnect(self):
+        import sys
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        XS, xs = twoget(self)
+        #  Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(push)
+        # Connect before bind was done at the peer and send one message.
+        rc = xs.connect(push, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        rc = xs.send(push, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        # Wait a while for few attempts to reconnect to happen.
+        sleep(1)
+        # Bind the peer and get the message.
+        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        buf = (c_ubyte * 3)()
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        # Clean up.
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        if sys.platform not in ('win32', 'cygwin'):
+            # Now, let's test the same scenario with IPC.
+            push = xs.socket(ctx, XS.PUSH)
+            self.assertTrue(push)
+            pull = xs.socket(ctx, XS.PULL)
+            self.assertTrue(push)
+            # Connect before bind was done at the peer and send one message.
+            rc = xs.connect(push, b'ipc:///tmp/tester')
+            self.assertNotEqual(rc, -1)
+            rc = xs.send(push, b'ABC', 3, 0)
+            self.assertEqual(rc, 3)
+            # Wait a while for few attempts to reconnect to happen.
+            sleep(1)
+            # Bind the peer and get the message.
+            rc = xs.bind(pull, b'ipc:///tmp/tester')
+            self.assertNotEqual(rc, -1)
+            rc = xs.recv(pull, buf, sizeof(buf), 0)
+            self.assertEqual(rc, 3)
+            # Clean up.
+            rc = xs.close(push)
+            self.assertEqual(rc, 0)
+            rc = xs.close(pull)
+            self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_hwn(self):
+        from ctypes import sizeof, byref, c_int, get_errno, c_ubyte
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # Create pair of socket, each with high watermark of 2. Thus the total
+        # buffer space should be 4 messages.
+        sb = xs.socket(ctx, XS.PULL)
+        self.assertTrue(sb)
+        hwm = c_int(2)
+        rc = xs.setsockopt(sb, XS.RCVHWM, byref(hwm), sizeof(hwm))
+        self.assertEqual(rc, 0)
+        rc = xs.bind(sb, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(sc)
+        rc = xs.setsockopt(sc, XS.SNDHWM, byref(hwm), sizeof(hwm))
+        self.assertEqual(rc, 0)
+        rc = xs.connect(sc, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        # Try to send 10 messages. Only 4 should succeed.
+        for t in xrange(10):
+            try:
+                rc = xs.send(sc, None, 0, XS.DONTWAIT)
+                if t < 4:
+                    self.assertEqual(rc, 0)
+            except xs.XSError:
+                self.assertTrue(get_errno(), XS.EAGAIN)
+        # There should be now 4 messages pending, consume them.
+        for i in xrange(4):
+            rc = xs.recv(sb, None, 0, 0)
+            self.assertEqual(rc, 0)
+        # Now it should be possible to send one more.
+        rc = xs.send(sc, None, 0, 0)
+        self.assertEqual(rc, 0)
+        # Consume the remaining message.
+        rc = xs.recv(sb, None, 0, 0)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+        # Following part of the tests checks whether small HWMs don't interact
+        # with command throttling in strange ways.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        s1 = xs.socket(ctx, XS.PULL)
+        self.assertTrue(s1)
+        s2 = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s2)
+        hwm = c_int(5)
+        rc = xs.setsockopt(s2, XS.SNDHWM, byref(hwm), sizeof(hwm))
+        self.assertEqual(rc, 0)
+        rc = xs.bind(s1, b'tcp://127.0.0.1:5858')
+        self.assertTrue(rc >= 0)
+        rc = xs.connect(s2, b'tcp://127.0.0.1:5858')
+        self.assertTrue(rc >= 0)
+        for i in xrange(10):
+            rc = xs.send(s2, b'test', 4, XS.DONTWAIT)
+            self.assertEqual(rc, 4)
+            buf = (c_ubyte * 4)()
+            rc = xs.recv(s1, buf, sizeof(buf), 0)
+            self.assertEqual(rc, 4)
+        rc = xs.close(s2)
+        self.assertEqual(rc, 0)
+        rc = xs.close(s1)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_pair_tcp(self):
+        from time import sleep
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        # Now let's try to open one more connection to the bound socket. The
+        # connection should be silently rejected rather than causing error.
+        sc2 = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc2)
+        rc = xs.connect(sc2, b'tcp://127.0.0.1:5560')
+        sleep(1)
+        rc = xs.close(sc2)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_pair_ipc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_sub_forward(self):
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # First, create an intermediate device.
+        xpub = xs.socket(ctx, XS.XPUB)
+        self.assertTrue(xpub)
+        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        xsub = xs.socket(ctx, XS.XSUB)
+        self.assertTrue(xsub)
+        rc = xs.bind(xsub, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Create a publisher.
+        pub = xs.socket(ctx, XS.PUB)
+        self.assertTrue(pub)
+        rc = xs.connect(pub, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Create a subscriber.
+        sub = xs.socket(ctx, XS.SUB)
+        self.assertTrue(sub)
+        rc = xs.connect(sub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Subscribe for all messages.
+        rc = xs.setsockopt(sub, XS.SUBSCRIBE, '', 0)
+        self.assertEqual(rc, 0)
+        # Pass the subscription upstream through the device.
+        buff = (c_ubyte * 32)()
+        rc = xs.recv(xpub, buff, sizeof(buff), 0)
+        self.assertTrue(rc >= 0)
+        rc = xs.send(xsub, buff, rc, 0)
+        self.assertTrue(rc >= 0)
+        # Wait a bit till the subscription gets to the publisher.
+        sleep(1)
+        # Send an empty message.
+        rc = xs.send(pub, None, 0, 0)
+        self.assertEqual(rc, 0)
+        # Pass the message downstream through the device.
+        rc = xs.recv(xsub, buff, sizeof(buff), 0)
+        self.assertTrue(rc >= 0)
+        rc = xs.send(xpub, buff, rc, 0)
+        self.assertTrue(rc >= 0)
+        # Receive the message in the subscriber.
+        rc = xs.recv(sub, buff, sizeof(buff), 0)
+        self.assertEqual(rc, 0)
+        # Clean up.
+        rc = xs.close(xpub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xsub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(pub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sub)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_pair_inproc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, 'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, 'inproc://a')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_shutdown(self):
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        XS, xs = twoget(self)
+        buf = (c_ubyte * 32)()
+        # Create infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        push_id = xs.bind(push, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(push_id, -1)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(pull)
+        rc = xs.connect(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Pass one message through to ensure the connection is established.
+        rc = xs.send(push, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        # Shut down the bound endpoint.
+        rc = xs.shutdown(push, push_id)
+        self.assertEqual(rc, 0)
+        sleep(1)
+        try:
+            # Check that sending would block (there's no outbound connection).
+            rc = xs.send(push, b'ABC', 3, XS.DONTWAIT)
+        except xs.XSError:
+            self.assertTrue(xs.errno(), XS.EAGAIN)
+        # Clean up.
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+        # Now the other way round...Create infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(pull)
+        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        push_id = xs.connect(push, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(push_id, -1)
+        # Pass one message through to ensure the connection is established.
+        rc = xs.send(push, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        # Shut down the bound endpoint.
+        rc = xs.shutdown(push, push_id)
+        self.assertEqual(rc, 0)
+        sleep(1)
+        try:
+            # Check that sending would block (there's no outbound connection).
+            rc = xs.send(push, b'ABC', 3, XS.DONTWAIT)
+        except xs.XSError:
+            self.assertTrue(xs.errno(), XS.EAGAIN)
+        # Clean up.
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_linger(self):
+        from ctypes import byref, sizeof, c_int
+        XS, xs = twoget(self)
+        #  Create socket.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        s = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s)
+        #  Set linger to 0.1 second.
+        linger = c_int(100)
+        rc = xs.setsockopt(s, XS.LINGER, byref(linger), sizeof(linger))
+        #  Connect to non-existent endpoing.
+        self.assertEqual(rc, 0)
+        rc = xs.connect(s, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        #  Send a message.
+        rc = xs.send(s, b'r', 1, 0)
+        self.assertEqual(rc, 1)
+        #  Close the socket.
+        rc = xs.close(s)
+        self.assertEqual(rc, 0)
+        #  Terminate the context. This should take 0.1 second.
+        watch = xs.stopwatch_start()
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+        ms = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(ms, linger.value)
+
+    def test_msg_flags(self):
+        from ctypes import byref, sizeof, c_int, c_size_t
+        XS, xs = twoget(self)
+        # Create the infrastructure
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.XREP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.XREQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        # Send 2 - part message.
+        rc = xs.send(sc, b'A', 1, XS.SNDMORE)
+        self.assertEqual(rc, 1)
+        rc = xs.send(sc, b'B', 1, 0)
+        self.assertEqual(rc, 1)
+        # Identity comes first.
+        msg = xs.msg_t()
+        rc = xs.msg_init(byref(msg))
+        self.assertEqual(rc, 0)
+        rc = xs.recvmsg(sb, byref(msg), 0)
+        self.assertTrue(rc >= 0)
+        more = c_int()
+        more_size = c_size_t(sizeof(more))
+        rc = xs.getmsgopt(byref(msg), XS.MORE, byref(more), byref(more_size))
+        self.assertEqual(rc, 0)
+        self.assertEqual(more.value, 1)
+        # Then the first part of the message body.
+        rc = xs.recvmsg(sb, byref(msg), 0)
+        self.assertEqual(rc, 1)
+        more_size = c_size_t(sizeof(more))
+        rc = xs.getmsgopt(byref(msg), XS.MORE, byref(more), byref(more_size))
+        self.assertEqual(rc, 0)
+        self.assertEqual(more.value, 1)
+        # And finally, the second part of the message body.
+        rc = xs.recvmsg(sb, byref(msg), 0)
+        self.assertEqual(rc, 1)
+        more_size = c_size_t(sizeof(more))
+        rc = xs.getmsgopt(byref(msg), XS.MORE, byref(more), byref(more_size))
+        self.assertEqual(rc, 0)
+        self.assertEqual(more.value, 0)
+        # Deallocate the infrastructure.
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_max_sockets(self):
+        from ctypes import byref, sizeof, c_int, get_errno
+        XS, xs = twoget(self)
+        # Create context and set MAX_SOCKETS to 1.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        max_sockets = c_int(1)
+        rc = xs.setctxopt(
+            ctx, XS.MAX_SOCKETS, byref(max_sockets), sizeof(max_sockets)
+        )
+        self.assertEqual(rc, 0)
+        # First socket should be created OK.
+        s1 = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s1)
+        # Creation of second socket should fail.
+        try:
+            xs.socket(ctx, XS.PUSH)
+        except xs.XSError:
+            self.assertEqual(get_errno(), XS.EMFILE)
+        # Clean up.
+        rc = xs.close(s1)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_invalid_rep(self):
+        from stuf.six import int2byte
+        from ctypes import byref, sizeof, c_int, c_ubyte
+        XS, xs = twoget(self)
+        # Create REQ/XREP wiring.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        xrep_socket = xs.socket(ctx, XS.XREP)
+        self.assertTrue(xrep_socket)
+        req_socket = xs.socket(ctx, XS.REQ)
+        self.assertTrue(req_socket)
+        linger = c_int(0)
+        rc = xs.setsockopt(
+            xrep_socket, XS.LINGER, byref(linger), sizeof(linger)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.setsockopt(
+            req_socket, XS.LINGER, byref(linger), sizeof(linger)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.bind(xrep_socket, b'inproc://hi')
+        self.assertNotEqual(rc, -1)
+        rc = xs.connect(req_socket, b'inproc://hi')
+        self.assertNotEqual(rc, -1)
+        # Initial request.
+        rc = xs.send(req_socket, b'r', 1, 0)
+        self.assertEqual(rc, 1)
+        # Receive the request.
+        addr = (c_ubyte * 32)()
+        bottom = (c_ubyte * 1)()
+        body = (c_ubyte * 1)()
+        addr_size = xs.recv(xrep_socket, addr, sizeof(addr), 0)
+        self.assertTrue(addr_size >= 0)
+        rc = xs.recv(xrep_socket, bottom, sizeof(bottom), 0)
+        self.assertEqual(rc, 0)
+        rc = xs.recv(xrep_socket, body, sizeof(body), 0)
+        self.assertEqual(rc, 1)
+        # Send invalid reply.
+        rc = xs.send(xrep_socket, addr, addr_size, 0)
+        self.assertEqual(rc, addr_size)
+        # Send valid reply.
+        rc = xs.send(xrep_socket, addr, addr_size, XS.SNDMORE)
+        self.assertEqual(rc, addr_size)
+        rc = xs.send(xrep_socket, bottom, 0, XS.SNDMORE)
+        self.assertEqual(rc, 0)
+        rc = xs.send(xrep_socket, b'b', 1, 0)
+        self.assertEqual(rc, 1)
+        # Check whether we've got the valid reply.
+        rc = xs.recv(req_socket, body, sizeof(body), 0)
+        self.assertEqual(rc, 1)
+        self.assertEqual(int2byte(body[0]), b'b')
+        # Tear down the wiring.
+        rc = xs.close(xrep_socket)
+        self.assertEqual(rc, 0)
+        rc = xs.close(req_socket)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_resubscribe(self):
+        from stuf.six import int2byte
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        from crossroads.api import array
+        XS, xs = twoget(self)
+        # Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        xpub = xs.socket(ctx, XS.XPUB)
+        self.assertTrue(xpub)
+        sub = xs.socket(ctx, XS.SUB)
+        self.assertTrue(sub)
+        # Send two subscriptions upstream.
+        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        rc = xs.setsockopt(sub, XS.SUBSCRIBE, b'a', 1)
+        self.assertEqual(rc, 0)
+        rc = xs.setsockopt(sub, XS.SUBSCRIBE, b'b', 1)
+        self.assertEqual(rc, 0)
+        rc = xs.connect(sub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Check whether subscriptions are correctly received.
+        buf = array(c_ubyte, 5)
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'a')
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'b')
+        # Tear down the connection.
+        rc = xs.close(xpub)
+        self.assertEqual(rc, 0)
+        sleep(1)
+        # Re-establish the connection.
+        xpub = xs.socket(ctx, XS.XPUB)
+        self.assertTrue(xpub)
+        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # We have to give control to the SUB socket here so that it has
+        # chance to resend the subscriptions.
+        try:
+            rc = xs.recv(sub, buf, sizeof(buf), XS.DONTWAIT)
+        except xs.XSError:
+            self.assertEqual(xs.errno(), XS.EAGAIN)
+        # Check whether subscriptions are correctly generated.
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'a')
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'b')
+        # Clean up.
+        rc = xs.close(sub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xpub)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_inproc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.REP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.REQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_ipc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.REP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.REQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_tcp(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.REP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.REQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_device(self):
+        from ctypes.util import find_library
+        from ctypes import CDLL, byref, sizeof, c_int, c_ubyte, c_size_t
+        from crossroads.api import array
+        cpp = CDLL(find_library('libcpp'), use_errno=True)
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # Create a req/rep device.
+        xreq = xs.socket(ctx, XS.XREQ)
+        self.assertTrue(xreq)
+        rc = xs.bind(xreq, 'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        xrep = xs.socket(ctx, XS.XREP)
+        self.assertTrue(xrep)
+        rc = xs.bind(xrep, 'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Create a worker.
+        rep = xs.socket(ctx, XS.REP)
+        self.assertTrue(rep)
+        rc = xs.connect(rep, 'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Create a client.
+        req = xs.socket(ctx, XS.REQ)
+        self.assertTrue(req)
+        rc = xs.connect(req, 'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Send a request.
+        rc = xs.send(req, 'ABC', 3, XS.SNDMORE)
+        self.assertEqual(rc, 3)
+        rc = xs.send(req, 'DEF', 3, 0)
+        self.assertEqual(rc, 3)
+        # Pass the request through the device.
+        for i in xrange(4):
+            msg = xs.msg_t()
+            rc = xs.msg_init(byref(msg))
+            self.assertEqual(rc, 0)
+            rc = xs.recvmsg(xrep, byref(msg), 0)
+            self.assertTrue(rc >= 0)
+            rcvmore = c_int()
+            sz = c_size_t(sizeof(rcvmore))
+            rc = xs.getsockopt(xrep, XS.RCVMORE, byref(rcvmore), byref(sz))
+            self.assertEqual(rc, 0)
+            rc = xs.sendmsg(xreq, byref(msg), XS.SNDMORE if rcvmore else 0)
+            self.assertTrue(rc >= 0)
+        # Receive the request.
+        buff = array(c_ubyte, 3)
+        rc = xs.recv(rep, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, b'ABC', 3), 0)
+        rcvmore = c_int()
+        sz = c_size_t(sizeof(rcvmore))
+        rc = xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(rep, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, b'DEF', 3), 0)
+        rc = xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        # Send the reply.
+        rc = xs.send(rep, 'GHI', 3, XS.SNDMORE)
+        self.assertEqual(rc, 3)
+        rc = xs.send(rep, 'JKL', 3, 0)
+        self.assertEqual(rc, 3)
+        # Pass the reply through the device.
+        for i in xrange(4):
+            msg = xs.msg_t()
+            rc = xs.msg_init(byref(msg))
+            self.assertEqual(rc, 0)
+            rc = xs.recvmsg(xreq, byref(msg), 0)
+            self.assertTrue(rc >= 0)
+            rc = xs.getsockopt(xreq, XS.RCVMORE, byref(rcvmore), byref(sz))
+            self.assertEqual(rc, 0)
+            rc = xs.sendmsg(xrep, byref(msg), XS.SNDMORE if rcvmore else 0)
+            self.assertTrue(rc >= 0)
+        # Receive the reply.
+        rc = xs.recv(req, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, 'GHI', 3), 0)
+        rc = xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(req, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, 'JKL', 3), 0)
+        rc = xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        # Clean up.
+        rc = xs.close(req)
+        self.assertEqual(rc, 0)
+        rc = xs.close(rep)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xrep)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xreq)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_wireformat(self):
+        XS, xs = twoget(self)
+        import sys
+        import socket
+        from ctypes.util import find_library
+        from ctypes import CDLL, byref, sizeof, c_int, c_ubyte, c_size_t
+        from crossroads.api import array
+        cpp = CDLL(find_library('libcpp'), use_errno=True)
+        # Create the basic infrastructure.
+        ctx = ws.init()
+        assert (ctx)
+        push = ws.socket(ctx, XS.PUSH)
+        assert (push)
+        pull = ws.socket(ctx, XS.PULL)
+        assert (push)
+
+        # Bind the peer and get the message.
+        rc = ws.bind(pull, "tcp://127.0.0.1:5560")
+        self.assertNotEqual(rc, -1)
+        rc = ws.bind(push, "tcp://127.0.0.1:5561")
+        self.assertNotEqual(rc, -1)
+
+        #Connect to the peer using raw sockets.
+        rpush = socket(
+            socket.socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
+        )
+        address = socket.address()
+        address.sin_family = socket.AF_INET
+        address.sin_addr.s_addr = inet_addr("127.0.0.1")
+        address.sin_port = htons(5560)
+        rc = connect(rpush, (struct sockaddr * ) & address, sizeof(address))
+        self.assertEqual(rc, 0)
+
+        rpull = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
+        address.sin_family = AF_INET
+        address.sin_addr.s_addr = inet_addr("127.0.0.1")
+        address.sin_port = htons(5561)
+        rc = connect (rpull, (struct sockaddr * ) & address, sizeof (address))
+        self.assertEqual(rc, 0)
+        #Let's send some data and check if it arrived
+        rc = send(rpush, "\x04\0abc", 5, 0)
+        assert (rc == 5)
+        buf = array(c_ubyte, 3)
+        buf2 = array(c_ubyte, 3)
+        rc = ws.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        self.assertFalse(cpp.memcmp(buf, "abc"), 3)
+        #Let's push this data into another socket
+        rc = ws.send(push, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+#        rc = recv (rpull, (char *) buf2, sizeof (buf2), 0)
+        self.assertEqual(rc, 3)
+#        assert (!memcmp(buf2, "\x04\0abc", 3))
+        rc = ws.close(pull)
+        self.assertEqual(rc, 0)
+        rc = ws.close(push)
+        self.assertEqual(rc, 0)
+
+        rc = ws.term(ctx)
+        self.assertEqual(rc, 0)

tests/test_max_sockets.py

-/*
-    Copyright (c) 2012 250bpm s.r.o.
-    Copyright (c) 2012 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "max_sockets test running...\n");
-
-    //  Create context and set MAX_SOCKETS to 1.
-    void *ctx = xs_init ();
-    assert (ctx);
-    int max_sockets = 1;
-    int rc = xs_setctxopt (ctx, XS_MAX_SOCKETS, &max_sockets,
-        sizeof (max_sockets));
-    assert (rc == 0);
-
-    //  First socket should be created OK.
-    void *s1 = xs_socket (ctx, XS_PUSH);
-    assert (s1);
-
-    //  Creation of second socket should fail.
-    void *s2 = xs_socket (ctx, XS_PUSH);
-    assert (!s2 && errno == EMFILE);
-
-    //  Clean up.
-    rc = xs_close (s1);
-    assert (rc == 0);
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0;
-}
-

tests/test_msg_flags.py

-/*
-    Copyright (c) 2011-2012 250bpm s.r.o.
-    Copyright (c) 2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "msg_flags test running...\n");
-
-    //  Create the infrastructure
-    void *ctx = xs_init ();
-    assert (ctx);
-    void *sb = xs_socket (ctx, XS_XREP);
-    assert (sb);
-    int rc = xs_bind (sb, "inproc://a");
-    assert (rc != -1);
-    void *sc = xs_socket (ctx, XS_XREQ);
-    assert (sc);
-    rc = xs_connect (sc, "inproc://a");
-    assert (rc != -1);
-   
-    //  Send 2-part message.
-    rc = xs_send (sc, "A", 1, XS_SNDMORE);
-    assert (rc == 1);
-    rc = xs_send (sc, "B", 1, 0);
-    assert (rc == 1);
-
-    //  Identity comes first.
-    xs_msg_t msg;
-    rc = xs_msg_init (&msg);
-    assert (rc == 0);
-    rc = xs_recvmsg (sb, &msg, 0);
-    assert (rc >= 0);
-    int more;
-    size_t more_size = sizeof (more);
-    rc = xs_getmsgopt (&msg, XS_MORE, &more, &more_size);
-    assert (rc == 0);
-    assert (more == 1);
-
-    //  Then the first part of the message body.
-    rc = xs_recvmsg (sb, &msg, 0);
-    assert (rc == 1);
-    more_size = sizeof (more);
-    rc = xs_getmsgopt (&msg, XS_MORE, &more, &more_size);
-    assert (rc == 0);
-    assert (more == 1);
-
-    //  And finally, the second part of the message body.
-    rc = xs_recvmsg (sb, &msg, 0);
-    assert (rc == 1);
-    more_size = sizeof (more);
-    rc = xs_getmsgopt (&msg, XS_MORE, &more, &more_size);
-    assert (rc == 0);
-    assert (more == 0);
-
-    //  Deallocate the infrastructure.
-    rc = xs_close (sc);
-    assert (rc == 0);
-    rc = xs_close (sb);
-    assert (rc == 0);
-    rc = xs_term (ctx);
-    assert (rc == 0);
-    return 0 ;
-}
-

tests/test_pair_inproc.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "pair_inproc test running...\n");
-
-    void *ctx = xs_init ();
-    assert (ctx);
-
-    void *sb = xs_socket (ctx, XS_PAIR);
-    assert (sb);
-    int rc = xs_bind (sb, "inproc://a");
-    assert (rc != -1);
-
-    void *sc = xs_socket (ctx, XS_PAIR);
-    assert (sc);
-    rc = xs_connect (sc, "inproc://a");
-    assert (rc != -1);
-    
-    bounce (sb, sc);
-
-    rc = xs_close (sc);
-    assert (rc == 0);
-
-    rc = xs_close (sb);
-    assert (rc == 0);
-
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0 ;
-}

tests/test_pair_ipc.py

-
-from stuf.six import unittest
-
-from tests.utils import bounce
-
-
-class TestPair(unittest.TestCase):
-
-    def test_pair(self):
-        import crossroads.constants as XS
-        import crossroads.api as xs
-        ctx = xs.init()
-        self.assertTrue(ctx)
-        sb = xs.socket(ctx, XS.PAIR)
-        self.assertTrue(sb)
-        rc = xs.bind(sb, b'ipc:///tmp/tester')
-        self.assertNotEqual(rc, -1)
-        sc = xs.socket(ctx, XS.PAIR)
-        self.assertTrue(sc)
-        rc = xs.connect(sc, "ipc:///tmp/tester")
-        self.assertNotEqual(rc, -1)
-        self.assertTrue(bounce(sb, sc))
-        rc = xs.close(sc)
-        self.assertEqual(rc, 0)
-        rc = xs.close(sb)
-        self.assertEqual(rc, 0)
-        rc = xs.term(ctx)
-        self.assertEqual(rc, 0)

tests/test_pair_tcp.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2011 iMatix Corporation
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "pair_tcp test running...\n");
-
-    void *ctx = xs_init ();
-    assert (ctx);
-
-    void *sb = xs_socket (ctx, XS_PAIR);
-    assert (sb);
-    int rc = xs_bind (sb, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-
-    void *sc = xs_socket (ctx, XS_PAIR);
-    assert (sc);
-    rc = xs_connect (sc, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-    
-    bounce (sb, sc);
-
-    //  Now let's try to open one more connection to the bound socket.
-    //  The connection should be silently rejected rather than causing error.
-    void *sc2 = xs_socket (ctx, XS_PAIR);
-    assert (sc2);
-    rc = xs_connect (sc2, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-    sleep (1);
-
-    rc = xs_close (sc2);
-    assert (rc == 0);
-
-    rc = xs_close (sc);
-    assert (rc == 0);
-
-    rc = xs_close (sb);
-    assert (rc == 0);
-
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0 ;
-}

tests/test_reconnect.py

-/*
-    Copyright (c) 2012 250bpm s.r.o.
-    Copyright (c) 2012 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "reconnect test running...\n");
-
-    //  Create the basic infrastructure.
-    void *ctx = xs_init ();
-    assert (ctx);
-    void *push = xs_socket (ctx, XS_PUSH);
-    assert (push);
-    void *pull = xs_socket (ctx, XS_PULL);
-    assert (push);
-
-    //  Connect before bind was done at the peer and send one message.
-    int rc = xs_connect (push, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-    rc = xs_send (push, "ABC", 3, 0);
-    assert (rc == 3);
-
-    //  Wait a while for few attempts to reconnect to happen.
-    sleep (1);
-
-    //  Bind the peer and get the message.
-    rc = xs_bind (pull, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-    unsigned char buf [3];
-    rc = xs_recv (pull, buf, sizeof (buf), 0);
-    assert (rc == 3);
-
-    //  Clean up.
-    rc = xs_close (push);
-    assert (rc == 0);
-    rc = xs_close (pull);
-    assert (rc == 0);
-
-#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
-
-    //  Now, let's test the same scenario with IPC.
-    push = xs_socket (ctx, XS_PUSH);
-    assert (push);
-    pull = xs_socket (ctx, XS_PULL);
-    assert (push);
-
-    //  Connect before bind was done at the peer and send one message.
-    rc = xs_connect (push, "ipc:///tmp/tester");
-    assert (rc != -1);
-    rc = xs_send (push, "ABC", 3, 0);
-    assert (rc == 3);
-
-    //  Wait a while for few attempts to reconnect to happen.
-    sleep (1);
-
-    //  Bind the peer and get the message.
-    rc = xs_bind (pull, "ipc:///tmp/tester");
-    assert (rc != -1);
-    rc = xs_recv (pull, buf, sizeof (buf), 0);
-    assert (rc == 3);
-
-    //  Clean up.
-    rc = xs_close (push);
-    assert (rc == 0);
-    rc = xs_close (pull);
-    assert (rc == 0);
-
-#endif
-
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0 ;
-}

tests/test_reqrep_device.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2011 VMware, Inc.
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "reqrep_device test running...\n");
-
-    void *ctx = xs_init ();
-    assert (ctx);
-
-    //  Create a req/rep device.
-    void *xreq = xs_socket (ctx, XS_XREQ);
-    assert (xreq);
-    int rc = xs_bind (xreq, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-    void *xrep = xs_socket (ctx, XS_XREP);
-    assert (xrep);
-    rc = xs_bind (xrep, "tcp://127.0.0.1:5561");
-    assert (rc != -1);
-
-    //  Create a worker.
-    void *rep = xs_socket (ctx, XS_REP);
-    assert (rep);
-    rc = xs_connect (rep, "tcp://127.0.0.1:5560");
-    assert (rc != -1);
-
-    //  Create a client.
-    void *req = xs_socket (ctx, XS_REQ);
-    assert (req);
-    rc = xs_connect (req, "tcp://127.0.0.1:5561");
-    assert (rc != -1);
-
-    //  Send a request.
-    rc = xs_send (req, "ABC", 3, XS_SNDMORE);
-    assert (rc == 3);
-    rc = xs_send (req, "DEF", 3, 0);
-    assert (rc == 3);
-
-    //  Pass the request through the device.
-    for (int i = 0; i != 4; i++) {
-        xs_msg_t msg;
-        rc = xs_msg_init (&msg);
-        assert (rc == 0);
-        rc = xs_recvmsg (xrep, &msg, 0);
-        assert (rc >= 0);
-        int rcvmore;
-        size_t sz = sizeof (rcvmore);
-        rc = xs_getsockopt (xrep, XS_RCVMORE, &rcvmore, &sz);
-        assert (rc == 0);
-        rc = xs_sendmsg (xreq, &msg, rcvmore ? XS_SNDMORE : 0);
-        assert (rc >= 0);
-    }
-
-    //  Receive the request.
-    char buff [3];
-    rc = xs_recv (rep, buff, 3, 0);
-    assert (rc == 3);
-    assert (memcmp (buff, "ABC", 3) == 0);
-    int rcvmore;
-    size_t sz = sizeof (rcvmore);
-    rc = xs_getsockopt (rep, XS_RCVMORE, &rcvmore, &sz);
-    assert (rc == 0);
-    assert (rcvmore);
-    rc = xs_recv (rep, buff, 3, 0);
-    assert (rc == 3);
-    assert (memcmp (buff, "DEF", 3) == 0);
-    rc = xs_getsockopt (rep, XS_RCVMORE, &rcvmore, &sz);
-    assert (rc == 0);
-    assert (!rcvmore);
-
-    //  Send the reply.
-    rc = xs_send (rep, "GHI", 3, XS_SNDMORE);
-    assert (rc == 3);
-    rc = xs_send (rep, "JKL", 3, 0);
-    assert (rc == 3);
-
-    //  Pass the reply through the device.
-    for (int i = 0; i != 4; i++) {
-        xs_msg_t msg;
-        rc = xs_msg_init (&msg);
-        assert (rc == 0);
-        rc = xs_recvmsg (xreq, &msg, 0);
-        assert (rc >= 0);
-        rc = xs_getsockopt (xreq, XS_RCVMORE, &rcvmore, &sz);
-        assert (rc == 0);
-        rc = xs_sendmsg (xrep, &msg, rcvmore ? XS_SNDMORE : 0);
-        assert (rc >= 0);
-    }
-
-    //  Receive the reply.
-    rc = xs_recv (req, buff, 3, 0);
-    assert (rc == 3);
-    assert (memcmp (buff, "GHI", 3) == 0);
-    rc = xs_getsockopt (req, XS_RCVMORE, &rcvmore, &sz);
-    assert (rc == 0);
-    assert (rcvmore);
-    rc = xs_recv (req, buff, 3, 0);
-    assert (rc == 3);
-    assert (memcmp (buff, "JKL", 3) == 0);
-    rc = xs_getsockopt (req, XS_RCVMORE, &rcvmore, &sz);
-    assert (rc == 0);
-    assert (!rcvmore);
-
-    //  Clean up.
-    rc = xs_close (req);
-    assert (rc == 0);
-    rc = xs_close (rep);
-    assert (rc == 0);
-    rc = xs_close (xrep);
-    assert (rc == 0);
-    rc = xs_close (xreq);
-    assert (rc == 0);
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0 ;
-}

tests/test_reqrep_inproc.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "reqrep_inproc test running...\n");
-
-    void *ctx = xs_init ();
-    assert (ctx);
-
-    void *sb = xs_socket (ctx, XS_REP);
-    assert (sb);
-    int rc = xs_bind (sb, "inproc://a");
-    assert (rc != -1);
-
-    void *sc = xs_socket (ctx, XS_REQ);
-    assert (sc);
-    rc = xs_connect (sc, "inproc://a");
-    assert (rc != -1);
-    
-    bounce (sb, sc);
-
-    rc = xs_close (sc);
-    assert (rc == 0);
-
-    rc = xs_close (sb);
-    assert (rc == 0);
-
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0 ;
-}

tests/test_reqrep_ipc.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "testutil.hpp"
-
-#if defined XS_HAVE_WINDOWS || defined XS_HAVE_OPENVMS
-int XS_TEST_MAIN ()
-{
-    return 0;
-}
-#else
-
-int XS_TEST_MAIN ()
-{
-    fprintf (stderr, "reqrep_ipc test running...\n");
-
-    void *ctx = xs_init ();
-    assert (ctx);
-
-    void *sb = xs_socket (ctx, XS_REP);
-    assert (sb);
-    int rc = xs_bind (sb, "ipc:///tmp/tester");
-    assert (rc != -1);
-
-    void *sc = xs_socket (ctx, XS_REQ);
-    assert (sc);
-    rc = xs_connect (sc, "ipc:///tmp/tester");
-    assert (rc != -1);
-    
-    bounce (sb, sc);
-
-    rc = xs_close (sc);
-    assert (rc == 0);
-
-    rc = xs_close (sb);
-    assert (rc == 0);
-
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    return 0 ;
-}
-
-#endif

tests/test_reqrep_tcp.py

-/*
-    Copyright (c) 2010-2012 250bpm s.r.o.
-    Copyright (c) 2011 iMatix Corporation
-    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
-
-    This file is part of Crossroads I/O project.
-
-    Crossroads I/O is free software; you can redistribute it and/or modify it
-    under the terms of the GNU Lesser General Public License as published by
-    the Free Software Foundation; either version 3 of the License, or
-    (at your option) any later version.
-
-    Crossroads is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU Lesser General Public License for more details.
-
-    You should have received a copy of the GNU Lesser General Public