Commits

Anonymous committed d3a20d8 Draft

Feature: auto-reconnect

Comments (0)

Files changed (3)

araldo_redis/endpoints/redis_endpoint.py

         config = kwargs["config"]
         redis_server = kwargs.get("redis_server", None)
         self._channel_name = config["channel"]
+        self._host = config["host"]
+        self._port = config["port"]
+        EndPointBase.__init__(self, **kwargs)
+
+        self._connect(redis_server)
+
+    def _connect(self, redis_server=None):
+        """ Establishes connection to Redis server
+        """
+        self._logger.debug("Connecting to Redis server at %s:%d",
+                           self._host, self._port)
         if not redis_server:
             self._server = redis.Redis(
-                host=config["host"],
-                port=config["port"])
+                host=self._host,
+                port=self._port)
         else:
             self._server = redis_server
         self._client = self._server.pubsub()
         self._client.subscribe(self._channel_name)
-        EndPointBase.__init__(self, **kwargs)
 
     @staticmethod
     def plugin_id():
                 self._logger.debug(
                     "Unable to unmarshal message; discarding (%s)",
                     error)
-            gevent.sleep(0)
+                while True:
+                    try:
+                        self._logger.debug("Attempting reconnect")
+                        self._connect()
+                        messages = self._client.listen()
+                        break
+                    except Exception as error2:
+                        self._logger.debug("Reconnection attempt failed (%s)",
+                                           error2)
+                        gevent.sleep(1)
+        gevent.sleep(0)
 
     def check(self):
         """ implementation of abstract method

doc/araldo-redis.rst

     channel: channel_1
     marshalling: marshal-raw
 
+Behavior
+========
+
+- Currently there is no persistence of messages
+- Endpoints will automatically attempt to reconnect every second if the
+  Redis server is down
+

test/redis_test.py

 from gevent import Timeout
 from mock import Mock, patch
 from araldo.endpoints import PluginException
-from araldo_redis.endpoints import redis_endpoint
+from araldo_redis.endpoints import redis_endpoint as sut
 
 
 @pytest.fixture
 
 @pytest.fixture
 def endpoint(plugin_manager, config, queue, redis_mock):
-    with patch.object(redis_endpoint.redis, "Redis", redis_mock):
-        return redis_endpoint.EndPoint(
+    with patch.object(sut.redis, "Redis", redis_mock):
+        return sut.EndPoint(
             name="redis_1",
             plugin_manager=plugin_manager,
             config=config,
 
 #@pytest.fixture
 #def outbound(plugin_manager, config, redis_mock):
-#    with patch.object(redis_endpoint.redis, "Redis", redis_mock):
-#        return redis_endpoint.OutBound(
+#    with patch.object(sut.redis, "Redis", redis_mock):
+#        return sut.OutBound(
 #            name="redis_1",
 #            plugin_manager=plugin_manager,
 #            config=config)
 
 
 def test_plugin_id():
-    assert redis_endpoint.EndPoint.plugin_id() == "endpoint-redis"
+    assert sut.EndPoint.plugin_id() == "endpoint-redis"
 
 
 def test_check_connection_problem(plugin_manager, config, queue):
     def f():
         raise redis.ConnectionError("test error")
     redis_mock.echo.side_effect = f
-    end_point = redis_endpoint.EndPoint(
+    end_point = sut.EndPoint(
         name="redis_1",
         plugin_manager=plugin_manager,
         config=config,
 def test_send(endpoint):
     message = Mock()
     endpoint.send(message)
+
+
+def test_connect(plugin_manager, config, queue):
+    with patch.object(sut.redis, "Redis"):
+        sut.EndPoint(
+            name="redis_1",
+            plugin_manager=plugin_manager,
+            config=config,
+            gevent_queue=queue)