Source

crossroads / crossroads / low.py

Full commit
Lynn Rees f143187 
Lynn Rees c33c151 
Lynn Rees f143187 
Lynn Rees 80d846f 
Lynn Rees f143187 
Lynn Rees 7b504f0 
Lynn Rees f143187 
Lynn Rees 783cb3d 
Lynn Rees 80d846f 
Lynn Rees f143187 


Lynn Rees d7a6d4e 
Lynn Rees 01cd663 


Lynn Rees 80d846f 
Lynn Rees f143187 
Lynn Rees 01cd663 
Lynn Rees 3e6c695 
Lynn Rees ba82b0b 
Lynn Rees 13fd5a0 
Lynn Rees 01cd663 
Lynn Rees 13fd5a0 
Lynn Rees 3e6c695 

Lynn Rees 01cd663 
Lynn Rees 3e6c695 
Lynn Rees 01cd663 




Lynn Rees 3e6c695 
Lynn Rees 01cd663 




Lynn Rees a453039 
Lynn Rees f143187 
Lynn Rees ba82b0b 




Lynn Rees 4355211 

Lynn Rees cf91d0c 
Lynn Rees 4355211 
Lynn Rees ba82b0b 


Lynn Rees 3e6c695 








Lynn Rees 01cd663 


Lynn Rees 783cb3d 
Lynn Rees 01cd663 



Lynn Rees 4355211 

Lynn Rees f143187 

Lynn Rees 13fd5a0 







Lynn Rees f143187 

Lynn Rees 3e6c695 
Lynn Rees f143187 
Lynn Rees 4355211 
Lynn Rees 13fd5a0 
Lynn Rees ba82b0b 
Lynn Rees 4355211 
Lynn Rees ba82b0b 

Lynn Rees 90ac222 




Lynn Rees a453039 
Lynn Rees 01cd663 
Lynn Rees a453039 

Lynn Rees f143187 
Lynn Rees 3e6c695 


Lynn Rees f143187 




Lynn Rees 3e6c695 








Lynn Rees 783cb3d 
Lynn Rees 90ac222 

Lynn Rees 783cb3d 
Lynn Rees 3e6c695 
Lynn Rees f143187 
Lynn Rees 3e6c695 




Lynn Rees a453039 
Lynn Rees 4355211 


Lynn Rees cf91d0c 






Lynn Rees 7b504f0 
Lynn Rees 4355211 



Lynn Rees cf91d0c 

Lynn Rees 3e6c695 
Lynn Rees cf91d0c 


Lynn Rees 3e6c695 


Lynn Rees cf91d0c 
Lynn Rees 7b504f0 



Lynn Rees 3e6c695 
Lynn Rees cf91d0c 
Lynn Rees 7b504f0 

Lynn Rees 3e6c695 


Lynn Rees 90ac222 
Lynn Rees 01cd663 
Lynn Rees a453039 
Lynn Rees cf91d0c 
Lynn Rees 01cd663 
Lynn Rees f143187 
Lynn Rees 3e6c695 
Lynn Rees 01cd663 
Lynn Rees a453039 
Lynn Rees cf91d0c 
Lynn Rees 01cd663 
Lynn Rees 13fd5a0 
Lynn Rees 3e6c695 

Lynn Rees cf91d0c 
Lynn Rees 3e6c695 
Lynn Rees a453039 
Lynn Rees d7a6d4e 
Lynn Rees a453039 
Lynn Rees d7a6d4e 

Lynn Rees a453039 
Lynn Rees 13fd5a0 
Lynn Rees a453039 
Lynn Rees ba82b0b 
Lynn Rees cf91d0c 
Lynn Rees d7a6d4e 
Lynn Rees cf91d0c 
Lynn Rees d7a6d4e 
Lynn Rees cf91d0c 


Lynn Rees 7b504f0 
Lynn Rees d7a6d4e 
Lynn Rees a453039 
Lynn Rees d7a6d4e 
Lynn Rees 13fd5a0 
Lynn Rees 3e6c695 
Lynn Rees a453039 
Lynn Rees ba82b0b 
Lynn Rees 01cd663 
Lynn Rees d7a6d4e 
Lynn Rees 4355211 
Lynn Rees d7a6d4e 
Lynn Rees 4355211 

Lynn Rees 01cd663 
Lynn Rees f143187 

Lynn Rees 13fd5a0 

Lynn Rees f143187 

Lynn Rees 3e6c695 
Lynn Rees a453039 
Lynn Rees 01cd663 
Lynn Rees 13fd5a0 
Lynn Rees 01cd663 
Lynn Rees 13fd5a0 
Lynn Rees 90ac222 



# -*- coding: utf-8 -*-
'''Low level API for ctypes binding to Crossroads.IO.'''

from uuid import uuid4
from functools import partial
from ctypes import byref, sizeof, c_int, c_int64, c_char, c_size_t

from stuf.desc import Setter, lazy
from stuf.six import OrderedDict, items, tobytes, isstring, b

from . import lowest as xs
from . import constants as XS
from .message import RecvMsg, RecvMsg32, SendMsg, SendMsg32

bigxsget = partial(getattr, XS)
xsget = partial(getattr, xs)
unique_id = lambda: b(uuid4().hex.upper())


class Context(Setter):

    def __init__(
        self, threads=XS._IO_THREADS, max_sockets=XS._MAX_SOCKETS, **options
    ):
        self._options = options
        super(Context, self).__init__()
        self._ctx = xs.init()
        self._set(
            XS._IO_THREADS,
            XS.IO_THREADS,
            threads,
            'must use 1 or more I/O threads',
        )
        self._set(
            XS._MAX_SOCKETS,
            XS.MAX_SOCKETS,
            max_sockets,
            'must use 1 or more sockets',
        )
        self.sockets = OrderedDict()

    def __getattr__(self, key, getr=object.__getattribute__):
        try:
            return getr(self, key)
        except AttributeError:
            try:
                newkey = key.upper()
                this = bigxsget(newkey)
                if this in XS.TYPES:
                    return self._setter(key.lower(), self._open, this, newkey)
            except AttributeError:
                raise AttributeError(key)

    def __setattr__(self, key, value, setr=object.__setattr__):
        try:
            this = bigxsget(key.upper())
            self._options[this] = value
            setr(self, key.lower(), value)
        except AttributeError:
            setr(self, key, value)

    def _set(self, default, key, value, msg):
        if value != default:
            if value > 0:
                value = c_int(value)
                xs.setctxopt(self._ctx, key, value, sizeof(value))
                del value
            else:
                raise xs.XSOperationError(msg)

    def _open(self, stype, key, **options):
        return Socket(self, stype, key, **options)

    def close(self):
        rc = xs.term(self._ctx)
        del self.sockets
        self._ctx = None
        if rc:
            raise xs.XSOperationError('socket failed to close')
        return rc

    __del__ = close


class Socket(Setter):

    def __init__(self, context, stype, newkey, **options):
        self.context = context
        self._type = stype
        self._type_name = newkey
        self._socket = xs.socket(context._ctx, stype)
        options.update(context._options)
        _set = self._set
        rc = 0
        for k, v in items(options):
            rc |= _set(k, v)
        self.last_rc = rc
        context.sockets[self.id] = self
        self.closed = False
        self.connections = OrderedDict()
        self.bindings = OrderedDict()

    def __enter__(self):
        return self

    def __getattr__(self, key, getr=object.__getattribute__):
        try:
            return getr(self, key)
        except AttributeError:
            try:
                return self._setter(
                    key, xsget(key.lower().strip('_')), self._socket
                )
            except AttributeError:
                try:
                    return self._get(key)
                except AttributeError:
                    if key.startswith(('bind', 'connect')):
                        method, prefix = key.split('_')
                        return self._setter(
                            key, getattr(
                                self, '_s' + method), XS.PROTO[prefix],
                        )
                    raise AttributeError(key)

    def __setattr__(self, key, value, setr=object.__setattr__):
        try:
            self._set(key, value)
        except AttributeError:
            setr(self, key, value)

    def __repr__(self):
        return '{self._type_name}@{self._socket}'.format(self=self)

    def _opt_type(self, key):
        opt_key = bigxsget(key.upper())
        if opt_key in XS.INT:
            opt_type = c_int
        elif opt_key in XS.INT64:
            opt_type = c_int64
        elif opt_key in XS.BINARY:
            opt_type = c_char
        else:
            raise AttributeError(
                '{0} is not a valid socket attribute'.format(key)
            )
        return opt_key, opt_type

    def _get(self, key):
        opt_key, opt_type = self._opt_type(key)
        value = opt_type()
        self._getsockopt(opt_key, value, c_size_t(sizeof(value)))
        return value.value

    def _set(self, key, value):
        opt_key, opt_type = self._opt_type(key)
        if not value  and isstring(value):
            new_value = c_char()
        else:
            new_value = opt_type(value)
        self.last_rc = self._setsockopt(
            opt_key,
            byref(new_value),
            len(value) if opt_key == XS.SUBSCRIBE else sizeof(new_value),
        )
        return self.last_rc

    def _sbind(self, prefix, *addresses):
        for address in addresses:
            self.last_rc = self._bind(prefix + tobytes(address))
            self.bindings[unique_id()] = self.last_rc
        return self

    def _sconnect(self, prefix, *addresses):
        for address in addresses:
            self.last_rc = self._connect(prefix + tobytes(address))
            self.connections[unique_id()] = self.last_rc
        return self

    @lazy
    def id(self):
        return unique_id()

    def send(self, data, size=None, more=False, nowait=False):
        msg = SendMsg(size, data)
        msg.last_rc = self.last_rc = self._send(
            msg.msg,
            msg.size,
            XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
        )
        return msg

    def sendmsg(self, data, more=False, nowait=False):
        msg = SendMsg32(data)
        msg.last_rc = self.last_rc = self._sendmsg(
            msg.msg, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
        )
        return msg

    def recv(self, size, nowait=False, char=False):
        msg = RecvMsg(size, char)
        msg.last_rc = self.last_rc = self._recv(
            msg.msg, msg.size, XS.DONTWAIT if nowait else 0,
        )
        msg.more = self.rcvmore
        return msg

    def recvmsg(self, nowait=False):
        msg = RecvMsg32()
        msg.last_rc = self.last_rc = self._recvmsg(
            msg.msg, XS.DONTWAIT if nowait else 0,
        )
        msg.more = self.rcvmore
        return msg

    def shutdown(self, sid):
        self.last_rc = self._shutdown(c_int(sid))
        return self

    def close(self):
        rc = xs.close(self._socket)
        del self.context.sockets[self.id]
        self._socket = self.context = self.last_rc = None
        if not rc:
            self.closed = True
        return rc

    def __exit__(self, e, c, b):
        self.close()
        if any((e is not None, c is not None, b is not None)):
            return False