1. Bernhard Biskup
  2. araldo

Commits

bb b...@gmx.de  committed 4f3784c Draft

unified in- and outbound endpoints for simpler handling
and for sharing objects (w.g. with websocket client)

  • Participants
  • Parent commits 6b7ccc5
  • Branches default

Comments (0)

Files changed (13)

File araldo/app.py

View file
 """
 import logging
 import re
-from araldo.endpoints.websocket import InOutBound
+from araldo.endpoints.websocket import EndPoint
 
 STATIC_DIR = "."
 
         self._logger = logging.getLogger("araldo")
         self._plugin_manager = plugin_manager
         plugin_instances = self._plugin_manager.plugin_instances()
-        outbound = plugin_instances["araldo.endpoints.outbound"]
+        outbound = plugin_instances["araldo.endpoints.endpoint"]
         self._websockets = outbound
 
     def _handle_websocket(self, environ, web_socket_name):
         #self._logger.debug("### %s" % web_socket.__class__)
 
         web_socket_conf = self._plugin_manager.plugin_subconfig(
-            "araldo.endpoints.inbound", web_socket_name)
+            "araldo.endpoints.endpoint", web_socket_name)
 
-        endpoint = InOutBound(
+        endpoint = EndPoint(
             config=web_socket_conf,
             gevent_queue=self._queue,
             name=web_socket_name,

File araldo/config.py

View file
             },
             required("plugins"): {
                 required(
-                    "araldo.endpoints.inbound"):
-                [{
-                    v.optional("instantiate"): bool,
-                    v.extra: object,
-                }],
-                required(
-                    "araldo.endpoints.outbound"):
+                    "araldo.endpoints.endpoint"):
                 [{
                     v.optional("instantiate"): bool,
                     v.extra: object,

File araldo/endpoints/__init__.py

View file
         Exception.__init__(self, msg)
 
 
-class EndPointBase:
+class EndPointBase(gevent.Greenlet):  # pylint: disable-msg=R0904
     """ Abstract base class for Araldo endpoints
 
-        Keyword arguments:
-
+        Concrete classes must implement Greenlet's _run method to
+        process incoming messages and to enque them into gevent_queue
     """
     __meta__ = ABCMeta
 
         #import pdb; pdb.set_trace()
         marshalling_name = self._config.get("marshalling", "marshal-json")
         self._marshalling = self._get_marshalling(marshalling_name)
+        self._gevent_queue = kwargs["gevent_queue"]
+        gevent.Greenlet.__init__(self)
 
     def _get_marshalling(self, marshalling_name):
         """ Retrieve plugin instances for marshaller
         """
         return self._config
 
-
-class InBoundBase(gevent.Greenlet, EndPointBase):  # pylint: disable-msg=R0904
-    """ Base class for plugin implementations of an inbound endpoint
-
-        Concrete classes must implement Greenlet's _run method to
-        process incoming messages and to enque them into gevent_queue
-    """
-    __meta__ = ABCMeta
-
-    def __init__(self, **kwargs):
-        self._gevent_queue = kwargs["gevent_queue"]
-        gevent.Greenlet.__init__(self)
-        EndPointBase.__init__(self, **kwargs)
-
     @property
     def gevent_queue(self):
         """ gevent target queue
         return "endpoint '%s'" % self.name()
 
     def __repr__(self):
-        return ("InBound(name=%s,id=%s)"
+        return ("EndPoint(name=%s,id=%s)"
                 % (self.name(), id(self)))
 
-
-class OutBoundBase(EndPointBase):
-    """ Base class for outbound endpoint implementations
-    """
-    __meta__ = ABCMeta
-
-    def __init__(self, **kwargs):
-        EndPointBase.__init__(self, **kwargs)
-
     @abstractmethod
     def send(self, message):
         """ Send message to backend
         """
-
-    def __repr__(self):
-        return ("OutBound(name=%s,id=%s)"
-                % (self.name(), id(self)))

File araldo/endpoints/websocket.py

View file
 
 patch_all()
 
-from araldo.endpoints import InBoundBase, OutBoundBase
+from araldo.endpoints import EndPointBase
 from araldo.message import Message
 
-WS_TIMEOUT = 10  # WebSocket timeout (s)
+WS_TIMEOUT = 60  # WebSocket timeout (s)
 
 
-class InOutBound(OutBoundBase, InBoundBase):  # pylint: disable-msg=R0904
+class EndPoint(EndPointBase):  # pylint: disable-msg=R0904
     """ In- and OutBound endpoint implementation for
         receiving/sending WebSocket message
     """
         """
         self._web_socket = kwargs["web_socket"]
 
-        OutBoundBase.__init__(self, **kwargs)
-        InBoundBase.__init__(self, **kwargs)
+        EndPointBase.__init__(self, **kwargs)
 
         plugin_instances = self.plugin_manager().plugin_instances()
-        outbound = plugin_instances["araldo.endpoints.outbound"]
+        outbound = plugin_instances["araldo.endpoints.endpoint"]
         self._web_sockets = outbound
 
     @staticmethod
     def plugin_id():
         """ :return: ID of plugin
         """
-        return "inoutbound-websocket"
+        return "endpoint-websocket"
 
     def _communicate_with_websocket(self, web_socket, web_socket_name):
         """ Receive incoming message from websocket.
             :param web_socket: WebSocket to listen to
             :raises Timeout: if no message receives for `WS_TIMEOUT` seconds.
         """
+        self._logger.debug(
+            "Communicating with WebSocket '%s'",
+            web_socket_name)
         timeout = gevent.Timeout(WS_TIMEOUT)
         timeout.start()
         message = web_socket.receive()

File araldo/plugin_manager.py

View file
                 "araldo.marshalling",
                 "Marshalling")
             self._load_plugins_for_group(
-                "araldo.endpoints.inbound",
-                "InBound")
-            self._load_plugins_for_group(
-                "araldo.endpoints.outbound",
-                "OutBound")
+                "araldo.endpoints.endpoint",
+                "EndPoint")
         else:
             self._log.debug("Plugins were previously loaded")
         dump_obj(self._plugins, self._log, "Plugin registry")
         plugins = self._config.settings()["plugins"]
         plugin_group_keys = [
             "araldo.marshalling",
-            "araldo.endpoints.inbound",
-            "araldo.endpoints.outbound"]
+            "araldo.endpoints.endpoint"]
         for plugin_group_key in plugin_group_keys:
             plugin_group = plugins[plugin_group_key]
             self._log.debug("Plugin group '%s'" % plugin_group_key)

File araldo/router.py

View file
                 "Using route: %s --> %s",
                 source, ", ".join(destinations))
         self._outbound = self._plugin_manager.plugin_instances()[
-            "araldo.endpoints.outbound"]
+            "araldo.endpoints.endpoint"]
 
     def routes(self):
         """ :return: configured routes

File araldo/server.py

View file
     """
     plugin_manager = PluginManager(config)
     plugin_instances = plugin_manager.plugin_instances()
-    inbound_handlers = plugin_instances["araldo.endpoints.inbound"]
-    logger.debug("Starting %d inbound handlers", len(inbound_handlers))
-    for handler in inbound_handlers.values():
+    endpoint_handlers = plugin_instances["araldo.endpoints.endpoint"]
+    logger.debug("Starting %d endpoint handlers", len(endpoint_handlers))
+    for handler in endpoint_handlers.values():
         handler.start()
     return plugin_manager
 

File scripts/websocket_client.py

View file
 if __name__ == '__main__':
     msgLen = int(sys.argv[1])
     enableTrace(True)
-    ws = create_connection("ws://localhost:8890/websocket")
+    ws = create_connection("ws://localhost:8890/websocket/websocket_1")
     a = ord("a")
     message = "".join(
         [chr(int(a + random.random() * 26)) for x in range(msgLen)])

File scripts/websocket_client_gevent.py

View file
     count = 1
     if len(sys.argv) > 1:
         count = int(sys.argv[1])
-    ws = create_connection("ws://localhost:8890/websocket")
+    ws = create_connection("ws://localhost:8890/websocket/websocket_1")
 
     payload = "*" * PAYLOAD_LEN
 

File test/endpoints_test.py

View file
 from mock import Mock
 from gevent.queue import Queue
 from araldo.endpoints import PluginException
-from araldo.endpoints import InBoundBase, OutBoundBase
+from araldo.endpoints import EndPointBase
 
 
 @pytest.fixture
 
 
 @pytest.fixture
-def outbound_derived():
-    class C(OutBoundBase):
+def endpoint_derived():
+    class C(EndPointBase):
         pass
     return C
 
     assert "testmessage" == str(exception)
 
 
-def test_inbound_instantiation(queue, plugin_manager, marshal_json):
-    class C(InBoundBase):
+def test_endpoint(queue, plugin_manager, marshal_json):
+    class C(EndPointBase):
         pass
     conf = {}
     c = C(
     assert queue == c.gevent_queue
     assert plugin_manager == c.plugin_manager()
     assert c.marshalling() == marshal_json
-
-
-def test_outbound_instantiation(
-        outbound_derived,
-        plugin_manager,
-        marshal_json):
-    conf = {}
-    c = outbound_derived(
-        name="testname",
-        # use default marshalling
-        plugin_manager=plugin_manager,
-        config=conf)
-    assert c.config() == conf
-    assert c.name() == "testname"
-    assert c.plugin_manager() == plugin_manager
-    assert c.marshalling() == marshal_json
-
-
-def test_outbound_instantiation_explicit_marshalling(
-        outbound_derived,
-        plugin_manager):
-    conf = {
-        "marshalling": "marshal-test"
-    }
-    test_marshal = Mock()
-    plugin_instances = plugin_manager.plugin_instances()
-    marshalling_instances = plugin_instances["araldo.marshalling"]
-    marshalling_instances["marshal-test"] = test_marshal
-    c = outbound_derived(
-        name="testname",
-        plugin_manager=plugin_manager,
-        config=conf)
-    assert c.config() == conf
-    assert c.name() == "testname"
-    assert c.plugin_manager() == plugin_manager
-    assert c.marshalling() == test_marshal

File test/endpoints_websocket_test.py

View file
     result = Mock()
     plugin_instances = Mock()
     plugin_instances.return_value = {
-        "araldo.endpoints.outbound": {}
+        "araldo.endpoints.endpoint": {}
     }
     result.plugin_instances = plugin_instances
     return result
 
 @pytest.fixture
 def endpoint(sub_config, queue, plugin_manager, valid_socket):
-    with patch.object(sut.InOutBound, "_get_marshalling") as getm:
+    with patch.object(sut.EndPoint, "_get_marshalling") as getm:
         getm.return_value = json_marshalling.Marshalling()
-        result = sut.InOutBound(
+        result = sut.EndPoint(
             config=sub_config,
             gevent_queue=queue,
             name="test_websocket",
 
 
 def test_plugin_id(endpoint):
-    assert endpoint.plugin_id() == "inoutbound-websocket"
+    assert endpoint.plugin_id() == "endpoint-websocket"
 
 
 def test_websocket_instantiation(endpoint):

File test/router_test.py

View file
 
 @pytest.fixture
 def test_instantiation(router):
-    assert router._routes == {"redis_1": ["redis_out_3", "redis_out_4"]}
+    assert router._routes == {"redis_1": ["redis_3", "redis_4"]}
 
 
 def test_existing_route(plugin_manager, router):
-    redis_out_3 = Mock()
-    outbound_entries = {"redis_out_3": redis_out_3}
+    redis_3 = Mock()
+    outbound_entries = {"redis_3": redis_3}
     router._outbound = outbound_entries
     message = Mock()
     router.route("redis_1", message)
-    assert redis_out_3.send.called
+    assert redis_3.send.called
 
 
 def test_nonexisting_route(plugin_manager, router):
-    redis_out_3 = Mock()
-    outbound_entries = {"redis_out_3": redis_out_3}
+    redis_3 = Mock()
+    outbound_entries = {"redis_3": redis_3}
     router._outbound = outbound_entries
     message = Mock()
     router.route("redis_10", message)
-    assert not redis_out_3.send.called
+    assert not redis_3.send.called
 
 
 def test_recover_from_send_error(plugin_manager, router):
     def raise_send(message):
         raise Exception("test exception")
 
-    redis_out_3 = Mock()
-    redis_out_3.send.side_effect = raise_send
-    redis_out_4 = Mock()
+    redis_3 = Mock()
+    redis_3.send.side_effect = raise_send
+    redis_4 = Mock()
     outbound_entries = {
-        "redis_out_3": redis_out_3,
-        "redis_out_4": redis_out_4,
+        "redis_3": redis_3,
+        "redis_4": redis_4,
     }
     router._outbound = outbound_entries
     message = Mock()
     router.route("redis_1", message)
-    assert redis_out_3.send.called
-    assert redis_out_4.send.called
+    assert redis_3.send.called
+    assert redis_4.send.called

File testfiles/conf1.yaml

View file
   araldo.marshalling:
     - name: marshal-json
       id: marshal-json
-  araldo.endpoints.inbound:
-    - name: redis_in_1
-      id: inbound-redis
+  araldo.endpoints.endpoint:
+    - name: redis_1
+      id: endpoint-redis
       host: localhost
       port: 6379
       channel: channel_1
-    - name: redis_in_2
-      id: inbound-redis
+    - name: redis_2
+      id: endpoint-redis
       host: localhost
       port: 6379
       channel: channel_2
-  araldo.endpoints.outbound:
-    - name: redis_out_3
-      id: outbound-redis
+    - name: redis_3
+      id: endpoint-redis
       host: localhost
       port: 6379
       channel: channel_3
 
 routes:
   redis_1:
-    - redis_out_3
-    - redis_out_4
+    - redis_3
+    - redis_4