Source

crossroads / crossroads / low.py

Full commit
# -*- coding: utf-8 -*-
'''Low level API for ctypes binding to Crossroads.IO.'''

from uuid import uuid4
from functools import partial
from operator import methodcaller
from ctypes import byref, sizeof, c_int, c_int64, c_char_p, c_size_t

from stuf.desc import Setter, lazy
from stuf.six import OrderedDict, items, tobytes, b

from . import lowest as xs
from . import constants as XS
from .message import XSMessage, Message

PREFIX = dict(inproc=b('inproc://'), ipc=b('ipc://'), tcp=b('tcp://'))
split = methodcaller('rsplit', '_')
bigxsget = partial(getattr, XS)
xsget = partial(getattr, xs)


class Options(Setter):

    def __init__(self, **options):
        self._options = options

    def __setattr__(self, key, value, setr=object.__setattr__):
        try:
            this = bigxsget(key.upper())
            self._options[this] = value
            setr(self, key.lower(), value)
        except AttributeError:
            setr(self, key, value)


class Context(Options):

    def __init__(
        self, threads=XS._IO_THREADS, max_sockets=XS._MAX_SOCKETS, **options
    ):
        super(Context, self).__init__(**options)
        self._ctx = xs.init()
        self.set(
            XS._IO_THREADS,
            XS.IO_THREADS,
            threads,
            'must use 1 or more I/O threads',
        )
        self.set(
            XS._MAX_SOCKETS,
            XS.MAX_SOCKETS,
            max_sockets,
            'must use 1 or more sockets',
        )
        self.sockets = OrderedDict()

    def __getattr__(self, key, getr=object.__getattribute__):
        try:
            return getr(self, key)
        except AttributeError:
            try:
                this = bigxsget(key.upper())
                if this in XS.SOCKET_TYPES:
                    return self._setter(key.lower(), self.open, this)
            except AttributeError:
                raise AttributeError(key)

    def set(self, default, key, value, msg):
        if value != default:
            if value > 0:
                value = c_int(value)
                xs.setctxopt(self._ctx, key, value, sizeof(value))
                del value
            else:
                raise xs.XSOperationError(msg)

    def open(self, stype, **options):
        return Socket(self, stype, **options)

    def close(self):
        rc = xs.term(self._ctx)
        del self.sockets
        self._ctx = None
        if rc:
            raise xs.XSOperationError('socket failed to close')
        return rc

    __del__ = close


class Socket(Options):

    def __init__(self, context, stype, **options):
        self.context = context
        self._type = stype
        self._socket = xs.socket(context._ctx, stype)
        options.update(context._options)
        self.set(**options)
        context.sockets[self.id] = self
        self.last_rc = None
        self.closed = False
        self.connections = OrderedDict()
        self.bindings = OrderedDict()

    def __getattr__(self, key, getr=object.__getattribute__):
        try:
            return getr(self, key)
        except AttributeError:
            try:
                if not key.startswith('__'):
                    if key.startswith('_'):
                        return self._setter(
                            key, xsget(key.lower().strip('_')), self._socket
                        )
                    elif key.startswith('bind'):
                        return self._setter(
                            key, self._ubind, PREFIX[split(key)[-1]]
                        )
                    elif key.startswith('connect'):
                        return self._setter(
                            key, self._uconnect, PREFIX[split(key)[-1]]
                        )
            except AttributeError:
                raise AttributeError(key)

    @lazy
    def id(self):
        return uuid4().get_hex().upper()

    def set(self, **options):
        INT, INT64, BINARY = XS.INT_OPTS, XS.INT64_OPTS, XS.BINARY_OPTS
        setsocket, getr = self._setsockopt, getattr
        rc = 0
        for k, v in items(options):
            const = getr(XS, k.upper())
            if const in INT:
                value = c_int(v)
            elif const in INT64:
                value = c_int64(v)
            elif const in BINARY:
                value = c_char_p(v)
            rc |= setsocket(const, byref(value), sizeof(value))
            del value
        self.last_rc = rc
        return self

    def __enter__(self):
        return self

    def _ubind(self, prefix , *addresses):
        for address in addresses:
            # coerce to bytes
            self.last_rc = self._bind(prefix + tobytes(address))
            self.bindings[uuid4().get_hex().upper()] = self.last_rc
        return self

    def _uconnect(self, prefix, *addresses):
        for address in addresses:
            # coerce to bytes
            self.last_rc = self._connect(prefix + tobytes(address))
            self.connections[uuid4().get_hex().upper()] = self.last_rc
        return self

    def send(self, data, size=None, more=False, nowait=False):
        msg = Message(size, data)
        msg.last_rc = self.last_rc = self._send(
            msg.source,
            len(msg),
            XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
        )
        return msg

    def recv(self, size, nowait=False):
        # staging
        msg = Message(size)
        msg.last_rc = self.last_rc = self._recv(
            msg.dest, len(msg), XS.DONTWAIT if nowait else 0
        )
        more = c_int()
        self._getsockopt(XS.RCVMORE, more, c_size_t(sizeof(more)))
        msg.more = more.value
        return msg

    def sendmsg(self, data, more=False, nowait=False):
        msg = XSMessage(data)
        msg.last_rc = self.last_rc = self._sendmsg(
            msg.dest, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
        )
        return msg

    def recvmsg(self, nowait=False):
        msg = XSMessage()
        try:
            msg.last_rc = self.last_rc = self._recvmsg(
                msg.dest, XS.DONTWAIT if nowait else 0
            )
            more = c_int()
            xs.getmsgopt(msg.dest, XS.MORE, more, c_size_t(sizeof(more)))
            msg.more = more.value
        finally:
            msg.close()
        return msg

    def shutdown(self, sid):
        self.last_rc = self._shutdown(c_int(sid))
        return self

    def close(self):
        rc = self._close()
        del self.context.sockets[self.id]
        self._socket = self.context = self.last_rc = None
        if not rc:
            self.closed = True
        return rc

    def __exit__(self, e, c, b):
        self.close()
        if any((e is not None, c is not None, b is not None)):
            return False