Source

araldo-redis / araldo_redis / endpoints / redis_endpoint.py


""" araldo Endpoint for communication with Redis PubSub
"""

import gevent
from gevent.monkey import patch_all

patch_all()

from gevent.queue import Queue
import logging
import redis
from araldo.endpoints import InBoundBase, OutBoundBase, PluginException



class InBound(InBoundBase):
    """ Inbound endpoint relaying messages from a Redis pubsub channel
        to a gevent queue
    """

    def __init__(self, **kwargs):
        """ Creates an InBound instance for relaying Redis messages
            :param config: configuration object; the keys ``channel``,
                ``host`` and ``port`` are read from it
            :param gevent_queue: queue for forwarding incoming messages
            :param redis_server: optional, already constructed Redis client;
                when omitted (or falsy) a client is created from
                ``config["host"]`` and ``config["port"]``
        """
        config = kwargs["config"]
        gevent_queue = kwargs["gevent_queue"]
        server = kwargs.get("redis_server")
        self._channel_name = config["channel"]
        if not server:
            server = redis.Redis(
                host=config["host"],
                port=config["port"])
        self._server = server
        # Subscribe right away so the pubsub client is ready before _run()
        # starts listening.
        self._client = self._server.pubsub()
        self._client.subscribe(self._channel_name)
        InBoundBase.__init__(self, config, gevent_queue)

    @staticmethod
    def plugin_id():
        """ Returns the identifier of this plugin
        """
        return "inbound-redis"

    def _run(self):
        """ Enter gevent loop to receive messages from
            Redis pubsub queue and push it to gevent queue

            Every item yielded by the pubsub client is forwarded as a
            ``(channel_name, message)`` tuple.
        """
        log = logging.getLogger(__name__)
        # Iterating the generator directly works on Python 2 and 3 and ends
        # the loop cleanly should the generator ever be exhausted.
        for message in self._client.listen():
            log.debug("%s: received: %s", self, message)
            # NOTE(review): self.gevent_queue is presumably set by
            # InBoundBase.__init__ -- confirm against the base class.
            self.gevent_queue.put((self._channel_name, message))
            # Yield to other greenlets between messages.
            gevent.sleep(0)

    def check(self):
        """ implementation of abstract method

            Sends an ECHO to the Redis server and verifies the reply.
            :raises PluginException: if the command fails or the reply
                does not match the payload that was sent
        """
        try:
            response = self._server.echo("x")
        except Exception as error:
            raise PluginException("Redis connection problem: %s" % str(error))
        # Compared outside the try block so that this failure is not caught
        # and re-labelled as a connection problem. The reply may arrive as
        # bytes rather than text depending on client/Python version.
        if response not in ("x", b"x"):
            raise PluginException("Redis echo failed")

class OutBound(OutBoundBase):
    """ Outbound endpoint publishing messages to a Redis pubsub channel
    """

    def __init__(self, **kwargs):
        """ Creates an OutBound instance for publishing messages to Redis
            :param config: configuration object; the keys ``channel``,
                ``host`` and ``port`` are read from it
            :param redis_server: optional, already constructed Redis client;
                when omitted (or falsy) a client is created from
                ``config["host"]`` and ``config["port"]``
        """
        config = kwargs["config"]
        self._channel_name = config["channel"]
        # Allow an existing client to be injected, mirroring InBound.
        server = kwargs.get("redis_server")
        if not server:
            server = redis.Redis(
                host=config["host"],
                port=config["port"])
        self._server = server
        OutBoundBase.__init__(self, config)

    @staticmethod
    def plugin_id():
        """ Returns the identifier of this plugin
        """
        return "outbound-redis"

    def send(self, message):
        """ Publishes a message on the configured Redis channel
            :param message: payload handed to the Redis PUBLISH command
        """
        self._server.publish(
            self._channel_name,
            message)