Commits

Anonymous committed a42a168 Draft

unified in- and outbound endpoints

  • Participants
  • Parent commits df52ceb

Comments (0)

Files changed (3)

File araldo_redis/endpoints/redis_endpoint.py

 patch_all()
 
 import redis
-from araldo.endpoints import InBoundBase, OutBoundBase, PluginException
+from araldo.endpoints import EndPointBase, PluginException
 
 
-class InBound(InBoundBase):  # pylint: disable-msg=R0904
-    """ InBound endpoint implementation for receiving *Redis* messages
+class EndPoint(EndPointBase):  # pylint: disable-msg=R0904
+    """ Endpoint implementation for receiving *Redis* messages
     """
     def __init__(self, **kwargs):
         """ Creates an InBound instance for relaying Redis messages
             self._server = redis_server
         self._client = self._server.pubsub()
         self._client.subscribe(self._channel_name)
-        InBoundBase.__init__(self, **kwargs)
+        EndPointBase.__init__(self, **kwargs)
 
     @staticmethod
     def plugin_id():
         """ :return: ID of plugin
         """
-        return "inbound-redis"
+        return "endpoint-redis"
 
     def _run(self):  # pylint: disable-msg=E0202
         """ Enter infinite gevent loop to receive messages from
                     self, redis_message)
                 data = redis_message["data"]
                 self._logger.debug("data: %s", data)
+                if data == 1:
+                    self._logger.debug("Skipping subscription message")
+                    continue
                 message = self._marshalling.to_internal_format(data)
                 self._logger.debug(
                     "unmarshalled: %s",
         except Exception as error:
             raise PluginException("Redis connection problem: %s" % str(error))
 
-
-class OutBound(OutBoundBase):
-    """ Outound implementation for sending messages via *Redis*
-    """
-    def __init__(self, **kwargs):
-        config = kwargs["config"]
-        self._channel_name = config["channel"]
-        self._server = redis.Redis(
-            host=config["host"],
-            port=config["port"])
-        OutBoundBase.__init__(self, **kwargs)
-
-    @staticmethod
-    def plugin_id():
-        """ :return: ID of plugin
-        """
-        return "outbound-redis"
-
     def send(self, message):
         message_str = self._marshalling.to_external_format(message)
-        self._logger.debug("message_str: %s", message_str)
+        self._logger.debug(
+            "Publishing to channel '%s' (message_str: %s)",
+            self._channel_name, message_str)
         self._server.publish(
             self._channel_name,
             message_str)
         "redis >= 2.7.1"
     ],
     entry_points={
-        "araldo.endpoints.inbound": [
+        "araldo.endpoints.endpoint": [
             "redis-inbound = araldo_redis.endpoints.redis_endpoint"
-        ],
-        "araldo.endpoints.outbound": [
-            "redis-outbound = araldo_redis.endpoints.redis_endpoint"
         ]
     },
     classifiers=[

File test/redis_test.py

 
 
 @pytest.fixture
-def inbound(plugin_manager, config, queue, redis_mock):
-    return redis_endpoint.InBound(
-        name="redis_1",
-        plugin_manager=plugin_manager,
-        config=config,
-        gevent_queue=queue,
-        redis_server=redis_mock)
-
-
-@pytest.fixture
-def outbound(plugin_manager, config, redis_mock):
+def endpoint(plugin_manager, config, queue, redis_mock):
     with patch.object(redis_endpoint.redis, "Redis", redis_mock):
-        return redis_endpoint.OutBound(
+        return redis_endpoint.EndPoint(
             name="redis_1",
             plugin_manager=plugin_manager,
-            config=config)
+            config=config,
+            gevent_queue=queue,
+            redis_server=redis_mock)
 
 
-def test_inbound_plugin_id():
-    assert redis_endpoint.InBound.plugin_id() == "inbound-redis"
+#@pytest.fixture
+#def outbound(plugin_manager, config, redis_mock):
+#    with patch.object(redis_endpoint.redis, "Redis", redis_mock):
+#        return redis_endpoint.OutBound(
+#            name="redis_1",
+#            plugin_manager=plugin_manager,
+#            config=config)
+
+
+def test_plugin_id():
+    assert redis_endpoint.EndPoint.plugin_id() == "endpoint-redis"
 
 
 def test_check_connection_problem(plugin_manager, config, queue):
     def f():
         raise redis.ConnectionError("test error")
     redis_mock.echo.side_effect = f
-    end_point = redis_endpoint.InBound(
+    end_point = redis_endpoint.EndPoint(
         name="redis_1",
         plugin_manager=plugin_manager,
         config=config,
         end_point.check()
 
 
-def test_inbound_run(inbound):
+def test_run(endpoint):
     timeout = Timeout(.1)
     timeout.start()
     with pytest.raises(Timeout):
-        inbound.start()
-        inbound.join()
+        endpoint.start()
+        endpoint.join()
 
 
-def test_check(inbound):
-    inbound.check()
+def test_check(endpoint):
+    endpoint.check()
 
 
-def test_send(outbound):
+def test_send(endpoint):
     message = Mock()
-    outbound.send(message)
-
-
-def test_outbound_plugin_id():
-    assert redis_endpoint.OutBound.plugin_id() == "outbound-redis"
+    endpoint.send(message)