Commits

Lynn Rees committed c33c151

- breakout message

Comments (0)

Files changed (4)

crossroads/low.py

 # -*- coding: utf-8 -*-
-'''Low level API for ctypes binding to Crossroads.IO library.'''
+'''Low level API for ctypes binding to Crossroads.IO.'''
 
 from uuid import uuid4
 from functools import partial
 from operator import methodcaller
-from ctypes import (
-    byref, sizeof, c_int, c_int64, c_char_p, string_at, c_size_t, c_ubyte)
+from ctypes import byref, sizeof, c_int, c_int64, c_char_p, c_size_t
 
 from stuf.desc import Setter, lazy
-from stuf.six import OrderedDict, PY3, items, tobytes, tounicode, b
+from stuf.six import OrderedDict, items, tobytes, b
 
 from . import lowest as xs
 from . import constants as XS
+from .message import XSMessage, Message
 
 PREFIX = dict(inproc=b('inproc://'), ipc=b('ipc://'), tcp=b('tcp://'))
 split = methodcaller('rsplit', '_')
 xsget = partial(getattr, xs)
 
 
-class BaseMsg(object):
-
-    def __init__(self, size, source=None):
-        self._isize = size
-        self.source = source
-        self.more = self.rc = self.dest = None
-#        self.parts = []
-
-    def __bytes__(self, *args, **kwargs):
-        if self.dest is not None:
-            return tobytes(string_at(self.ref, len(self)), 'latin-1')
-        return b('')
-
-    def __unicode__(self, *args, **kwargs):
-        return tounicode(tobytes(self, 'utf-8'))
-
-    __str__ = __unicode__ if PY3 else __bytes__
-
-    @property
-    def array(self):
-        return bytearray(self.__bytes__())
-
-class Message(BaseMsg):
-
-    def __init__(self, size, source=None):
-        super(Message, self).__init__(size, source)
-        if source is None:
-            self.dest = xs.array(c_ubyte, size)
-
-    def __len__(self):
-        return len(self.source) if self.dest is None else sizeof(self.dest)
-
-    @property
-    def ref(self):
-        return None if self.dest is None else byref(self.dest)
-
-
-class XSMessage(BaseMsg):
-
-    _isize = 32
-
-    def __init__(self, data=None):
-        super(XSMessage, self).__init__(self._isize, data)
-        self.dest = dest = xs.msg_t()
-        if data is None:
-            self.last_rc = xs.msg_init(dest)
-        else:
-            self.last_rc = xs.msg_init_size(dest, self._isize)
-            xs.msg_init_size(dest, self._isize)
-            xs.memmove(xs.msg_data(dest), data, self._isize)
-
-    def __len__(self):
-        return sizeof(self.dest)
-
-    def close(self):
-        self.last_rc = xs.msg_close(self.dest)
-
-
 class Options(Setter):
 
     def __init__(self, **options):

crossroads/lowest.py

 # -*- coding: utf-8 -*-
-'''Lowest-level ctypes bindings to Crossroads API functions.'''
+'''Lowest-level ctypes bindings to Crossroads.IO API.'''
 
 from functools import partial
 from ctypes.util import find_library
 from ctypes import (
-    Structure, CDLL, POINTER, memmove, get_errno, byref, c_void_p, c_int,
-    c_char_p, c_ubyte, c_size_t, c_short, c_ulong)
+    Structure, CDLL, POINTER, memmove, get_errno, c_void_p, c_int, c_char_p,
+    c_ubyte, c_size_t, c_short, c_ulong)
 
 
 class XSBaseError(Exception):
 
 memmove.restype = c_void_p
 
-###############################################################################
-## CROSSROADS VERSIONING SUPPORT ##############################################
-###############################################################################
+################################################################################
+## CROSSROADS VERSIONING SUPPORT ###############################################
+################################################################################
 
-# void xs_version (int *major, int *minor, int *patch);
 version = voidfactory('version', POINTER(c_int), POINTER(c_int), POINTER(c_int))
-#VERSION = c_int(), c_int(), c_int()
-#version(byref(VERSION[0]), byref(VERSION[1]), byref(VERSION[2]))
-#VERSION = tuple(x.value for x in VERSION)
 
-###############################################################################
-## CROSSROADS ERRORS ##########################################################
-###############################################################################
+################################################################################
+## CROSSROADS ERRORS ###########################################################
+################################################################################
 
 # int xs_errno (void)
 errno = intfactory('errno')
 # const char *xs_strerror (int errnum)
 strerror = factory(c_char_p, 'strerror', c_int)
 
-###############################################################################
-## CROSSROADS MESSAGE DEFINITION ##############################################
-###############################################################################
+################################################################################
+## CROSSROADS MESSAGE DEFINITION ###############################################
+################################################################################
 
 
 class msg_t(Structure):
     'getmsgopt', POINTER(msg_t), c_int, POINTER(c_int), POINTER(c_size_t)
 )
 
-###############################################################################
-## CROSSROADS CONTEXT DEFINITION ##############################################
-###############################################################################
+################################################################################
+## CROSSROADS CONTEXT DEFINITION ###############################################
+################################################################################
 
 # void *xs_init (void);
 init = voidfactory('init')
 # );
 setctxopt = intfactory('setctxopt', c_void_p, c_int, POINTER(c_int), c_size_t)
 
-###############################################################################
-## CROSSROADS SOCKET DEFINITION ###############################################
-###############################################################################
+################################################################################
+## CROSSROADS SOCKET DEFINITION ################################################
+################################################################################
 
 #void *xs_socket (void *context, int type);
 socket = voidfactory('socket', c_void_p, c_int)
 # int xs_recvmsg (void *s, xs_msg_t *msg, int flags);
 recvmsg = intfactory('recvmsg', c_void_p, POINTER(msg_t), c_int)
 
-###############################################################################
-## I/O MULTIPLEXING ###########################################################
-###############################################################################
+################################################################################
+## I/O MULTIPLEXING ############################################################
+################################################################################
 
 
 class pollitem_t(Structure):

crossroads/message.py

+# -*- coding: utf-8 -*-
+'''Crossroads.IO messages.'''
+
+from stuf.six import PY3, tobytes, tounicode, b
+from ctypes import byref, sizeof, string_at, c_ubyte
+
+from . import lowest as xs
+
+
+class BaseMsg(object):
+
+    def __init__(self, size, source=None):
+        self.source = source
+        self.more = self.rc = self.dest = None
+
+    def __bytes__(self, *args, **kwargs):
+        if self.dest is not None:
+            return tobytes(string_at(self.ref, len(self)), 'latin-1')
+        return b('')
+
+    def __unicode__(self, *args, **kwargs):
+        return tounicode(tobytes(self, 'utf-8'))
+
+    __str__ = __unicode__ if PY3 else __bytes__
+
+    @property
+    def array(self):
+        return bytearray(self.__bytes__())
+
+
+class BaseMultipart(object):
+
+    def __init__(self, size, source=None):
+        self.source = source
+        self.more = self.rc = self.dest = None
+        self.parts = []
+        self.__add__ = self.parts.append
+
+    def __bytes__(self, *args, **kwargs):
+        if self.dest is not None:
+            return b('').join(tobytes(
+                string_at(byref(i), sizeof(i)), 'latin-1') for i in self.parts)
+        return b('')
+
+    def __unicode__(self, *args, **kwargs):
+        return tounicode(tobytes(self, 'utf-8'))
+
+    __str__ = __unicode__ if PY3 else __bytes__
+
+    @property
+    def dest(self):
+        return self.parts[-1]
+
+    @property
+    def array(self):
+        return bytearray(self.__bytes__())
+
+
+class Message(BaseMsg):
+
+    __slots__ = 'source dest last_rc more'.split()
+
+    def __init__(self, size, source=None):
+        super(Message, self).__init__(size, source)
+        if source is None:
+            self.dest = xs.array(c_ubyte, size)
+
+    def __len__(self):
+        return len(self.source) if self.dest is None else sizeof(self.dest)
+
+    @property
+    def ref(self):
+        return None if self.dest is None else byref(self.dest)
+
+
+class Multipart(BaseMultipart):
+
+    def __init__(self, size, source=None):
+        super(Multipart, self).__init__(size, source)
+
+    def __len__(self):
+        return len(self.source) if self.dest is None else sizeof(self.dest)
+
+    @property
+    def ref(self):
+        return None if self.dest is None else byref(self.dest)
+
+
+class XSMessage(BaseMsg):
+
+    __slots__ = 'source dest last_rc more ref'.split()
+
+    def __init__(self, data=None):
+        super(XSMessage, self).__init__(32, data)
+        self.dest = self.ref = dest = xs.msg_t()
+        if data is None:
+            self.last_rc = xs.msg_init(dest)
+        else:
+            self.last_rc = xs.msg_init_size(dest, 32)
+            xs.msg_init_size(dest, 32)
+            xs.memmove(xs.msg_data(dest), data, 32)
+
+    def __len__(self):
+        return sizeof(self.dest)
+
+    def close(self):
+        self.last_rc = xs.msg_close(self.dest)
+
+
+class XSMultipart(BaseMultipart):
+
+    def __init__(self, size, source=None):
+        super(XSMultipart, self).__init__(size, source)
+
+    def __len__(self):
+        return sizeof(self.dest)
+
+    def close(self):
+        self.last_rc = sum(xs.msg_close(i) for i in self.parts)

tests/test_low_level.py

 # -*- coding: utf-8 -*-
-'''Low level tests for ctypes binding to Crossroads.IO library.'''
+'''Low level tests for ctypes binding to Crossroads.IO.'''
 
 from stuf.six import unittest
 from stuf.utils import lrange
         # First, create an intermediate device.
         xpub = self.ctx.xpub()
         xsub = self.ctx.xsub()
-        with xpub.bind_tcp('127.0.0.1:5560'), xsub.bind_tcp('127.0.0.1:5561'):
-            # Create a publisher.
-            pub = self.ctx.pub()
-            sub = self.ctx.sub()
-            with pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
-                # Create a subscriber and subscribe for all messages
-                self.assertEqual(sub.set(subscribe='').last_rc, 0)
-                # Pass the subscription upstream through the device.
-                buf = xpub.recv(32)
-                self.assertTrue(xpub.last_rc >= 0)
-                self.assertTrue(xsub.send(bytes(buf), 32).last_rc >= 0)
-                # Wait a bit till the subscription gets to the publisher.
-                sleep(1)
-                # Send an empty message.
-                self.assertEqual(pub.send(None, 0).last_rc, 0)
-                # Pass the message downstream through the device.
-                buf = xsub.recv(pub.last_rc, nowait=True)
-                self.assertTrue(xsub.last_rc >= 0)
-                self.assertTrue(xpub.send(bytes(buf)).last_rc >= 0)
-                # Receive the message in the subscriber.
-                sub.recv(xpub.last_rc)
-                self.assertEqual(sub.last_rc, 0)
+        # Create a publisher.
+        pub = self.ctx.pub()
+        sub = self.ctx.sub()
+        with xpub.bind_tcp('127.0.0.1:5560'), xsub.bind_tcp('127.0.0.1:5561'), \
+                pub.connect_tcp('127.0.0.1:5561'), sub.connect_tcp('127.0.0.1:5560'):
+            # Create a subscriber and subscribe for all messages
+            self.assertEqual(sub.set(subscribe='').last_rc, 0)
+            # Pass the subscription upstream through the device.
+            buf = xpub.recv(32)
+            self.assertTrue(xpub.last_rc >= 0)
+            self.assertTrue(xsub.send(bytes(buf), 32).last_rc >= 0)
+            # Wait a bit till the subscription gets to the publisher.
+            sleep(1)
+            # Send an empty message.
+            self.assertEqual(pub.send(None, 0).last_rc, 0)
+            # Pass the message downstream through the device.
+            buf = xsub.recv(pub.last_rc, nowait=True)
+            self.assertTrue(xsub.last_rc >= 0)
+            self.assertTrue(xpub.send(bytes(buf)).last_rc >= 0)
+            # Receive the message in the subscriber.
+            sub.recv(xpub.last_rc)
+            self.assertEqual(sub.last_rc, 0)
 
 #    def test_resubscribe(self):
 #        from stuf.six import int2byte
 #        self.assertEqual(xs.close(xpub), 0)
 #        self.assertEqual(xs.term(ctx), 0)
 
-#    def test_regrep_device(self):
-#        from ctypes import byref, sizeof, c_int, c_size_t, c_ubyte
-#        from crossroads.lowest import array
-#        XS, xs = twoget(self)
-#        ctx = xs.init()
-#        self.assertTrue(ctx)
-#        # Create a req/rep device.
-#        xreq = xs.socket(ctx, XS.XREQ)
-#        self.assertTrue(xreq)
-#        self.assertNotEqual(xs.bind(xreq, b'tcp://127.0.0.1:5560'), -1)
-#        xrep = xs.socket(ctx, XS.XREP)
-#        self.assertTrue(xrep)
-#        self.assertNotEqual(xs.bind(xrep, b'tcp://127.0.0.1:5561'), -1)
-#        # Create a worker.
-#        rep = xs.socket(ctx, XS.REP)
-#        self.assertTrue(rep)
-#        self.assertNotEqual(xs.connect(rep, b'tcp://127.0.0.1:5560'), -1)
-#        # Create a client.
-#        req = xs.socket(ctx, XS.REQ)
-#        self.assertTrue(req)
-#        self.assertNotEqual(xs.connect(req, b'tcp://127.0.0.1:5561'), -1)
-#        # Send a request.
-#        self.assertEqual(xs.send(req, b'ABC', 3, XS.SNDMORE), 3)
-#        self.assertEqual(xs.send(req, b'DEF', 3, 0), 3)
-#        # Pass the request through the device.
-#        for i in xrange(4):
-#            msg = xs.msg_t()
-#            self.assertEqual(xs.msg_init(byref(msg)), 0)
-#            self.assertTrue(xs.recvmsg(xrep, byref(msg), 0) >= 0)
-#            rcvmore = c_int()
-#            sz = c_size_t(sizeof(rcvmore))
-#            self.assertEqual(
-#                xs.getsockopt(xrep, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
-#            )
-#            self.assertTrue(
-#                xs.sendmsg(xreq, byref(msg), XS.SNDMORE if rcvmore else 0) >= 0
-#            )
-#        # Receive the request.
-#        buff = array(c_ubyte, 3)
-#        self.assertEqual(xs.recv(rep, buff, 3, 0), 3)
-#        self.assertEqual(bytearray(buff), b'ABC')
-#        rcvmore = c_int()
-#        sz = c_size_t(sizeof(rcvmore))
-#        self.assertEqual(
-#            xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
-#        )
-#        self.assertTrue(rcvmore)
-#        self.assertEqual(xs.recv(rep, buff, 3, 0), 3)
-#        self.assertEqual(bytearray(buff), b'DEF')
-#        self.assertEqual(
-#            xs.getsockopt(rep, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
-#        )
-#        self.assertFalse(rcvmore)
-#        # Send the reply.
-#        self.assertEqual(xs.send(rep, b'GHI', 3, XS.SNDMORE), 3)
-#        self.assertEqual(xs.send(rep, b'JKL', 3, 0), 3)
-#        # Pass the reply through the device.
-#        for i in xrange(4):
-#            msg = xs.msg_t()
-#            self.assertEqual(xs.msg_init(byref(msg)), 0)
-#            self.assertTrue(xs.recvmsg(xreq, byref(msg), 0) >= 0)
-#            self.assertEqual(
-#                xs.getsockopt(xreq, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
-#            )
-#            self.assertTrue(
-#                xs.sendmsg(xrep, byref(msg), XS.SNDMORE if rcvmore else 0) >= 0
-#            )
-#        # Receive the reply.
-#        self.assertEqual(xs.recv(req, buff, 3, 0), 3)
-#        self.assertEqual(bytearray(buff), b'GHI')
-#        self.assertEqual(
-#            xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
-#        )
-#        self.assertTrue(rcvmore)
-#        self.assertEqual(xs.recv(req, buff, 3, 0), 3)
-#        self.assertEqual(bytearray(buff), b'JKL')
-#        self.assertEqual(
-#            xs.getsockopt(req, XS.RCVMORE, byref(rcvmore), byref(sz)), 0
-#        )
-#        self.assertFalse(rcvmore)
-#        # Clean up.
-#        self.assertEqual(xs.close(req), 0)
-#        self.assertEqual(xs.close(rep), 0)
-#        self.assertEqual(xs.close(xrep), 0)
-#        self.assertEqual(xs.close(xreq), 0)
-#        self.assertEqual(xs.term(ctx), 0)
+    def test_regrep_device(self):
+        # Create a req/rep device.
+        xreq = self.ctx.xreq()
+        xrep = self.ctx.xrep()
+        # Create a worker.
+        rep = self.ctx.rep()
+        # Create a client.
+        req = self.ctx.req()
+        with xreq.bind_tcp('127.0.0.1:5560'), xrep.bind_tcp('127.0.0.1:5561'), \
+            rep.connect_tcp('127.0.0.1:5560'), req.connect_tcp('127.0.0.1:5561'):
+            # Send a request.
+            self.assertEqual(req.send(b'ABC', more=True).last_rc, 3)
+            self.assertEqual(req.send(b'DEF').last_rc, 3)
+            # Pass the request through the device.
+            for i in lrange(4):
+                msg = xrep.recvmsg()
+                self.assertTrue(msg.last_rc >= 0)
+                self.assertEqual(msg.more, 1)
+                self.assertTrue(
+                    xreq.sendmsg(msg.dest, more=True if msg.more else False).last_rc >= 0
+                )
+            # Receive the request.
+            buff = rep.recv(3)
+            self.assertEqual(buff.last_rc, 3)
+            self.assertEqual(bytes(buff), b'ABC')
+            self.assertEqual(buff.more, 0)
+            buff2 = rep.recv(3)
+            self.assertEqual(buff2.last_rc, 3)
+            self.assertEqual(bytearray(buff), b'DEF')
+            self.assertEqual(buff2.more, 0)
+            # Send the reply.
+            self.assertEqual(rep.send(b'GHI', more=True).last_rc, 3)
+            self.assertEqual(rep.send(b'JKL').last_rc, 3)
+            # Pass the reply through the device.
+            for i in lrange(4):
+                msg = xrep.recvmsg()
+                self.assertTrue(msg.last_rc >= 0)
+                self.assertEqual(msg.more, 0)
+                self.assertTrue(
+                    xreq.sendmsg(msg.dest, more=True if msg.more else False).last_rc >= 0
+                )
+            # Receive the reply.
+            buff = req.recv(3)
+            self.assertEqual(buff.last_rc, 3)
+            self.assertEqual(bytes(buff), b'GHI')
+            self.assertEqual(buff.more, 0)
+            buff2 = req.recv(3)
+            self.assertEqual(buff2.last_rc, 3)
+            self.assertEqual(bytes(buff), b'JKL')
+            self.assertEqual(buff2.more, 0)
+
 
     def test_invalid_rep(self):
         from stuf.six import int2byte