crossroads / crossroads / low.py

# -*- coding: utf-8 -*-
'''Low level API for ctypes binding to Crossroads.IO.'''

from functools import partial
from ctypes import byref, sizeof, c_int, c_int64, c_char_p, c_size_t

from stuf.utils import unique_id
from stuf.desc import Setter, lazy
from stuf.six import OrderedDict, items, tobytes

from . import lowest as xs
from . import constants as XS
from .message import MessageXS, Message

bigxsget = partial(getattr, XS)
xsget = partial(getattr, xs)


class Context(Setter):

    def __init__(
        self, threads=XS._IO_THREADS, max_sockets=XS._MAX_SOCKETS, **options
    ):
        self._options = options
        super(Context, self).__init__()
        self._ctx = xs.init()
        self._set(
            XS._IO_THREADS,
            XS.IO_THREADS,
            threads,
            'must use 1 or more I/O threads',
        )
        self._set(
            XS._MAX_SOCKETS,
            XS.MAX_SOCKETS,
            max_sockets,
            'must use 1 or more sockets',
        )
        self.sockets = OrderedDict()

    def __getattr__(self, key, getr=object.__getattribute__):
        try:
            return getr(self, key)
        except AttributeError:
            try:
                this = bigxsget(key.upper())
                if this in XS.TYPES:
                    return self._setter(key.lower(), self._open, this)
            except AttributeError:
                raise AttributeError(key)

    def __setattr__(self, key, value, setr=object.__setattr__):
        try:
            this = bigxsget(key.upper())
            self._options[this] = value
            setr(self, key.lower(), value)
        except AttributeError:
            setr(self, key, value)

    def _set(self, default, key, value, msg):
        if value != default:
            if value > 0:
                value = c_int(value)
                xs.setctxopt(self._ctx, key, value, sizeof(value))
                del value
            else:
                raise xs.XSOperationError(msg)

    def _open(self, stype, **options):
        return Socket(self, stype, **options)

    def close(self):
        rc = xs.term(self._ctx)
        del self.sockets
        self._ctx = None
        if rc:
            raise xs.XSOperationError('socket failed to close')
        return rc

    __del__ = close


class Socket(Setter):

    def __init__(self, context, stype, **options):
        self.context = context
        self._type = stype
        self._socket = xs.socket(context._ctx, stype)
        options.update(context._options)
        _set = self._set
        rc = 0
        for k, v in items(options):
            rc |= _set(k, v)
        self.last_rc = rc
        context.sockets[self.id] = self
        self.closed = False
        self.connections = OrderedDict()
        self.bindings = OrderedDict()

    def __enter__(self):
        return self

    def __getattr__(self, key, getr=object.__getattribute__):
        try:
            return getr(self, key)
        except AttributeError:
            try:
                return self._setter(
                    key, xsget(key.lower().strip('_')), self._socket
                )
            except AttributeError:
                try:
                    return self._get(key)
                except AttributeError:
                    if key.startswith(('bind', 'connect')):
                        method, prefix = key.split('_')
                        return self._setter(
                            key, getattr(
                                self, '_s' + method), XS.PROTO[prefix],
                        )
                    raise AttributeError(key)

    def __setattr__(self, key, value, setr=object.__setattr__):
        try:
            self._set(key, value)
        except AttributeError:
            setr(self, key, value)

    def _opt_type(self, key):
        opt_key = bigxsget(key.upper())
        if opt_key in XS.INT:
            opt_type = c_int
        elif opt_key in XS.INT64:
            opt_type = c_int64
        elif opt_key in XS.BINARY:
            opt_type = c_char_p
        return opt_key, opt_type

    def _get(self, key):
        opt_key, opt_type = self._opt_type(key)
        value = opt_type()
        self._getsockopt(opt_key, value, c_size_t(sizeof(value)))
        return value.value

    def _set(self, key, value):
        opt_key, opt_type = self._opt_type(key)
        value = opt_type(value)
        self.last_rc = self._setsockopt(
            opt_key,
            byref(value),
            0 if opt_key == XS.SUBSCRIBE else sizeof(value),
        )
        return self.last_rc

    def _sbind(self, prefix, *addresses):
        for address in addresses:
            self.last_rc = self._bind(prefix + tobytes(address))
            self.bindings[unique_id()] = self.last_rc
        return self

    def _sconnect(self, prefix, *addresses):
        for address in addresses:
            self.last_rc = self._connect(prefix + tobytes(address))
            self.connections[unique_id()] = self.last_rc
        return self

    @lazy
    def id(self):
        return unique_id()

    def send(self, data, size=None, more=False, nowait=False):
        msg = Message(size, data)
        msg.last_rc = self.last_rc = self._send(
            msg.src,
            len(msg),
            XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
        )
        return msg

    def sendmsg(self, data, more=False, nowait=False):
        msg = MessageXS(data)
        src = msg.src
        msg.last_rc = self.last_rc = self._sendmsg(
            src, XS.DONTWAIT if nowait else 0 | XS.SNDMORE if more else 0,
        )
        return msg

    def recv(self, size, nowait=False):
        msg = Message(size)
        msg.last_rc = self.last_rc = self._recv(
            msg.dest, msg.size, XS.DONTWAIT if nowait else 0,
        )
        msg.more = self.rcvmore
        return msg

    def recvmsg(self, nowait=False):
        msg = MessageXS()
        try:
            msg.last_rc = self.last_rc = self._recvmsg(
                msg.dest, XS.DONTWAIT if nowait else 0,
            )
            msg.more = self.rcvmore
        finally:
            msg.close()
        return msg

    def shutdown(self, sid):
        self.last_rc = self._shutdown(c_int(sid))
        return self

    def close(self):
        rc = xs.close(self._socket)
        del self.context.sockets[self.id]
        self._socket = self.context = self.last_rc = None
        if not rc:
            self.closed = True
        return rc

    def __exit__(self, e, c, b):
        self.close()
        if any((e is not None, c is not None, b is not None)):
            return False
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.