Commits

Lynn Rees  committed d7a6d4e

- make msg functionality more explicit, less implicit

  • Participants
  • Parent commits 7b504f0

Comments (0)

Files changed (3)

File crossroads/low.py

 
 from . import lowest as xs
 from . import constants as XS
-from .message import MessageXS, Message
+from .message import RecvMsg, RecvMsg32, SendMsg, SendMsg32
 
 bigxsget = partial(getattr, XS)
 xsget = partial(getattr, xs)
         return unique_id()
 
     def send(self, data, size=None, more=False, nowait=False):
-        msg = Message(size, data)
+        msg = SendMsg(size, data)
         msg.last_rc = self.last_rc = self._send(
-            msg.src,
-            len(msg),
+            msg.msg,
+            msg.size,
             XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
         )
         return msg
 
     def sendmsg(self, data, more=False, nowait=False):
-        msg = MessageXS(data)
-        src = msg.src
+        msg = SendMsg32(data)
         msg.last_rc = self.last_rc = self._sendmsg(
-            src, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
+            msg.msg, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
         )
         return msg
 
     def recv(self, size, nowait=False, char=False):
-        msg = Message(size, char=char)
+        msg = RecvMsg(size, char)
         msg.last_rc = self.last_rc = self._recv(
-            msg.dest, msg.size, XS.DONTWAIT if nowait else 0,
+            msg.msg, msg.size, XS.DONTWAIT if nowait else 0,
         )
         msg.more = self.rcvmore
         return msg
 
     def recvmsg(self, nowait=False):
-        msg = MessageXS()
+        msg = RecvMsg32()
         msg.last_rc = self.last_rc = self._recvmsg(
-            msg.dest, XS.DONTWAIT if nowait else 0,
+            msg.msg, XS.DONTWAIT if nowait else 0,
         )
         msg.more = self.rcvmore
         return msg

File crossroads/message.py

 # -*- coding: utf-8 -*-
 '''Crossroads.IO messages.'''
 
-from ctypes import sizeof, string_at, c_ubyte, byref, c_size_t, c_char
+from ctypes import sizeof, string_at, c_ubyte, byref, c_size_t, c_char, c_int
 
 from stuf.six import PY3, tobytes, tounicode, strings
 
+from . import lowest as xs
 from .constants import MORE
-from . import lowest as xs
-from stuf.deep import getcls
-from test.test_multiprocessing import c_int
 
+SLOTS = 'msg rc more size last_rc'.split()
 
-class BaseMessage(object):
+class Msg(object):
 
     def __init__(self):
-        self.more = self.rc = self.dest = self.last_rc = None
+        self.more = self.rc = self.msg = self.last_rc = None
 
     def __bytes__(self):
-        if self.dest is None:
+        if self.msg is None:
             return b''
-        return tobytes(string_at(byref(self.dest), self.size))
+        return tobytes(string_at(byref(self.msg), self.size))
 
     def __unicode__(self):
         return tounicode(self)
 
+    def __len__(self):
+        return self.size
+
     __str__ = __unicode__ if PY3 else __bytes__
 
     @property
         return bytearray(self.__bytes__())
 
 
-class Message(BaseMessage):
+class RecvMsg(Msg):
 
-    __slots__ = 'src dest rc more size last_rc'.split()
+    __slots__ = SLOTS
 
-    def __init__(self, size, source=None, char=False):
-        super(Message, self).__init__()
-        self.src = source
-        if source is None:
-            self.dest = xs.array(c_char if char else c_ubyte, size)
-            self.size = size
+    def __init__(self, size, char=False):
+        super(RecvMsg, self).__init__()
+        self.msg = xs.array(c_char if char else c_ubyte, size)
+        self.size = size
+
+
+class SendMsg(Msg):
+
+    __slots__ = SLOTS
+
+    def __init__(self, size, source):
+        super(SendMsg, self).__init__()
+        if isinstance(source, (Msg, strings)):
+            self.msg = source = bytes(source)
         else:
-            if isinstance(source, (getcls(self), strings)):
-                self.src = source = bytes(source)
-            self.size = len(source) if size is None else size
-            self.dest = None
+            self.msg = source
+        self.size = len(source) if size is None else size
 
-    def __len__(self):
-        return self.size
 
+class MsgXS(Msg):
 
-class MessageXS(BaseMessage):
+    def close(self):
+        self.rc = xs.msg_close(self.msg)
 
-    __slots__ = 'src dest rc more last_rc'.split()
 
-    def __init__(self, source=None):
-        super(MessageXS, self).__init__()
-        if source is None:
-            self.dest = xs.msg_t()
-            self.src = None
-            self.rc = xs.msg_init(self.dest)
-            self.size = xs.msg_size(self.dest)
-        elif isinstance(source, getcls(self)):
-            self.src = source.dest
-            self.dest = None
-            self.size = xs.msg_size(self.src)
-        else:
-            self.src = xs.msg_t()
-            source = bytes(source)
-            length = self.size = len(source)
-            self.rc = xs.msg_init_size(self.src, length)
-            xs.memmove(xs.msg_data(self.src), source, length)
+class RecvMsg32(MsgXS):
 
-    def __len__(self):
-        return self.size
+    __slots__ = SLOTS
+
+    def __init__(self):
+        super(RecvMsg32, self).__init__()
+        self.msg = msg = xs.msg_t()
+        self.rc = xs.msg_init(msg)
 
     @property
     def moremsg(self):
         more = c_int()
         more_size = c_size_t(sizeof(more))
-        self.rc = xs.getmsgopt(self.dest, MORE, more, more_size)
+        self.rc = xs.getmsgopt(self.msg, MORE, more, more_size)
         value = more.value
         del more_size, more
         return value
 
-    def close(self):
-        if self.src is not None:
-            self.rc = xs.msg_close(self.src)
-        if self.dest is not None:
-            self.rc = xs.msg_close(self.dest)
+
+class SendMsg32(MsgXS):
+
+    __slots__ = SLOTS
+
+    def __init__(self, source):
+        super(SendMsg32, self).__init__()
+        if isinstance(source, Msg):
+            self.msg = source.msg
+        else:
+            self.msg = xs.msg_t()
+            source = bytes(source)
+            length = len(source)
+            self.rc = xs.msg_init_size(self.msg, length)
+            xs.memmove(xs.msg_data(self.msg), source, length)

File tests/test_low_level.py

             # Get the response and check it's the response to the new survey and
             # that response to the old survey was silently discarded.
             self.assertEqual(surveyor.recv(2).last_rc, 2)
-#
+
     def test_invalid_rep(self):
         from stuf.six import int2byte
         # create REQ/XREP wiring
             # check whether we've got the valid reply
             body = req_socket.recv(1, nowait=True)
             self.assertEqual(body.last_rc, 1)
-            self.assertEqual(int2byte(body.dest[0]), b'b')
+            self.assertEqual(int2byte(body.msg[0]), b'b')
 
     def test_sub_forward(self):
         from time import sleep
             # pass the subscription upstream through the device
             buf = xpub.recv(32)
             self.assertEqual(xpub.last_rc, 4)
-            self.assertEqual(xsub.send(buf.dest, 4).last_rc, 4)
+            self.assertEqual(xsub.send(buf.msg, 4).last_rc, 4)
             # wait a bit till the subscription gets to the publisher
             sleep(1)
             # send an empty message
             # pass the message downstream through the device
             buf = xsub.recv(32)
             self.assertTrue(xsub.last_rc >= 0)
-            self.assertTrue(xpub.send(buf.dest, buf.last_rc).last_rc >= 0)
+            self.assertTrue(xpub.send(buf.msg, buf.last_rc).last_rc >= 0)
             # receive the message in the subscriber
             sub.recv(xpub.last_rc)
             self.assertEqual(sub.last_rc, 0)
             self.assertEqual(buf[2], 0)
             self.assertEqual(buf[3], 1)
             self.assertEqual(int2byte(buf[4]), b'b')
-
         finally:
             xpub.close()
             sub.close()