Source

araldo-redis / araldo_redis / endpoints / redis_endpoint.py

Full commit

""" araldo Endpoint for communication with Redis PubSub
"""

import gevent
from gevent.monkey import patch_all

patch_all()

import redis
from araldo.endpoints import EndPointBase, PluginException


class EndPoint(EndPointBase):  # pylint: disable-msg=R0904
    """ Endpoint implementation for receiving *Redis* messages
    """
    def __init__(self, **kwargs):
        """ Creates an InBound instance for relaying Redis messages
            :param config: configuration object
            :param gevent_queue: queue for forwarding incoming messages
        """
        config = kwargs["config"]
        redis_server = kwargs.get("redis_server", None)
        self._channel_name = config["channel"]
        self._host = config["host"]
        self._port = config["port"]
        EndPointBase.__init__(self, **kwargs)

        self._connect(redis_server)

    def _connect(self, redis_server=None):
        """ Establishes connection to Redis server
        """
        self._logger.debug("Connecting to Redis server at %s:%d",
                           self._host, self._port)
        if not redis_server:
            self._server = redis.Redis(
                host=self._host,
                port=self._port)
        else:
            self._server = redis_server
        self._client = self._server.pubsub()
        self._client.subscribe(self._channel_name)

    @staticmethod
    def plugin_id():
        """ :return: ID of plugin
        """
        return "endpoint-redis"

    def _run(self):  # pylint: disable-msg=E0202
        """ Enter infinite gevent loop to receive messages from
            Redis pubsub queue and push it to gevent queue
        """
        messages = self._client.listen()
        while True:
            try:
                redis_message = messages.next()
                self._logger.debug(
                    "%s: received: %s",
                    self, redis_message)
                data = redis_message["data"]
                self._logger.debug("data: %s", data)
                if data == 1:
                    self._logger.debug("Skipping subscription message")
                    continue
                message = self._marshalling.to_internal_format(data)
                self._logger.debug(
                    "unmarshalled: %s",
                    message)
                self.gevent_queue.put((self.name(), message))
            except Exception as error:
                self._logger.debug(
                    "Unable to unmarshal message; discarding (%s)",
                    error)
                while True:
                    try:
                        self._logger.debug("Attempting reconnect")
                        self._connect()
                        messages = self._client.listen()
                        break
                    except Exception as error2:
                        self._logger.debug("Reconnection attempt failed (%s)",
                                           error2)
                        gevent.sleep(1)
        gevent.sleep(0)

    def check(self):
        """ implementation of abstract method
        """
        try:
            response = self._server.echo("x")
            if response != "x":
                raise PluginException("Redis echo failed")
        except Exception as error:
            raise PluginException("Redis connection problem: %s" % str(error))

    def send(self, message):
        message_str = self._marshalling.to_external_format(message)
        self._logger.debug(
            "Publishing to channel '%s' (message_str: %s)",
            self._channel_name, message_str)
        self._server.publish(
            self._channel_name,
            message_str)