1. Bernhard Biskup
  2. araldo-redis

Source

araldo-redis / araldo_redis / endpoints / redis_endpoint.py


""" araldo Endpoint for communication with Redis PubSub
"""

import gevent
from gevent.monkey import patch_all

patch_all()

import redis
from araldo.endpoints import EndPointBase, PluginException


class EndPoint(EndPointBase):  # pylint: disable-msg=R0904
    """ Endpoint implementation for receiving *Redis* messages
    """
    def __init__(self, **kwargs):
        """ Creates an InBound instance for relaying Redis messages
            :param config: configuration object
            :param gevent_queue: queue for forwarding incoming messages
        """
        config = kwargs["config"]
        redis_server = kwargs.get("redis_server", None)
        self._channel_name = config["channel"]
        if not redis_server:
            self._server = redis.Redis(
                host=config["host"],
                port=config["port"])
        else:
            self._server = redis_server
        self._client = self._server.pubsub()
        self._client.subscribe(self._channel_name)
        EndPointBase.__init__(self, **kwargs)

    @staticmethod
    def plugin_id():
        """ :return: ID of plugin
        """
        return "endpoint-redis"

    def _run(self):  # pylint: disable-msg=E0202
        """ Enter infinite gevent loop to receive messages from
            Redis pubsub queue and push it to gevent queue
        """
        messages = self._client.listen()
        while True:
            try:
                redis_message = messages.next()
                self._logger.debug(
                    "%s: received: %s",
                    self, redis_message)
                data = redis_message["data"]
                self._logger.debug("data: %s", data)
                if data == 1:
                    self._logger.debug("Skipping subscription message")
                    continue
                message = self._marshalling.to_internal_format(data)
                self._logger.debug(
                    "unmarshalled: %s",
                    message)
                self.gevent_queue.put((self.name(), message))
            except Exception as error:
                self._logger.debug(
                    "Unable to unmarshal message; discarding (%s)",
                    error)
            gevent.sleep(0)

    def check(self):
        """ implementation of abstract method
        """
        try:
            response = self._server.echo("x")
            if response != "x":
                raise PluginException("Redis echo failed")
        except Exception as error:
            raise PluginException("Redis connection problem: %s" % str(error))

    def send(self, message):
        message_str = self._marshalling.to_external_format(message)
        self._logger.debug(
            "Publishing to channel '%s' (message_str: %s)",
            self._channel_name, message_str)
        self._server.publish(
            self._channel_name,
            message_str)