Commits

Lynn Rees committed f143187

- more tests
- low level api

Comments (0)

Files changed (12)

crossroads/api.py

-# -*- coding: utf-8 -*-
-'''Crossroads API functions.'''
-
-from functools import partial
-from ctypes.util import find_library
-from ctypes import Structure, CDLL, POINTER, memmove, get_errno  # , byref,
-from ctypes import (
-    c_void_p as void, c_int as cint, c_char_p as char, c_ubyte as ubyte,
-    c_size_t as size_t, c_short as short, c_ulong)
-
-
-class XSBaseError(Exception):
-
-    '''Base exception.'''
-
-
-class XSError(XSBaseError):
-
-    def __init__(self, errno=None):
-        if errno is None:
-            errno = get_errno()
-        self.strerror = strerror(errno)
-        self.errno = errno
-
-    def __str__(self):
-        return self.strerror
-
-
-def _check_nonzero(result, func, arguments):
-    if result == -1:
-        raise XSError(get_errno())
-    return result
-
-
-def _check_not_null(result, func, arguments):
-    if result is None:
-        raise XSError(get_errno())
-    return result
-
-
-def _check_errno(result, func, arguments):
-    errno = get_errno()
-    if errno != 0:
-        raise XSError(errno)
-    return result
-
-
-def _factory(lib, prefix, restype, func, *args):
-    newfunc = getattr(lib, '_'.join([prefix, func]))
-    newfunc.argtypes = list(args)
-    newfunc.restype = restype
-    if newfunc.restype is cint:
-        newfunc.errcheck = _check_nonzero
-    elif newfunc.restype is void:
-        newfunc.errcheck = _check_not_null
-    return newfunc
-
-
-XS = CDLL(find_library('xs'), use_errno=True)
-array = lambda ctype, length: (ctype * length)()
-factory = partial(_factory, XS, 'xs')
-intfactory = partial(_factory, XS, 'xs', cint)
-voidfactory = partial(_factory, XS, 'xs', void)
-
-memmove.restype = void
-
-###############################################################################
-## CROSSROADS VERSIONING SUPPORT ##############################################
-###############################################################################
-
-# void xs_version (int *major, int *minor, int *patch);
-#version = voidfactory('version', POINTER(cint), POINTER(cint), POINTER(cint))
-#VERSION = cint(), cint(), cint()
-#version(byref(VERSION[0]), byref(VERSION[1]), byref(VERSION[2]))
-#VERSION = tuple(x.value for x in VERSION)
-
-###############################################################################
-## CROSSROADS ERRORS ##########################################################
-###############################################################################
-
-# int xs_errno (void)
-errno = intfactory('errno')
-
-# const char *xs_strerror (int errnum)
-strerror = factory(char, 'strerror', cint)
-
-###############################################################################
-## CROSSROADS MESSAGE DEFINITION ##############################################
-###############################################################################
-
-
-class msg_t(Structure):
-
-    # typedef struct {unsigned char _ [32];} xs_msg_t;
-    _fields_ = [('_', ubyte * 32)]
-
-# int xs_msg_init (xs_msg_t *msg);
-msg_init = intfactory('msg_init', POINTER(msg_t))
-
-# int xs_msg_init_size (xs_msg_t *msg, size_t size);
-msg_init_size = intfactory('msg_init_size', POINTER(msg_t), size_t)
-
-# int xs_msg_init_data (
-#  xs_msg_t *msg, void *data, size_t size, xs_free_fn *ffn, void *hint
-# );
-# requires a free function for fourth argument e.g.
-# typedef void (xs_free_fn) (void *data, void *hint);
-msg_init_data = intfactory(
-    'msg_init_data', POINTER(msg_t), void, size_t, void, void,
-)
-
-# int xs_msg_close (xs_msg_t *msg);
-msg_close = intfactory('msg_close', POINTER(msg_t))
-
-# int xs_msg_move (xs_msg_t *dest, xs_msg_t *src);
-msg_move = intfactory('msg_move', POINTER(msg_t), POINTER(msg_t))
-
-# int xs_msg_copy (xs_msg_t *dest, xs_msg_t *src);
-msg_copy = intfactory('msg_copy', POINTER(msg_t), POINTER(msg_t))
-
-# void *xs_msg_data (xs_msg_t *msg);
-msg_data = voidfactory('msg_data', POINTER(msg_t))
-
-# size_t xs_msg_size (xs_msg_t *msg);
-msg_size = factory(size_t, 'msg_size', POINTER(msg_t))
-
-# int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, size_t *optvallen)
-getmsgopt = intfactory('getmsgopt', POINTER(msg_t), cint, void, POINTER(size_t))
-
-###############################################################################
-## CROSSROADS CONTEXT DEFINITION ##############################################
-###############################################################################
-
-# void *xs_init (void);
-init = voidfactory('init')
-
-# int xs_term (void *context);
-term = intfactory('term', void)
-
-# int xs_setctxopt (
-#  void *context, int option, const void *optval, size_t optvallen
-# );
-setctxopt = intfactory('setctxopt', void, cint, void, size_t)
-
-###############################################################################
-## CROSSROADS SOCKET DEFINITION ###############################################
-###############################################################################
-
-#void *xs_socket (void *context, int type);
-socket = voidfactory('socket', void, cint)
-
-# int xs_close (void *s);
-close = intfactory('close', void)
-
-# int xs_setsockopt (void *s, int option, const void *optval, size_t optvallen)
-setsockopt = intfactory('setsockopt', void, cint, void, size_t)
-
-# int xs_getsockopt (void *s, int option, void *optval, size_t *optvallen);
-getsockopt = intfactory('getsockopt', void, cint, void, POINTER(size_t))
-
-# int xs_bind (void *s, const char *addr);
-bind = intfactory('bind', void, char)
-
-# int xs_connect (void *s, const char *addr);
-connect = intfactory('connect', void, char)
-
-# int xs_shutdown (void *s, int how);
-shutdown = intfactory('shutdown', void, cint)
-
-# int xs_send (void *s, const void *buf, size_t len, int flags);
-send = intfactory('send', void, void, size_t, cint)
-
-# int xs_recv (void *s, void *buf, size_t len, int flags);
-recv = intfactory('recv', void, void, size_t, cint)
-
-# int xs_sendmsg (void *s, xs_msg_t *msg, int flags);
-sendmsg = intfactory('sendmsg', void, POINTER(msg_t), cint)
-
-# int xs_recvmsg (void *s, xs_msg_t *msg, int flags);
-recvmsg = intfactory('recvmsg', void, POINTER(msg_t), cint)
-
-###############################################################################
-## I/O MULTIPLEXING ###########################################################
-###############################################################################
-
-
-class pollitem_t(Structure):
-
-    # typedef struct { void *socket; int fd; short events; short revents;
-    # } xs_pollitem_t;
-
-    _fields_ = [
-        ('socket', void), ('fd', cint),  ('events', short), ('revents', short),
-    ]
-
-# int xs_poll (xs_pollitem_t *items, int nitems, int timeout);
-poll = intfactory('poll', POINTER(pollitem_t), cint, cint)
-
-# Starts the stopwatch. Returns the handle to the watch.
-# void *xs_stopwatch_start (void);
-stopwatch_start = voidfactory('stopwatch_start')
-
-# Stops the stopwatch. Returns the number of microseconds elapsed since the
-# stopwatch was started.
-# unsigned long xs_stopwatch_stop (void *watch);
-stopwatch_stop = factory(c_ulong, 'stopwatch_stop', void)

crossroads/constants.py

 POLLOUT = 2
 POLLERR = 4
 
-bytes_sockopts = [SUBSCRIBE, UNSUBSCRIBE, IDENTITY]
-int64_sockopts = [AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RCVMORE]
-int_sockopts = [FD, EVENTS, TYPE, LINGER, RECONNECT_IVL, BACKLOG]
+BINARY_OPTS = (SUBSCRIBE, UNSUBSCRIBE, IDENTITY)
+INT_OPTS = (
+    SNDHWM, RCVHWM, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, LINGER, RECONNECT_IVL,
+    RECONNECT_IVL_MAX, BACKLOG, MULTICAST_HOPS, RCVTIMEO, SNDTIMEO, IPV4ONLY,
+    KEEPALIVE, SURVEY_TIMEOUT,
+)
+INT64_OPTS = (AFFINITY, MAXMSGSIZE)
 
 from errno import (
     EAGAIN, EFAULT, EINVAL, ENETDOWN, ENOBUFS, ENODEV, ENOMEM, ETIMEDOUT,
-    EMFILE)  # @UnusedImport
+    EMFILE)

crossroads/core.py

 
 from stuf.six import isunicode, isbytes, tobytes
 
-from . import api as xs
+from . import lowest as xs
 from . import constants as XS
 
 

crossroads/low.py

+# -*- coding: utf-8 -*-
+'''Low level API for ctypes binding to Crossroads.IO library.'''
+
+from functools import partial
+from ctypes import byref, sizeof, c_int, c_int64, c_char_p, c_ubyte, string_at
+
+from stuf.deep import setter
+from stuf.six import items, tobytes, isstring, tounicode
+
+from . import lowest as xs
+from . import constants as XS
+from crossroads.lowest import array
+
+BAD_TYPE = 'address must be bytes, got {r} instead'.format
+
+
+class Context(object):
+
+    def __init__(self, threads=1, max_sockets=512):
+        self.ctx = ctx = xs.init()
+        if threads != 1:
+            if threads != 0:
+                iothreads = c_int(threads)
+                xs.setctxopt(
+                    ctx, XS.IO_THREADS, byref(iothreads), sizeof(iothreads)
+                )
+                del iothreads
+            else:
+                raise xs.XSError('must have 1 or more I/O threads')
+        if max_sockets != 512:
+            if max_sockets != 0:
+                max_sockets = c_int(max_sockets)
+                xs.setctxopt(
+                    ctx,
+                    XS.MAX_SOCKETS,
+                    byref(max_sockets),
+                    sizeof(max_sockets),
+                )
+                del max_sockets
+            else:
+                raise xs.XSError('must have 1 or more sockets')
+
+    def open(self, stype, **options):
+        return Socket(self.ctx, stype, **options)
+
+    def close(self):
+        xs.term(self.ctx)
+
+    def pair(self, **options):
+        return self.open(XS.PAIR, **options)
+
+    def publish(self, **options):
+        return self.open(XS.PUB, **options)
+
+    def subscribe(self, **options):
+        return self.open(XS.SUB, **options)
+
+    def request(self, **options):
+        return self.open(XS.REQ, **options)
+
+    def reply(self, **options):
+        return self.open(XS.REP, **options)
+
+    def xrequest(self, **options):
+        return self.open(XS.XREQ, **options)
+
+    def xreply(self, **options):
+        return self.open(XS.XREP, **options)
+
+    def pull(self, **options):
+        return self.open(XS.PULL, **options)
+
+    def push(self, **options):
+        return self.open(XS.PUSH, **options)
+
+    def xpublish(self, **options):
+        return self.open(XS.XPUB, **options)
+
+    def xsubscribe(self, **options):
+        return self.open(XS.XSUB, **options)
+
+    def surveyor(self, **options):
+        return self.open(XS.SURVEYOR, **options)
+
+    def respondent(self, **options):
+        return self.open(XS.RESPONDENT, **options)
+
+    def xsurveyor(self, **options):
+        return self.open(XS.XSURVEYOR, **options)
+
+    def xrespondent(self, **options):
+        return self.open(XS.XRESPONDENT, **options)
+
+
+class Socket(object):
+
+    def __init__(self, context, stype, **options):
+        self._ctx = context
+        self._type = type
+        self._socket = xs.socket(context, type)
+        if options:
+            self.set(**options)
+
+    def __getattr__(self, key, getr=object.__getattribute__):
+        try:
+            return getr(self, key)
+        except AttributeError:
+            try:
+                this = getattr(xs, key.strip('_'))
+                return setter(self, key, partial(this, self._socket))
+            except AttributeError:
+                raise AttributeError(key)
+
+    def set(self, **options):
+        byref, c_int, c_int64, c_char_p = byref, c_int, c_int64, c_char_p
+        INT_OPTS = XS.INT_OPTS
+        INT64_OPTS = XS.INT64_OPTS
+        BINARY_OPTS = XS.BINARY_OPTS
+        setsocket = self._setsocket
+        getr = getattr
+        for k, v in items(options):
+            const = getr(XS, k.upper())
+            if const in INT_OPTS:
+                value = c_int(v)
+            elif const in INT64_OPTS:
+                value = c_int64(v)
+            elif const in BINARY_OPTS:
+                value = c_char_p(v)
+            setsocket(const, byref(value), sizeof(value))
+
+    def bind(self, address):
+        if isstring(address):
+            # if string, coerce to bytes
+            return self._bind(tobytes(address))
+        else:
+            raise xs.XSError(BAD_TYPE(type(address)))
+
+    def connect(self, address):
+        if isstring(address):
+            # if string, coerce to bytes
+            return self._connect(tobytes(address))
+        else:
+            raise xs.XSError(BAD_TYPE(type(address)))
+
+    def shutdown(self, sid):
+        return self._shutdown(c_int(sid))
+
+    def send_bytes(self, msg):
+        return self._send(msg, len(msg), 0)
+
+    def send_unicode(self, msg):
+        msg = tobytes(msg)
+        return self._send(msg, len(msg), 0)
+
+    def recv_bytes(self, bufsize, encoding='latin-1', nowait=False):
+        msg = array(c_ubyte, bufsize)
+        rc = self._recv(msg, sizeof(msg), 1 if nowait else 0)
+        return tobytes(string_at(msg, rc), encoding)
+
+    def recv_unicode(self, bufsize, encoding='utf-8', nowait=False):
+        msg = array(c_ubyte, bufsize)
+        rc = self._recv(msg, sizeof(msg), 1 if nowait else 0)
+        return tounicode(string_at(msg, rc), encoding)
+
+    def sendmsg_bytes(self, msg, flags=0):
+        msg = c_ubyte(msg)
+        return self._sendmsg(msg, sizeof(msg), flags)
+
+    def sendmsg_unicode(self, msg, flags=0):
+        msg = c_ubyte(msg)
+        return self._sendmsg(msg, sizeof(msg), flags)
+
+    def recvmsg_bytes(self, msg, flags=0):
+        msg = c_ubyte(msg)
+        return self._recvmsg(msg, sizeof(msg), flags)
+
+    def recvmsg_unicode(self, msg, flags=0):
+        msg = c_ubyte(msg)
+        return self._recvmsg(msg, sizeof(msg), flags)
+
+    def close(self):
+        return self._close()

crossroads/lowest.py

+# -*- coding: utf-8 -*-
+'''Crossroads API functions.'''
+
+from functools import partial
+from ctypes.util import find_library
+from ctypes import Structure, CDLL, POINTER, memmove, get_errno  # , byref,
+from ctypes import (
+    c_void_p as void, c_int as cint, c_char_p as char, c_ubyte as ubyte,
+    c_size_t as size_t, c_short as short, c_ulong)
+
+
+class XSBaseError(Exception):
+
+    '''Base exception.'''
+
+
+class XSError(XSBaseError):
+
+    def __init__(self, errno=None):
+        if errno is None:
+            errno = get_errno()
+        self.strerror = strerror(errno)
+        self.errno = errno
+
+    def __str__(self):
+        return self.strerror
+
+
+def _check_nonzero(result, func, arguments):
+    if result == -1:
+        raise XSError(get_errno())
+    return result
+
+
+def _check_not_null(result, func, arguments):
+    if result is None:
+        raise XSError(get_errno())
+    return result
+
+
+def _check_errno(result, func, arguments):
+    errno = get_errno()
+    if errno != 0:
+        raise XSError(errno)
+    return result
+
+
+def _factory(lib, prefix, restype, func, *args):
+    newfunc = getattr(lib, '_'.join([prefix, func]))
+    newfunc.argtypes = list(args)
+    newfunc.restype = restype
+    if newfunc.restype is cint:
+        newfunc.errcheck = _check_nonzero
+    elif newfunc.restype is void:
+        newfunc.errcheck = _check_not_null
+    return newfunc
+
+
+XS = CDLL(find_library('xs'), use_errno=True)
+array = lambda ctype, length: (ctype * length)()
+factory = partial(_factory, XS, 'xs')
+intfactory = partial(_factory, XS, 'xs', cint)
+voidfactory = partial(_factory, XS, 'xs', void)
+
+memmove.restype = void
+
+###############################################################################
+## CROSSROADS VERSIONING SUPPORT ##############################################
+###############################################################################
+
+# void xs_version (int *major, int *minor, int *patch);
+#version = voidfactory('version', POINTER(cint), POINTER(cint), POINTER(cint))
+#VERSION = cint(), cint(), cint()
+#version(byref(VERSION[0]), byref(VERSION[1]), byref(VERSION[2]))
+#VERSION = tuple(x.value for x in VERSION)
+
+###############################################################################
+## CROSSROADS ERRORS ##########################################################
+###############################################################################
+
+# int xs_errno (void)
+errno = intfactory('errno')
+
+# const char *xs_strerror (int errnum)
+strerror = factory(char, 'strerror', cint)
+
+###############################################################################
+## CROSSROADS MESSAGE DEFINITION ##############################################
+###############################################################################
+
+
+class msg_t(Structure):
+
+    # typedef struct {unsigned char _ [32];} xs_msg_t;
+    _fields_ = [('_', ubyte * 32)]
+
+# int xs_msg_init (xs_msg_t *msg);
+msg_init = intfactory('msg_init', POINTER(msg_t))
+
+# int xs_msg_init_size (xs_msg_t *msg, size_t size);
+msg_init_size = intfactory('msg_init_size', POINTER(msg_t), size_t)
+
+# int xs_msg_init_data (
+#  xs_msg_t *msg, void *data, size_t size, xs_free_fn *ffn, void *hint
+# );
+# requires a free function for fourth argument e.g.
+# typedef void (xs_free_fn) (void *data, void *hint);
+msg_init_data = intfactory(
+    'msg_init_data', POINTER(msg_t), void, size_t, void, void,
+)
+
+# int xs_msg_close (xs_msg_t *msg);
+msg_close = intfactory('msg_close', POINTER(msg_t))
+
+# int xs_msg_move (xs_msg_t *dest, xs_msg_t *src);
+msg_move = intfactory('msg_move', POINTER(msg_t), POINTER(msg_t))
+
+# int xs_msg_copy (xs_msg_t *dest, xs_msg_t *src);
+msg_copy = intfactory('msg_copy', POINTER(msg_t), POINTER(msg_t))
+
+# void *xs_msg_data (xs_msg_t *msg);
+msg_data = voidfactory('msg_data', POINTER(msg_t))
+
+# size_t xs_msg_size (xs_msg_t *msg);
+msg_size = factory(size_t, 'msg_size', POINTER(msg_t))
+
+# int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, size_t *optvallen)
+getmsgopt = intfactory('getmsgopt', POINTER(msg_t), cint, void, POINTER(size_t))
+
+###############################################################################
+## CROSSROADS CONTEXT DEFINITION ##############################################
+###############################################################################
+
+# void *xs_init (void);
+init = voidfactory('init')
+
+# int xs_term (void *context);
+term = intfactory('term', void)
+
+# int xs_setctxopt (
+#  void *context, int option, const void *optval, size_t optvallen
+# );
+setctxopt = intfactory('setctxopt', void, cint, void, size_t)
+
+###############################################################################
+## CROSSROADS SOCKET DEFINITION ###############################################
+###############################################################################
+
+#void *xs_socket (void *context, int type);
+socket = voidfactory('socket', void, cint)
+
+# int xs_close (void *s);
+close = intfactory('close', void)
+
+# int xs_setsockopt (void *s, int option, const void *optval, size_t optvallen)
+setsockopt = intfactory('setsockopt', void, cint, void, size_t)
+
+# int xs_getsockopt (void *s, int option, void *optval, size_t *optvallen);
+getsockopt = intfactory('getsockopt', void, cint, void, POINTER(size_t))
+
+# int xs_bind (void *s, const char *addr);
+bind = intfactory('bind', void, char)
+
+# int xs_connect (void *s, const char *addr);
+connect = intfactory('connect', void, char)
+
+# int xs_shutdown (void *s, int how);
+shutdown = intfactory('shutdown', void, cint)
+
+# int xs_send (void *s, const void *buf, size_t len, int flags);
+send = intfactory('send', void, void, size_t, cint)
+
+# int xs_recv (void *s, void *buf, size_t len, int flags);
+recv = intfactory('recv', void, void, size_t, cint)
+
+# int xs_sendmsg (void *s, xs_msg_t *msg, int flags);
+sendmsg = intfactory('sendmsg', void, POINTER(msg_t), cint)
+
+# int xs_recvmsg (void *s, xs_msg_t *msg, int flags);
+recvmsg = intfactory('recvmsg', void, POINTER(msg_t), cint)
+
+###############################################################################
+## I/O MULTIPLEXING ###########################################################
+###############################################################################
+
+
+class pollitem_t(Structure):
+
+    # typedef struct { void *socket; int fd; short events; short revents;
+    # } xs_pollitem_t;
+
+    _fields_ = [
+        ('socket', void), ('fd', cint),  ('events', short), ('revents', short),
+    ]
+
+# int xs_poll (xs_pollitem_t *items, int nitems, int timeout);
+poll = intfactory('poll', POINTER(pollitem_t), cint, cint)
+
+# Starts the stopwatch. Returns the handle to the watch.
+# void *xs_stopwatch_start (void);
+stopwatch_start = voidfactory('stopwatch_start')
+
+# Stops the stopwatch. Returns the number of microseconds elapsed since the
+# stopwatch was started.
+# unsigned long xs_stopwatch_stop (void *watch);
+stopwatch_stop = factory(c_ulong, 'stopwatch_stop', void)

tests/test_backlog.py

-from stuf.six import unittest
-
-
-BACKLOG = 10
-PARALLEL_CONNECTS = 50
-
-#if 0
-
-extern "C"
-{
-
-#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
-    static void ipc_worker (void *ctx_)
-    {
-        int rc;
-        void *push;
-
-        push = xs_socket (ctx_, XS_PUSH);
-        assert (push);
-        rc = xs_connect (push, "ipc:///tmp/test-backlog");
-        assert (rc >= 0);
-        rc = xs_send (push, NULL, 0, 0);
-        assert (rc == 0);
-        rc = xs_close (push);
-        assert (rc == 0);
-    }
-#endif
-    static void tcp_worker (void *ctx_)
-    {
-        int rc;
-        void *push;
-
-        push = xs_socket (ctx_, XS_PUSH);
-        assert (push);
-        rc = xs_connect (push, "tcp://127.0.0.1:5560");
-        assert (rc >= 0);
-
-        rc = xs_send (push, NULL, 0, 0);
-        assert (rc == 0);
-        rc = xs_close (push);
-        assert (rc == 0);
-    }
-}
-#endif
-
-class TestBacklog(unittest.TestCase):
-
-
-#if 0
-    int rc;
-    void *ctx;
-    void *pull;
-    int i;
-    int backlog;
-    void *threads [PARALLEL_CONNECTS];
-
-    #  Test the exhaustion on IPC backlog.
-#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
-    ctx = xs_init ();
-    assert (ctx);
-
-    pull = xs_socket (ctx, XS_PULL);
-    assert (pull);
-    backlog = BACKLOG;
-    rc = xs_setsockopt (pull, XS_BACKLOG, &backlog, sizeof (backlog));
-    assert (rc == 0);
-    rc = xs_bind (pull, "ipc:///tmp/test-backlog");
-    assert (rc >= 0);
-
-    
-    for (i = 0; i < PARALLEL_CONNECTS; i++)
-        threads [i] = thread_create (ipc_worker, ctx);
-
-    for (i = 0; i < PARALLEL_CONNECTS; i++) {
-        rc = xs_recv (pull, NULL, 0, 0);
-        assert (rc == 0);
-    }
-    
-    rc = xs_close (pull);
-    assert (rc == 0);
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    for (i = 0; i < PARALLEL_CONNECTS; i++)
-        thread_join (threads [i]);
-#endif
-
-    //  Test the exhaustion on TCP backlog.
-
-    ctx = xs_init ();
-    assert (ctx);
-
-    pull = xs_socket (ctx, XS_PULL);
-    assert (pull);
-    backlog = BACKLOG;
-    rc = xs_setsockopt (pull, XS_BACKLOG, &backlog, sizeof (backlog));
-    assert (rc == 0);
-    rc = xs_bind (pull, "tcp://127.0.0.1:5560");
-    assert (rc >= 0);
-
-    for (i = 0; i < PARALLEL_CONNECTS; i++)
-        threads [i] = thread_create (tcp_worker, ctx);
-
-    for (i = 0; i < PARALLEL_CONNECTS; i++) {
-        rc = xs_recv (pull, NULL, 0, 0);
-        assert (rc == 0);
-    }
-    
-    rc = xs_close (pull);
-    assert (rc == 0);
-    rc = xs_term (ctx);
-    assert (rc == 0);
-
-    for (i = 0; i < PARALLEL_CONNECTS; i++)
-        thread_join (threads [i]);
-#endif
-
-    return 0;
-}

tests/test_low_level.py

+# -*- coding: utf-8 -*-
+'''Lowest level tests for ctypes binding to Crossroads.IO library.'''
+
+from stuf.six import unittest
+
+from operator import attrgetter
+
+twoget = attrgetter(*'XS xs'.split())
+
+
+class TestLowestLevel(unittest.TestCase):
+
+    def setUp(self):
+        import crossroads.lowest
+        self.xs = crossroads.lowest
+        import crossroads.constants
+        self.XS = crossroads.constants
+
+    def bounce(self, sb, sc):
+        from ctypes.util import find_library
+        from ctypes import CDLL, c_char_p, c_int, sizeof, byref, c_size_t
+        XS, xs = twoget(self)
+        content = c_char_p(b'12345678ABCDEFGH12345678abcdefgh')
+        #  Send the message.
+        rc = xs.send(sc, content, 32, XS.SNDMORE)
+        self.assertEqual(rc, 32)
+        rc = xs.send(sc, content, 32, 0)
+        self.assertEqual(rc, 32)
+        #  Bounce the message back.
+        buf1 = (c_char_p * 32)()
+        rc = xs.recv(sb, buf1, 32, 0)
+        self.assertEqual(rc, 32)
+        rcvmore = c_int()
+        sz = c_size_t(sizeof(rcvmore))
+        rc = xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(sb, buf1, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.getsockopt(sb, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        rc = xs.send(sb, buf1, 32, XS.SNDMORE)
+        self.assertEqual(rc, 32)
+        rc = xs.send(sb, buf1, 32, 0)
+        self.assertEqual(rc, 32)
+        #  Receive the bounced message.
+        buf2 = (c_char_p * 32)()
+        rc = xs.recv(sc, buf2, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(sc, buf2, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.getsockopt(sc, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        cpp = CDLL(find_library('libcpp'), use_errno=True)
+        #  Check whether the message is still the same.
+        self.assertEqual(cpp.memcmp(buf2, content, 32), 0)
+
+    def time_assert(self, actual, expected):
+        '''
+        Check whether measured time is the expected time (in milliseconds). The
+        upper tolerance is 1/2 sec so that the test doesn't fail even on very
+        slow or very loaded systems.
+        '''
+        self.assertTrue(
+            actual > ((expected) - 50) and actual < ((expected) + 500)
+        )
+
+#extern "C"
+#{
+#    static void *thread_routine (void *arg_)
+#    {
+#        arg_t *arg = (arg_t*) arg_;
+#        arg->fn (arg->arg);
+#        return NULL;
+#    }
+#}
+#
+#void *thread_create (void (*fn_) (void *arg_), void *arg_)
+#{
+#    arg_t *arg = (arg_t*) malloc (sizeof (arg_t));
+#    assert (arg);
+#    arg->fn = fn_;
+#    arg->arg = arg_;
+#    int rc = pthread_create (&arg->handle, NULL, thread_routine, (void*) arg);
+#    assert (rc == 0);
+#    return (void*) arg;
+#}
+#
+#void thread_join (void *thread_)
+#{
+#    arg_t *arg = (arg_t*) thread_;
+#    int rc = pthread_join (arg->handle, NULL);
+#    assert (rc == 0);
+#    free (arg);
+#}
+
+    def test_survey(self):
+        from ctypes import sizeof, c_ubyte, byref, c_int, get_errno
+        XS, xs = twoget(self)
+        buf = (c_ubyte * 32)()
+        # Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        xsurveyor = xs.socket(ctx, XS.XSURVEYOR)
+        self.assertTrue(xsurveyor)
+        rc = xs.bind(xsurveyor, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        xrespondent = xs.socket(ctx, XS.XRESPONDENT)
+        self.assertTrue(xrespondent)
+        rc = xs.bind(xrespondent, b'inproc://b')
+        self.assertNotEqual(rc, -1)
+        surveyor = xs.socket(ctx, XS.SURVEYOR)
+        self.assertTrue(surveyor)
+        rc = xs.connect(surveyor, b'inproc://b')
+        self.assertNotEqual(rc, -1)
+        respondent1 = xs.socket(ctx, XS.RESPONDENT)
+        self.assertTrue(respondent1)
+        rc = xs.connect(respondent1, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        respondent2 = xs.socket(ctx, XS.RESPONDENT)
+        self.assertTrue(respondent2)
+        rc = xs.connect(respondent2, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        # Send the survey.
+        rc = xs.send(surveyor, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        # Forward the survey through the intermediate device.
+        # Survey consist of identity(4 bytes), survey ID(4 bytes) and the body.
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xsurveyor, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(xsurveyor, buf, 3, 0)
+        self.assertEqual(rc, 3)
+        # Respondent 1 responds to the survey.
+        rc = xs.recv(respondent1, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(respondent1, b'DE', 2, 0)
+        self.assertEqual(rc, 2)
+        # Forward the response through the intermediate device.
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        rc = xs.send(xrespondent, buf, 2, 0)
+        self.assertEqual(rc, 2)
+        # Surveyor gets the response.
+        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        # Respondent 2 responds to the survey.
+        rc = xs.recv(respondent2, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(respondent2, b'FGHI', 4, 0)
+        self.assertEqual(rc, 4)
+        # Forward the response through the intermediate device.
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xsurveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, 0)
+        self.assertEqual(rc, 4)
+        # Surveyor gets the response.
+        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        # Now let's test whether survey timeout works as expected.
+        timeout = c_int(100)
+        rc = xs.setsockopt(
+            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.send(surveyor, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        watch = xs.stopwatch_start()
+        try:
+            rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        except xs.XSError:
+            self.assertEqual(get_errno(), XS.ETIMEDOUT)
+        elapsed = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(elapsed, timeout.value)
+        # Test whether responses for old surveys are discarded. First,
+        # initiate new survey.
+        rc = xs.setsockopt(
+            surveyor, XS.SURVEY_TIMEOUT, byref(timeout), sizeof(timeout)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.send(surveyor, b'DE', 2, 0)
+        self.assertEqual(rc, 2)
+        # Read, process and reply to the old survey.
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        rc = xs.send(xrespondent, buf, 3, 0)
+        self.assertEqual(rc, 3)
+        # Read, process and reply to the new survey.
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 4)
+        rc = xs.send(xrespondent, buf, 4, XS.SNDMORE)
+        self.assertEqual(rc, 4)
+        rc = xs.recv(xrespondent, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        rc = xs.send(xrespondent, buf, 2, 0)
+        self.assertEqual(rc, 2)
+        # Get the response and check it's the response to the new survey and
+        # that response to the old survey was silently discarded.
+        rc = xs.recv(surveyor, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 2)
+        rc = xs.close(respondent2)
+        self.assertEqual(rc, 0)
+        rc = xs.close(respondent1)
+        self.assertEqual(rc, 0)
+        rc = xs.close(surveyor)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xrespondent)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xsurveyor)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_empty(self):
+        _, xs = twoget(self)
+        # This is a very simple test to check whether everything works OK when
+        # context is terminated even before I/O threads were launched.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_reconnect(self):
+        import sys
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        XS, xs = twoget(self)
+        #  Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(push)
+        # Connect before bind was done at the peer and send one message.
+        rc = xs.connect(push, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        rc = xs.send(push, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        # Wait a while for few attempts to reconnect to happen.
+        sleep(1)
+        # Bind the peer and get the message.
+        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        buf = (c_ubyte * 3)()
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        # Clean up.
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        if sys.platform not in ('win32', 'cygwin'):
+            # Now, let's test the same scenario with IPC.
+            push = xs.socket(ctx, XS.PUSH)
+            self.assertTrue(push)
+            pull = xs.socket(ctx, XS.PULL)
+            self.assertTrue(push)
+            # Connect before bind was done at the peer and send one message.
+            rc = xs.connect(push, b'ipc:///tmp/tester')
+            self.assertNotEqual(rc, -1)
+            rc = xs.send(push, b'ABC', 3, 0)
+            self.assertEqual(rc, 3)
+            # Wait a while for few attempts to reconnect to happen.
+            sleep(1)
+            # Bind the peer and get the message.
+            rc = xs.bind(pull, b'ipc:///tmp/tester')
+            self.assertNotEqual(rc, -1)
+            rc = xs.recv(pull, buf, sizeof(buf), 0)
+            self.assertEqual(rc, 3)
+            # Clean up.
+            rc = xs.close(push)
+            self.assertEqual(rc, 0)
+            rc = xs.close(pull)
+            self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_hwn(self):
+        from ctypes import sizeof, byref, c_int, get_errno, c_ubyte
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # Create pair of socket, each with high watermark of 2. Thus the total
+        # buffer space should be 4 messages.
+        sb = xs.socket(ctx, XS.PULL)
+        self.assertTrue(sb)
+        hwm = c_int(2)
+        rc = xs.setsockopt(sb, XS.RCVHWM, byref(hwm), sizeof(hwm))
+        self.assertEqual(rc, 0)
+        rc = xs.bind(sb, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(sc)
+        rc = xs.setsockopt(sc, XS.SNDHWM, byref(hwm), sizeof(hwm))
+        self.assertEqual(rc, 0)
+        rc = xs.connect(sc, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        # Try to send 10 messages. Only 4 should succeed.
+        for t in xrange(10):
+            try:
+                rc = xs.send(sc, None, 0, XS.DONTWAIT)
+                if t < 4:
+                    self.assertEqual(rc, 0)
+            except xs.XSError:
+                self.assertTrue(get_errno(), XS.EAGAIN)
+        # There should be now 4 messages pending, consume them.
+        for i in xrange(4):
+            rc = xs.recv(sb, None, 0, 0)
+            self.assertEqual(rc, 0)
+        # Now it should be possible to send one more.
+        rc = xs.send(sc, None, 0, 0)
+        self.assertEqual(rc, 0)
+        # Consume the remaining message.
+        rc = xs.recv(sb, None, 0, 0)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+        # Following part of the tests checks whether small HWMs don't interact
+        # with command throttling in strange ways.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        s1 = xs.socket(ctx, XS.PULL)
+        self.assertTrue(s1)
+        s2 = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s2)
+        hwm = c_int(5)
+        rc = xs.setsockopt(s2, XS.SNDHWM, byref(hwm), sizeof(hwm))
+        self.assertEqual(rc, 0)
+        rc = xs.bind(s1, b'tcp://127.0.0.1:5858')
+        self.assertTrue(rc >= 0)
+        rc = xs.connect(s2, b'tcp://127.0.0.1:5858')
+        self.assertTrue(rc >= 0)
+        for i in xrange(10):
+            rc = xs.send(s2, b'test', 4, XS.DONTWAIT)
+            self.assertEqual(rc, 4)
+            buf = (c_ubyte * 4)()
+            rc = xs.recv(s1, buf, sizeof(buf), 0)
+            self.assertEqual(rc, 4)
+        rc = xs.close(s2)
+        self.assertEqual(rc, 0)
+        rc = xs.close(s1)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_pair_tcp(self):
+        from time import sleep
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        # Now let's try to open one more connection to the bound socket. The
+        # connection should be silently rejected rather than causing error.
+        sc2 = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc2)
+        rc = xs.connect(sc2, b'tcp://127.0.0.1:5560')
+        sleep(1)
+        rc = xs.close(sc2)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_pair_ipc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_sub_forward(self):
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # First, create an intermediate device.
+        xpub = xs.socket(ctx, XS.XPUB)
+        self.assertTrue(xpub)
+        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        xsub = xs.socket(ctx, XS.XSUB)
+        self.assertTrue(xsub)
+        rc = xs.bind(xsub, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Create a publisher.
+        pub = xs.socket(ctx, XS.PUB)
+        self.assertTrue(pub)
+        rc = xs.connect(pub, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Create a subscriber.
+        sub = xs.socket(ctx, XS.SUB)
+        self.assertTrue(sub)
+        rc = xs.connect(sub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Subscribe for all messages.
+        rc = xs.setsockopt(sub, XS.SUBSCRIBE, '', 0)
+        self.assertEqual(rc, 0)
+        # Pass the subscription upstream through the device.
+        buff = (c_ubyte * 32)()
+        rc = xs.recv(xpub, buff, sizeof(buff), 0)
+        self.assertTrue(rc >= 0)
+        rc = xs.send(xsub, buff, rc, 0)
+        self.assertTrue(rc >= 0)
+        # Wait a bit till the subscription gets to the publisher.
+        sleep(1)
+        # Send an empty message.
+        rc = xs.send(pub, None, 0, 0)
+        self.assertEqual(rc, 0)
+        # Pass the message downstream through the device.
+        rc = xs.recv(xsub, buff, sizeof(buff), 0)
+        self.assertTrue(rc >= 0)
+        rc = xs.send(xpub, buff, rc, 0)
+        self.assertTrue(rc >= 0)
+        # Receive the message in the subscriber.
+        rc = xs.recv(sub, buff, sizeof(buff), 0)
+        self.assertEqual(rc, 0)
+        # Clean up.
+        rc = xs.close(xpub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xsub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(pub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sub)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_pair_inproc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, 'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.PAIR)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, 'inproc://a')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_shutdown(self):
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        XS, xs = twoget(self)
+        buf = (c_ubyte * 32)()
+        # Create infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        push_id = xs.bind(push, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(push_id, -1)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(pull)
+        rc = xs.connect(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Pass one message through to ensure the connection is established.
+        rc = xs.send(push, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        # Shut down the bound endpoint.
+        rc = xs.shutdown(push, push_id)
+        self.assertEqual(rc, 0)
+        sleep(1)
+        try:
+            # Check that sending would block (there's no outbound connection).
+            rc = xs.send(push, b'ABC', 3, XS.DONTWAIT)
+        except xs.XSError:
+            self.assertTrue(xs.errno(), XS.EAGAIN)
+        # Clean up.
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+        # Now the other way round...Create infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(pull)
+        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        push_id = xs.connect(push, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(push_id, -1)
+        # Pass one message through to ensure the connection is established.
+        rc = xs.send(push, b'ABC', 3, 0)
+        self.assertEqual(rc, 3)
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        # Shut down the bound endpoint.
+        rc = xs.shutdown(push, push_id)
+        self.assertEqual(rc, 0)
+        sleep(1)
+        try:
+            # Check that sending would block (there's no outbound connection).
+            rc = xs.send(push, b'ABC', 3, XS.DONTWAIT)
+        except xs.XSError:
+            self.assertTrue(xs.errno(), XS.EAGAIN)
+        # Clean up.
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_linger(self):
+        from ctypes import byref, sizeof, c_int
+        XS, xs = twoget(self)
+        #  Create socket.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        s = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s)
+        #  Set linger to 0.1 second.
+        linger = c_int(100)
+        rc = xs.setsockopt(s, XS.LINGER, byref(linger), sizeof(linger))
+        #  Connect to non-existent endpoing.
+        self.assertEqual(rc, 0)
+        rc = xs.connect(s, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        #  Send a message.
+        rc = xs.send(s, b'r', 1, 0)
+        self.assertEqual(rc, 1)
+        #  Close the socket.
+        rc = xs.close(s)
+        self.assertEqual(rc, 0)
+        #  Terminate the context. This should take 0.1 second.
+        watch = xs.stopwatch_start()
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+        ms = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(ms, linger.value)
+
+    def test_msg_flags(self):
+        from ctypes import byref, sizeof, c_int, c_size_t
+        XS, xs = twoget(self)
+        # Create the infrastructure
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.XREP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.XREQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        # Send 2 - part message.
+        rc = xs.send(sc, b'A', 1, XS.SNDMORE)
+        self.assertEqual(rc, 1)
+        rc = xs.send(sc, b'B', 1, 0)
+        self.assertEqual(rc, 1)
+        # Identity comes first.
+        msg = xs.msg_t()
+        rc = xs.msg_init(byref(msg))
+        self.assertEqual(rc, 0)
+        rc = xs.recvmsg(sb, byref(msg), 0)
+        self.assertTrue(rc >= 0)
+        more = c_int()
+        more_size = c_size_t(sizeof(more))
+        rc = xs.getmsgopt(byref(msg), XS.MORE, byref(more), byref(more_size))
+        self.assertEqual(rc, 0)
+        self.assertEqual(more.value, 1)
+        # Then the first part of the message body.
+        rc = xs.recvmsg(sb, byref(msg), 0)
+        self.assertEqual(rc, 1)
+        more_size = c_size_t(sizeof(more))
+        rc = xs.getmsgopt(byref(msg), XS.MORE, byref(more), byref(more_size))
+        self.assertEqual(rc, 0)
+        self.assertEqual(more.value, 1)
+        # And finally, the second part of the message body.
+        rc = xs.recvmsg(sb, byref(msg), 0)
+        self.assertEqual(rc, 1)
+        more_size = c_size_t(sizeof(more))
+        rc = xs.getmsgopt(byref(msg), XS.MORE, byref(more), byref(more_size))
+        self.assertEqual(rc, 0)
+        self.assertEqual(more.value, 0)
+        # Deallocate the infrastructure.
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_max_sockets(self):
+        from ctypes import byref, sizeof, c_int, get_errno
+        XS, xs = twoget(self)
+        # Create context and set MAX_SOCKETS to 1.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        max_sockets = c_int(1)
+        rc = xs.setctxopt(
+            ctx, XS.MAX_SOCKETS, byref(max_sockets), sizeof(max_sockets)
+        )
+        self.assertEqual(rc, 0)
+        # First socket should be created OK.
+        s1 = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(s1)
+        # Creation of second socket should fail.
+        try:
+            xs.socket(ctx, XS.PUSH)
+        except xs.XSError:
+            self.assertEqual(get_errno(), XS.EMFILE)
+        # Clean up.
+        rc = xs.close(s1)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_invalid_rep(self):
+        from stuf.six import int2byte
+        from ctypes import byref, sizeof, c_int, c_ubyte
+        XS, xs = twoget(self)
+        # Create REQ/XREP wiring.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        xrep_socket = xs.socket(ctx, XS.XREP)
+        self.assertTrue(xrep_socket)
+        req_socket = xs.socket(ctx, XS.REQ)
+        self.assertTrue(req_socket)
+        linger = c_int(0)
+        rc = xs.setsockopt(
+            xrep_socket, XS.LINGER, byref(linger), sizeof(linger)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.setsockopt(
+            req_socket, XS.LINGER, byref(linger), sizeof(linger)
+        )
+        self.assertEqual(rc, 0)
+        rc = xs.bind(xrep_socket, b'inproc://hi')
+        self.assertNotEqual(rc, -1)
+        rc = xs.connect(req_socket, b'inproc://hi')
+        self.assertNotEqual(rc, -1)
+        # Initial request.
+        rc = xs.send(req_socket, b'r', 1, 0)
+        self.assertEqual(rc, 1)
+        # Receive the request.
+        addr = (c_ubyte * 32)()
+        bottom = (c_ubyte * 1)()
+        body = (c_ubyte * 1)()
+        addr_size = xs.recv(xrep_socket, addr, sizeof(addr), 0)
+        self.assertTrue(addr_size >= 0)
+        rc = xs.recv(xrep_socket, bottom, sizeof(bottom), 0)
+        self.assertEqual(rc, 0)
+        rc = xs.recv(xrep_socket, body, sizeof(body), 0)
+        self.assertEqual(rc, 1)
+        # Send invalid reply.
+        rc = xs.send(xrep_socket, addr, addr_size, 0)
+        self.assertEqual(rc, addr_size)
+        # Send valid reply.
+        rc = xs.send(xrep_socket, addr, addr_size, XS.SNDMORE)
+        self.assertEqual(rc, addr_size)
+        rc = xs.send(xrep_socket, bottom, 0, XS.SNDMORE)
+        self.assertEqual(rc, 0)
+        rc = xs.send(xrep_socket, b'b', 1, 0)
+        self.assertEqual(rc, 1)
+        # Check whether we've got the valid reply.
+        rc = xs.recv(req_socket, body, sizeof(body), 0)
+        self.assertEqual(rc, 1)
+        self.assertEqual(int2byte(body[0]), b'b')
+        # Tear down the wiring.
+        rc = xs.close(xrep_socket)
+        self.assertEqual(rc, 0)
+        rc = xs.close(req_socket)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_resubscribe(self):
+        from stuf.six import int2byte
+        from time import sleep
+        from ctypes import sizeof, c_ubyte
+        from crossroads.lowest import array
+        XS, xs = twoget(self)
+        # Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        xpub = xs.socket(ctx, XS.XPUB)
+        self.assertTrue(xpub)
+        sub = xs.socket(ctx, XS.SUB)
+        self.assertTrue(sub)
+        # Send two subscriptions upstream.
+        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        rc = xs.setsockopt(sub, XS.SUBSCRIBE, b'a', 1)
+        self.assertEqual(rc, 0)
+        rc = xs.setsockopt(sub, XS.SUBSCRIBE, b'b', 1)
+        self.assertEqual(rc, 0)
+        rc = xs.connect(sub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Check whether subscriptions are correctly received.
+        buf = array(c_ubyte, 5)
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'a')
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'b')
+        # Tear down the connection.
+        rc = xs.close(xpub)
+        self.assertEqual(rc, 0)
+        sleep(1)
+        # Re-establish the connection.
+        xpub = xs.socket(ctx, XS.XPUB)
+        self.assertTrue(xpub)
+        rc = xs.bind(xpub, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # We have to give control to the SUB socket here so that it has
+        # chance to resend the subscriptions.
+        try:
+            rc = xs.recv(sub, buf, sizeof(buf), XS.DONTWAIT)
+        except xs.XSError:
+            self.assertEqual(xs.errno(), XS.EAGAIN)
+        # Check whether subscriptions are correctly generated.
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'a')
+        rc = xs.recv(xpub, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 5)
+        self.assertEqual(buf[0], 0)
+        self.assertEqual(buf[1], 1)
+        self.assertEqual(buf[2], 0)
+        self.assertEqual(buf[3], 1)
+        self.assertEqual(int2byte(buf[4]), b'b')
+        # Clean up.
+        rc = xs.close(sub)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xpub)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_inproc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.REP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.REQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'inproc://a')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_ipc(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.REP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.REQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'ipc:///tmp/tester')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_tcp(self):
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        sb = xs.socket(ctx, XS.REP)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        sc = xs.socket(ctx, XS.REQ)
+        self.assertTrue(sc)
+        rc = xs.connect(sc, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        self.bounce(sb, sc)
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_regrep_device(self):
+        from ctypes.util import find_library
+        from ctypes import CDLL, byref, sizeof, c_int, c_ubyte, c_size_t
+        from crossroads.lowest import array
+        cpp = CDLL(find_library('libcpp'), use_errno=True)
+        XS, xs = twoget(self)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # Create a req/rep device.
+        xreq = xs.socket(ctx, XS.XREQ)
+        self.assertTrue(xreq)
+        rc = xs.bind(xreq, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        xrep = xs.socket(ctx, XS.XREP)
+        self.assertTrue(xrep)
+        rc = xs.bind(xrep, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Create a worker.
+        rep = xs.socket(ctx, XS.REP)
+        self.assertTrue(rep)
+        rc = xs.connect(rep, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        # Create a client.
+        req = xs.socket(ctx, XS.REQ)
+        self.assertTrue(req)
+        rc = xs.connect(req, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Send a request.
+        rc = xs.send(req, b'ABC', 3, XS.SNDMORE)
+        self.assertEqual(rc, 3)
+        rc = xs.send(req, b'DEF', 3, 0)
+        self.assertEqual(rc, 3)
+        # Pass the request through the device.
+        for i in xrange(4):
+            msg = xs.msg_t()
+            rc = xs.msg_init(byref(msg))
+            self.assertEqual(rc, 0)
+            rc = xs.recvmsg(xrep, byref(msg), 0)
+            self.assertTrue(rc >= 0)
+            rcvmore = c_int()
+            sz = c_size_t(sizeof(rcvmore))
+            rc = xs.getsockopt(xrep, XS.RCVMORE, byref(rcvmore), byref(sz))
+            self.assertEqual(rc, 0)
+            rc = xs.sendmsg(xreq, byref(msg), XS.SNDMORE if rcvmore else 0)
+            self.assertTrue(rc >= 0)
+        # Receive the request.
+        buff = array(c_ubyte, 3)
+        rc = xs.recv(rep, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, b'ABC', 3), 0)
+        rcvmore = c_int()
+        sz = c_size_t(sizeof(rcvmore))
+        rc = xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(rep, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, b'DEF', 3), 0)
+        rc = xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        # Send the reply.
+        rc = xs.send(rep, b'GHI', 3, XS.SNDMORE)
+        self.assertEqual(rc, 3)
+        rc = xs.send(rep, b'JKL', 3, 0)
+        self.assertEqual(rc, 3)
+        # Pass the reply through the device.
+        for i in xrange(4):
+            msg = xs.msg_t()
+            rc = xs.msg_init(byref(msg))
+            self.assertEqual(rc, 0)
+            rc = xs.recvmsg(xreq, byref(msg), 0)
+            self.assertTrue(rc >= 0)
+            rc = xs.getsockopt(xreq, XS.RCVMORE, byref(rcvmore), byref(sz))
+            self.assertEqual(rc, 0)
+            rc = xs.sendmsg(xrep, byref(msg), XS.SNDMORE if rcvmore else 0)
+            self.assertTrue(rc >= 0)
+        # Receive the reply.
+        rc = xs.recv(req, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, b'GHI', 3), 0)
+        rc = xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertTrue(rcvmore)
+        rc = xs.recv(req, buff, 3, 0)
+        self.assertEqual(rc, 3)
+        self.assertEqual(cpp.memcmp(buff, b'JKL', 3), 0)
+        rc = xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz))
+        self.assertEqual(rc, 0)
+        self.assertFalse(rcvmore)
+        # Clean up.
+        rc = xs.close(req)
+        self.assertEqual(rc, 0)
+        rc = xs.close(rep)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xrep)
+        self.assertEqual(rc, 0)
+        rc = xs.close(xreq)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_wireformat(self):
+        XS, xs = twoget(self)
+        import socket
+        from ctypes.util import find_library
+        from ctypes import CDLL, sizeof, c_ubyte
+        from crossroads.lowest import array
+        cpp = CDLL(find_library('libcpp'), use_errno=True)
+        # Create the basic infrastructure.
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        push = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(push)
+        pull = xs.socket(ctx, XS.PULL)
+        self.assertTrue(push)
+        # Bind the peer and get the message.
+        rc = xs.bind(pull, b'tcp://127.0.0.1:5560')
+        self.assertNotEqual(rc, -1)
+        rc = xs.bind(push, b'tcp://127.0.0.1:5561')
+        self.assertNotEqual(rc, -1)
+        # Connect to the peer using raw sockets.
+        rpush = socket.socket(
+            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP
+        )
+        rpush.connect(('127.0.0.1', 5560))
+        rpull = socket.socket(
+            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP,
+        )
+        rpull.connect(('127.0.0.1', 5561))
+        # Let's send some data and check if it arrived
+        rc = rpush.send(b'\x04\0abc', 0)
+        self.assertEqual(rc, 5)
+        buf = array(c_ubyte, 3)
+        rc = xs.recv(pull, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        self.assertFalse(cpp.memcmp(buf, b'abc', 3))
+        # Let's push this data into another socket
+        rc = xs.send(push, buf, sizeof(buf), 0)
+        self.assertEqual(rc, 3)
+        buf2 = rpull.recv(3, 0)
+        self.assertFalse(cpp.memcmp(buf2, b'\x04\0abc', 3))
+        rpush.close()
+        rpull.close()
+        rc = xs.close(pull)
+        self.assertEqual(rc, 0)
+        rc = xs.close(push)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_timeo(self):
+        from time import sleep
+        from ctypes import c_int, byref, c_char_p, c_size_t, sizeof
+        XS, xs = twoget(self)
+        def timeo_worker(ctx_):
+            # Worker thread connects after delay of 1 second. Then it waits
+            # for 1 more second, so that async connect has time to succeed.
+            sleep(1)
+            sc = xs.socket(ctx_, XS.PUSH)
+            self.assertTrue(sc)
+            rc = xs.connect(sc, b'inproc://timeout_test')
+            self.assertNotEqual(rc, -1)
+            sleep(1)
+            rc = xs.close(sc)
+            self.assertEqual(rc, 0)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # Create a disconnected socket.
+        sb = xs.socket(ctx, XS.PULL)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'inproc://timeout_test')
+        self.assertNotEqual(rc, -1)
+        # Check whether non-blocking recv returns immediately.
+        buf = c_char_p('12345678ABCDEFGH12345678abcdefgh')
+        try:
+            rc = xs.recv(sb, buf, 32, XS.DONTWAIT)
+        except xs.XSError:
+            self.assertEqual(xs.errno(), XS.EAGAIN)
+        # Check whether recv timeout is honoured.
+        timeout = c_int(500)
+        timeout_size = c_size_t(sizeof(timeout))
+        rc = xs.setsockopt(sb, XS.RCVTIMEO, byref(timeout), timeout_size)
+        self.assertEqual(rc, 0)
+        watch = xs.stopwatch_start()
+        try:
+            rc = xs.recv(sb, buf, 32, 0)
+        except xs.XSError:
+            self.assertEqual(xs.errno(), XS.EAGAIN)
+        elapsed = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(elapsed, timeout.value)
+        # Check whether connection during the wait doesn't distort the timeout.
+        timeout = c_int(2000)
+        rc = xs.setsockopt(sb, XS.RCVTIMEO, byref(timeout), timeout_size)
+        self.assertEqual(rc, 0)
+#        thread = self.thread_create(timeo_worker, ctx)
+#        self.assertTrue(thread)
+        watch = xs.stopwatch_start()
+        try:
+            rc = xs.recv(sb, buf, 32, 0)
+            self.assertEqual(rc, -1)
+        except xs.XSError:
+            self.assertTrue(xs.errno(), XS.EAGAIN)
+        elapsed = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(elapsed, timeout.value)
+#        self.thread_join(thread)
+        # Check that timeouts don't break normal message transfer.
+        sc = xs.socket(ctx, XS.PUSH)
+        self.assertTrue(sc)
+        rc = xs.setsockopt(sb, XS.RCVTIMEO, byref(timeout), timeout_size)
+        self.assertEqual(rc, 0)
+        rc = xs.setsockopt(sb, XS.SNDTIMEO, byref(timeout), timeout_size)
+        self.assertEqual(rc, 0)
+        rc = xs.connect(sc, b'inproc://timeout_test')
+        self.assertNotEqual(rc, -1)
+        rc = xs.send(sc, buf, 32, 0)
+        self.assertEqual(rc, 32)
+        rc = xs.recv(sb, buf, 32, 0)
+        self.assertEqual(rc, 32)
+        # Clean-up.
+        rc = xs.close(sc)
+        self.assertEqual(rc, 0)
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+    def test_polltimeo(self):
+        from time import sleep
+        from ctypes import byref
+        XS, xs = twoget(self)
+        def polltimeo_worker(ctx_):
+            # Worker thread connects after delay of 1 second. Then it waits
+            # for 1 more second, so that async connect has time to succeed.
+            sleep(1)
+            sc = xs.socket(ctx_, XS.PUSH)
+            self.assertTrue(sc)
+            rc = xs.connect(sc, b'inproc://timeout_test')
+            self.assertNotEqual(rc, -1)
+            sleep(1)
+            rc = xs.close(sc)
+            self.assertEqual(rc, 0)
+        ctx = xs.init()
+        self.assertTrue(ctx)
+        # Create a disconnected socket.
+        sb = xs.socket(ctx, XS.PULL)
+        self.assertTrue(sb)
+        rc = xs.bind(sb, b'inproc://timeout_test')
+        self.assertNotEqual(rc, -1)
+        # Check whether timeout is honoured.
+        pi = xs.pollitem_t()
+        pi.socket = sb
+        pi.events = XS.POLLIN
+        watch = xs.stopwatch_start()
+        rc = xs.poll(byref(pi), 1, 500)
+        self.assertEqual(rc, 0)
+        elapsed = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(elapsed, 500)
+        # Check whether connection during the wait doesn't distort the timeout.
+#        thread = self.thread_create(polltimeo_worker, ctx)
+#        self.assertTrue(thread)
+        watch = xs.stopwatch_start()
+        rc = xs.poll(byref(pi), 1, 2000)
+        self.assertEqual(rc, 0)
+        elapsed = xs.stopwatch_stop(watch) / 1000
+        self.time_assert(elapsed, 2000)
+#        self.thread_join(thread)
+        # Clean-up.
+        rc = xs.close(sb)
+        self.assertEqual(rc, 0)
+        rc = xs.term(ctx)
+        self.assertEqual(rc, 0)
+
+#    def test_shutdown_stress(self):
+#        from ctypes import c_int, byref, sizeof, get_errno
+#        XS, xs = twoget(self)
+#        THREAD_COUNT = 100
+#        def shutdown_stress_worker(s_):
+#            rc = xs.connect(s_, b'tcp://127.0.0.1:5560')
+#            self.assertNotEqual(rc, -1)
+#            # Start closing the socket while the connecting process is underway.
+#            rc = xs.close(s_)
+#            self.assertEqual(rc, 0)
+#        threads = []  # THREAD_COUNT
+#        for j in xrange(10):
+#            # Check the shutdown with many parallel I/O threads.
+#            ctx = xs.init()
+#            self.assertTrue(ctx)
+#            io_threads = c_int(7)
+#            rc = xs.setctxopt(
+#                ctx, XS.IO_THREADS, byref(io_threads), sizeof(io_threads)
+#            )
+#            self.assertEqual(rc, 0)
+#            s1 = xs.socket(ctx, XS.PUB)
+#            self.assertTrue(s1)
+#            rc = xs.bind(s1, b'tcp://127.0.0.1:5560')
+#            self.assertNotEqual(rc, -1)
+#            for i in xrange(THREAD_COUNT):
+#                s2 = xs.socket(ctx, XS.SUB)
+#                if not s2 and (
+#                    get_errno() == XS.EMFILE or get_errno() == XS.ENFILE
+#                ):
+#                    threads[i] = None
+#                else:
+#                    self.assertTrue(s2)
+#                    threads[i] = self.thread_create(shutdown_stress_worker, s2)
+#                    self.assertTrue(threads[i])
+#            for i in xrange(THREAD_COUNT):
+#                if threads[i]:
+#                    self.thread_join(threads[i])
+#            rc = xs.close(s1)
+#            self.assertEqual(rc, 0)
+#            rc = xs.term(ctx)
+#            self.assertEqual(rc, 0)
+
+#    def test_backlog(self):
+#        BACKLOG = 10
+#        PARALLEL_CONNECTS = 50
+#        def ipc_worker (ctx_):
+#            int rc;
+#            push;
+#            push = xs.socket (ctx_, XS.PUSH);
+#            self.assertTrue(push);
+#            rc = xs.connect (push, 'ipc:///tmp/test-backlog');
+#            self.assertTrue(rc >= 0);
+#            rc = xs.send (push, None, 0, 0);
+#            self.assertEqual(rc, 0)
+#            rc = xs.close (push);
+#            self.assertEqual(rc, 0)
+#
+#        def tcp_worker(ctx_):
+#            int rc;
+#            push;
+#            push = xs.socket (ctx_, XS.PUSH);
+#            self.assertTrue(push);
+#            rc = xs.connect (push, 'tcp://127.0.0.1:5560');
+#            self.assertTrue(rc >= 0);
+#            rc = xs.send (push, None, 0, 0);
+#            self.assertEqual(rc, 0)
+#            rc = xs.close (push);
+#            self.assertEqual(rc, 0)
+#            int rc;
+#            ctx;
+#            pull;
+#            int i;
+#            int backlog;
+#            threads [PARALLEL_CONNECTS];
+#            #  Test the exhaustion on IPC backlog.
+#        #if !defined XS.HAVE_WINDOWS byrefbyref(!defined XS.HAVE_OPENVMS
+#            ctx = xs.init ();
+#            self.assertTrue(ctx)
+#            pull = xs.socket (ctx, XS.PULL);
+#            self.assertTrue(pull);
+#            backlog = BACKLOG;
+#            rc = xs.setsockopt (pull, XS.BACKLOG, byrefbacklog, sizeof (backlog));
+#            self.assertEqual(rc, 0)
+#            rc = xs.bind (pull, 'ipc:///tmp/test-backlog');
+#            self.assertTrue(rc >= 0);
+#            for (i = 0; i < PARALLEL_CONNECTS; i++)
+#                threads [i] = thread_create (ipc_worker, ctx);
+#            for (i = 0; i < PARALLEL_CONNECTS; i++) {
+#                rc = xs.recv (pull, None, 0, 0);
+#                self.assertEqual(rc, 0)
+#            }
+#            rc = xs.close (pull);
+#            self.assertEqual(rc, 0)
+#            rc = xs.term (ctx);
+#            self.assertEqual(rc, 0)
+#            for (i = 0; i < PARALLEL_CONNECTS; i++)
+#                thread_join (threads [i]);
+#        #endif
+#            # Test the exhaustion on TCP backlog.
+#            ctx = xs.init ();
+#            self.assertTrue(ctx)
+#            pull = xs.socket (ctx, XS.PULL);
+#            self.assertTrue(pull);
+#            backlog = BACKLOG;
+#            rc = xs.setsockopt (pull, XS.BACKLOG, byrefbacklog, sizeof (backlog));
+#            self.assertEqual(rc, 0)
+#            rc = xs.bind (pull, b'tcp://127.0.0.1:5560');
+#            self.assertTrue(rc >= 0);
+#            for (i = 0; i < PARALLEL_CONNECTS; i++)
+#                threads [i] = thread_create (tcp_worker, ctx);
+#            for (i = 0; i < PARALLEL_CONNECTS; i++) {
+#                rc = xs.recv (pull, None, 0, 0);
+#                self.assertEqual(rc, 0)
+#            }
+#            rc = xs.close (pull);
+#            self.assertEqual(rc, 0)
+#            rc = xs.term (ctx);
+#            self.assertEqual(rc, 0)
+#
+#            for (i = 0; i < PARALLEL_CONNECTS; i++)
+#                thread_join (threads [i]);
+#        #endif

tests/test_lowest_level.py

+# -*- coding: utf-8 -*-
+'''Lowest level tests for ctypes binding to Crossroads.IO library.'''
+
 from stuf.six import unittest
 
 from operator import attrgetter
+from ctypes import string_at
 
 twoget = attrgetter(*'XS xs'.split())
 
 class TestLowestLevel(unittest.TestCase):
 
     def setUp(self):
-        import crossroads.api
-        self.xs = crossroads.api
+        import crossroads.lowest
+        self.xs = crossroads.lowest
         import crossroads.constants
         self.XS = crossroads.constants
 
             actual > ((expected) - 50) and actual < ((expected) + 500)
         )
 
+#extern "C"
+#{
+#    static void *thread_routine (void *arg_)
+#    {
+#        arg_t *arg = (arg_t*) arg_;
+#        arg->fn (arg->arg);
+#        return NULL;
+#    }
+#}
+#
+#void *thread_create (void (*fn_) (void *arg_), void *arg_)
+#{
+#    arg_t *arg = (arg_t*) malloc (sizeof (arg_t));
+#    assert (arg);
+#    arg->fn = fn_;
+#    arg->arg = arg_;
+#    int rc = pthread_create (&arg->handle, NULL, thread_routine, (void*) arg);
+#    assert (rc == 0);
+#    return (void*) arg;
+#}
+#
+#void thread_join (void *thread_)
+#{
+#    arg_t *arg = (arg_t*) thread_;
+#    int rc = pthread_join (arg->handle, NULL);
+#    assert (rc == 0);
+#    free (arg);
+#}
+
     def test_survey(self):
         from ctypes import sizeof, c_ubyte, byref, c_int, get_errno
         XS, xs = twoget(self)
         from stuf.six import int2byte
         from time import sleep
         from ctypes import sizeof, c_ubyte
-        from crossroads.api import array
+        from crossroads.lowest import array
         XS, xs = twoget(self)
         # Create the basic infrastructure.
         ctx = xs.init()
     def test_regrep_device(self):
         from ctypes.util import find_library
         from ctypes import CDLL, byref, sizeof, c_int, c_ubyte, c_size_t
-        from crossroads.api import array
+        from crossroads.lowest import array
         cpp = CDLL(find_library('libcpp'), use_errno=True)
         XS, xs = twoget(self)
         ctx = xs.init()
         # Create a req/rep device.
         xreq = xs.socket(ctx, XS.XREQ)
         self.assertTrue(xreq)
-        rc = xs.bind(xreq, 'tcp://127.0.0.1:5560')
+        rc = xs.bind(xreq, b'tcp://127.0.0.1:5560')
         self.assertNotEqual(rc, -1)
         xrep = xs.socket(ctx, XS.XREP)
         self.assertTrue(xrep)
-        rc = xs.bind(xrep, 'tcp://127.0.0.1:5561')
+        rc = xs.bind(xrep, b'tcp://127.0.0.1:5561')
         self.assertNotEqual(rc, -1)
         # Create a worker.
         rep = xs.socket(ctx, XS.REP)
         self.assertTrue(rep)
-        rc = xs.connect(rep, 'tcp://127.0.0.1:5560')
+        rc = xs.connect(rep, b'tcp://127.0.0.1:5560')
         self.assertNotEqual(rc, -1)
         # Create a clie