Lynn Rees committed 04c59a1

- tweak casting

Comments (0)

Files changed (6)


 # -*- coding: utf-8 -*-
 '''Low level API for ctypes binding to Crossroads.IO.'''
-from uuid import uuid4
 from functools import partial
 from ctypes import byref, sizeof, c_int, c_int64, c_char, c_size_t
+from stuf.utils import unique_id
 from stuf.desc import Setter, lazy
-from stuf.six import OrderedDict, items, tobytes, isstring, b
+from stuf.six import OrderedDict, items, tobytes, isstring
 from . import lowest as xs
 from . import constants as XS
 bigxsget = partial(getattr, XS)
 xsget = partial(getattr, xs)
-unique_id = lambda: b(uuid4().hex.upper())
 class Context(Setter):


 from ctypes import sizeof, string_at, c_ubyte, byref, c_size_t, c_char, c_int
+from stuf.base import norm
 from stuf.six import PY3, tobytes, tounicode, u, b, isunicode
 from . import lowest as xs
 SLOTS = 'msg rc more size last_rc'.split()
 class Msg(object):
     def __init__(self):
     def __bytes__(self):
         if self.msg is None:
-            return b''
+            return b('')
         return tobytes(string_at(byref(self.msg), self.size))
     def __unicode__(self):
-        if self.msg is None:
-            return u('')
-        return tounicode(self.array)
+        return u('') if self.msg is None else tounicode(norm(self.__bytes__()))
     def __len__(self):
         return self.size






     def test_pair_ipc(self):
         sb = self.ctx.pair()
         sc = self.ctx.pair()
-        with sb.bind_ipc(b'/tmp/tester'), sc.connect_ipc(b'/tmp/tester'):
+        with sb.bind_ipc('/tmp/tester'), sc.connect_ipc('/tmp/tester'):
             self.bounce(sb, sc)
     def test_pair_inproc(self):
         from time import sleep
         sb = self.ctx.pair()
         sc = self.ctx.pair()
-        with sb.bind_tcp(b''), sc.connect_tcp(b''):
+        with sb.bind_tcp(''), sc.connect_tcp(''):
             self.bounce(sb, sc)
             # Now let's try to open one more connection to the bound socket. The
             # connection should be silently rejected rather than causing error.
             msg = sb.recvmsg()
             self.assertTrue(msg.last_rc >= 0)
             self.assertEqual(msg.moremsg, 0)
     def test_wireformat(self):
         import socket
         # create the basic infrastructure
                 with pull.bind_ipc('/tmp/tester'):
                     self.assertEqual(pull.last_rc, 3)
     def test_hwn(self):
         # Create pair of socket, each with high watermark of 2. Thus the
         # total buffer space should be 4 messages.
             # Get the response and check it's the response to the new survey and
             # that response to the old survey was silently discarded.
             self.assertEqual(surveyor.recv(2).last_rc, 2)
     def test_invalid_rep(self):
         from stuf.six import int2byte
         # create REQ/XREP wiring
             self.assertEqual(buff2.last_rc, 3)
             self.assertEqual(bytes(buff2), b'JKL')
             self.assertEqual(buff2.more, 0)
     def test_resubscribe(self):
             from stuf.six import int2byte
-envlist = py27,py32,py33
+envlist = py27,py32,py33,pypy,py31