Commits

Lynn Rees  committed 7b504f0

- current low level tests pass

  • Participants
  • Parent commits 4355211

Comments (0)

Files changed (3)

File crossroads/low.py

 '''Low level API for ctypes binding to Crossroads.IO.'''
 
 from functools import partial
-from ctypes import byref, sizeof, c_int, c_int64, c_char_p, c_size_t
+from ctypes import byref, sizeof, c_int, c_int64, c_char, c_size_t
 
 from stuf.utils import unique_id
 from stuf.desc import Setter, lazy
-from stuf.six import OrderedDict, items, tobytes
+from stuf.six import OrderedDict, items, tobytes, isstring
 
 from . import lowest as xs
 from . import constants as XS
         elif opt_key in XS.INT64:
             opt_type = c_int64
         elif opt_key in XS.BINARY:
-            opt_type = c_char_p
+            opt_type = c_char
         else:
             raise AttributeError(
                 '{0} is not a valid socket attribute'.format(key)
 
     def _set(self, key, value):
         opt_key, opt_type = self._opt_type(key)
-        value = opt_type(value)
+        if not value  and isstring(value):
+            new_value = c_char()
+        else:
+            new_value = opt_type(value)
         self.last_rc = self._setsockopt(
             opt_key,
-            byref(value),
-            0 if opt_key == XS.SUBSCRIBE else sizeof(value),
+            byref(new_value),
+            len(value) if opt_key == XS.SUBSCRIBE else sizeof(new_value),
         )
         return self.last_rc
 
         )
         return msg
 
-    def recv(self, size, nowait=False):
-        msg = Message(size)
+    def recv(self, size, nowait=False, char=False):
+        msg = Message(size, char=char)
         msg.last_rc = self.last_rc = self._recv(
             msg.dest, msg.size, XS.DONTWAIT if nowait else 0,
         )

File crossroads/message.py

 # -*- coding: utf-8 -*-
 '''Crossroads.IO messages.'''
 
-from ctypes import sizeof, string_at, c_ubyte, byref, c_size_t
+from ctypes import sizeof, string_at, c_ubyte, byref, c_size_t, c_char
 
 from stuf.six import PY3, tobytes, tounicode, strings
 
 
     __str__ = __unicode__ if PY3 else __bytes__
 
+    @property
+    def array(self):
+        return bytearray(self.__bytes__())
+
 
 class Message(BaseMessage):
 
     __slots__ = 'src dest rc more size last_rc'.split()
 
-    def __init__(self, size, source=None):
+    def __init__(self, size, source=None, char=False):
         super(Message, self).__init__()
         self.src = source
         if source is None:
-            self.dest = xs.array(c_ubyte, size)
+            self.dest = xs.array(c_char if char else c_ubyte, size)
             self.size = size
         else:
             if isinstance(source, (getcls(self), strings)):
             self.rc = xs.msg_close(self.src)
         if self.dest is not None:
             self.rc = xs.msg_close(self.dest)
-#
-#    __del__ = close
-

File tests/test_low_level.py

             self.assertEqual(rep.send(b'JKL', 3).last_rc, 3)
             # pass the reply through the device
             for i in lrange(4):
-                msg = xreq.recvmsg(nowait=True)
+                msg = xreq.recvmsg()
                 self.assertTrue(msg.last_rc >= 0)
                 rc = xrep.sendmsg(msg, True if msg.more else False).last_rc
                 self.assertTrue(rc >= 0)
             self.assertEqual(bytes(buff2), b'JKL')
             self.assertEqual(buff2.more, 0)
 
-#    def test_resubscribe(self):
-#        from stuf.six import int2byte
-##        from time import sleep
-#        # Create the basic infrastructure.
-#        xpub = self.ctx.xpub()
-#        sub = self.ctx.sub()
-#        # Send two subscriptions upstream.
-#        self.assertEqual(sub.set(subscribe = b'a'), 0)
-#        self.assertEqual(sub.set(subscribe = b'b'), 0)
-#        with xpub.bind_tcp('127.0.0.1:5560'), sub.connect_tcp('127.0.0.1:5560'):
-#            # Check whether subscriptions are correctly received.
-#            buf = xpub.recv(5)
-#            self.assertEqual(buf.last_rc, 5)
-#            self.assertEqual(buf.dest[0], 0)
-#            self.assertEqual(buf.dest[1], 1)
-#            self.assertEqual(buf.dest[2], 0)
-#            self.assertEqual(buf.dest[3], 1)
-#            self.assertEqual(int2byte(buf.dest[4]), b'a')
-#            buf = xpub.recv(5)
-#            self.assertEqual(buf.last_rc, 5)
-#            self.assertEqual(buf.dest[0], 0)
-#            self.assertEqual(buf.dest[1], 1)
-#            self.assertEqual(buf.dest[2], 0)
-#            self.assertEqual(buf.dest[3], 1)
-#            self.assertEqual(int2byte(buf.dest[4]), b'b')
-#                # Tear down the connection.
-#                self.assertEqual(xpub.close(), 0)
-#                sleep(1)
-#                # Re-establish the connection.
-#                xpub = self.ctx.xpub()
-#                self.assertNotEqual(xpub.bind_tcp('127.0.0.1:5560'), -1)
-#                # We have to give control to the SUB socket here so that it has
-#                # chance to resend the subscriptions.
-#                try:
-#                    sub.recv(3, self.XS.DONTWAIT)
-#                except self.xs.XSError as e:
-#                    self.assertEqual(e.errno, self.XS.EAGAIN)
-#                # Check whether subscriptions are correctly generated.
-#                buf = xpub.recv(5)
-#                self.assertEqual(buf.last_rc, 5)
-#                self.assertEqual(buf.dest[0], 0)
-#                self.assertEqual(buf.dest[1], 1)
-#                self.assertEqual(buf.dest[2], 0)
-#                self.assertEqual(buf.dest[3], 1)
-#                self.assertEqual(int2byte(buf.dest[4]), b'a')
-#                buf = xpub.recv(5)
-#                self.assertEqual(buf.last_rc, 5)
-#                self.assertEqual(buf.dest[0], 0)
-#                self.assertEqual(buf.dest[1], 1)
-#                self.assertEqual(buf.dest[2], 0)
-#                self.assertEqual(buf.dest[3], 1)
-#                self.assertEqual(int2byte(buf.dest[4]), b'b')
+    def test_resubscribe(self):
+        try:
+            from stuf.six import int2byte
+            from time import sleep
+            # Create the basic infrastructure.
+            xpub = self.ctx.xpub()
+            sub = self.ctx.sub()
+            xpub.bind_tcp('127.0.0.1:5560')
+            # Send two subscriptions upstream.
+            sub.subscribe = b'a'
+            self.assertEqual(sub.last_rc, 0)
+            sub.subscribe = b'b'
+            self.assertEqual(sub.last_rc, 0)
+            sub.connect_tcp('127.0.0.1:5560')
+            self.assertTrue(sub.last_rc >= -1)
+            # Check whether subscriptions are correctly received.
+            buf = xpub.recv(5, char=True).array
+            self.assertEqual(xpub.last_rc, 5)
+            self.assertEqual(buf[0], 0)
+            self.assertEqual(buf[1], 1)
+            self.assertEqual(buf[2], 0)
+            self.assertEqual(buf[3], 1)
+            self.assertEqual(int2byte(buf[4]), b'a')
+            buf = xpub.recv(5, char=True).array
+            self.assertEqual(xpub.last_rc, 5)
+            self.assertEqual(buf[0], 0)
+            self.assertEqual(buf[1], 1)
+            self.assertEqual(buf[2], 0)
+            self.assertEqual(buf[3], 1)
+            self.assertEqual(int2byte(buf[4]), b'b')
+            # Tear down the connection.
+            self.assertEqual(xpub.close(), 0)
+            sleep(1)
+            # Re-establish the connection.
+            xpub = self.ctx.xpub()
+            self.assertNotEqual(xpub.bind_tcp('127.0.0.1:5560'), -1)
+            # We have to give control to the SUB socket here so that it has
+            # chance to resend the subscriptions.
+            try:
+                sub.recv(3, nowait=True)
+            except self.xs.XSError as e:
+                self.assertEqual(e.errno, self.XS.EAGAIN)
+            # Check whether subscriptions are correctly generated.
+            buf = xpub.recv(5, char=True).array
+            self.assertEqual(xpub.last_rc, 5)
+            self.assertEqual(buf[0], 0)
+            self.assertEqual(buf[1], 1)
+            self.assertEqual(buf[2], 0)
+            self.assertEqual(buf[3], 1)
+            self.assertEqual(int2byte(buf[4]), b'a')
+            buf = xpub.recv(5, char=True).array
+            self.assertEqual(xpub.last_rc, 5)
+            self.assertEqual(buf[0], 0)
+            self.assertEqual(buf[1], 1)
+            self.assertEqual(buf[2], 0)
+            self.assertEqual(buf[3], 1)
+            self.assertEqual(int2byte(buf[4]), b'b')
+
+        finally:
+            xpub.close()
+            sub.close()