Commits

Anonymous committed b69609f Draft

incorporated marshalling as plugin; removed warnings

Comments (0)

Files changed (12)

 from geventwebsocket.websocket import WebSocket
 import gevent
 from araldo.message import Message
-from araldo.plugin_manager import PluginManager
 
 STATIC_DIR = "."
 WS_TIMEOUT = 10  # WebSocket timeout (s)
 
 WEBSOCKET_NAME_RE = re.compile("/websocket/([^/]+)/?")
 
+
 class TooLong(Exception):
     """ Indicates a gevent timeout
 
         logging.getLogger("araldo").debug("A timeout occurred")
         Exception.__init__(self, TooLong._msg)
 
+
 class AppException(Exception):
-    """ An internal exception 
+    """ An internal exception
     """
 
     def __init__(self, msg):
         logging.getLogger("araldo").error(msg)
         Exception.__init__(self, msg)
 
+
 class WebSocketApp(object):
     """ Provides bidirectional communication:
         - inbound via WebSocket
                 len(message))
             self._queue.put(
                 ("web_socket",
-                Message(
-                    origin="web_socket",
-                    destination="",
-                    payload=message)))
+                 Message(
+                     origin="web_socket",
+                     destination="",
+                     payload=message)))
             gevent.sleep(0)
         else:
             self._logger.debug("null message (client disconnect?)")
 ##                del self_._websockets[id(socket)]
 ##        start_response('200 OK', [('Content-Type', 'text/html')])
 ##        return ["outbound response\n"]
-    
+
     @staticmethod
     def _parse_websocket_name(path_info):
         """ Extract web socket name from URL path
     _instance = None  # singleton instance
     _log = logging.getLogger("araldo")
 
-
     def __init__(self, config_file_name):
         self._config_file_name = config_file_name
         self._log.debug(
         port_spec = all(int, v.range(min=1, max=65535))
 
         self._validation_schema = Schema({
-            "global" : {"port": port_spec}, 
-            required("plugins"): { 
-              required("araldo.endpoints.inbound"):    [{
+            "global": {"port": port_spec},
+            required("plugins"): {
+                required(
+                    "araldo.endpoints.inbound"):
+                [{
                     "name": str,
                     "id": str,
                     "host": str,
                     "port": port_spec,
                     "channel": str
-              }], 
-              required("araldo.endpoints.outbound"):    [{
+                }],
+                required(
+                    "araldo.endpoints.outbound"):
+                [{
                     "name": str,
                     "id": str,
                     "host": str,
                     "port": port_spec,
                     "channel": str
-              }], 
+                }],
+                required(
+                    "araldo.marshalling"):
+                [{
+                    "name": str,
+                    "id": str,
+                }],
             },
             required("routes"): {str: [str]}}
         )
         try:
             self._validation_schema(conf)
         except v.Error as error:
-            error_message = "Error in configuration file format: %s", str(error)
+            error_message = ("Error in configuration file format: %s",
+                             str(error))
             self._log.error(error_message)
             raise ConfigException(error_message)
 
-
     def settings(self):
-        """ 
+        """
         :return: raw configuration (as created by *pyaml*)
         """
         return self._conf
         """ Singleton getter. The configuration must have been initialized
             before calling this method by calling `Config.create`
         """
-        if Config._instance == None:
+        if Config._instance is None:
             raise ConfigException("Configuration not initialized")
         return Config._instance
 
         """
         Config._instance = Config(config_file_name)
         return Config._instance
-

araldo/endpoints/__init__.py

 """ Communication endpoints
 """
 
-
+import logging
 from abc import ABCMeta, abstractmethod
 import gevent
 
 
     def __init__(self, **kwargs):
         self._name = kwargs["name"]
+        self._logger = logging.getLogger("araldo")
         self._plugin_manager = kwargs["plugin_manager"]
         self._config = kwargs["config"]
 
+        marshalling_name = kwargs.get("marshalling", "marshal-default")
+        self._marshalling = self._get_marshalling(marshalling_name)
+
+    def _get_marshalling(self, marshalling_name):
+        """ Retrieve plugin instances for marshaller
+        :return: marshalling-Object
+        :rtype: araldo.marshalling.Marshaller
+        """
+        self._logger.debug("Obtaining marshaller '%s'" % marshalling_name)
+        plugin_instances = self._plugin_manager.plugin_instances()
+        marshalling_instances = plugin_instances["araldo.marshalling"]
+        self._logger.debug("### %s" % marshalling_instances)
+        return marshalling_instances[marshalling_name]
+
     @abstractmethod
     def name(self):
         """ Short, human-readable name of plugin
         return self._name
 
     @abstractmethod
+    def marshalling(self):
+        """ Short, human-readable name marshalling used
+        """
+        return self._marshalling
+
+    @abstractmethod
     def plugin_manager(self):
         """ plugin manager for loading other plugins
         """
     __meta__ = ABCMeta
 
     def __init__(self, **kwargs):
-        name = kwargs["name"]
-        plugin_manager = kwargs["plugin_manager"]
-        config = kwargs["config"]
         EndPointBase.__init__(self, **kwargs)
 
     @abstractmethod

araldo/marshalling/__init__.py

 
     For new development, it is recommended to use araldo's internal format.
     However, if a particular format is required or when interfacing an
-    existing system, a custom marshalling plugin may be implemented to this end.
+    existing system, a custom marshalling plugin may be
+    implemented to this end.
 """
 
 from abc import ABCMeta, abstractmethod
 
+
 class MarshallingBase():
     """ Marshalling plugin base class
     """
     __meta__ = ABCMeta
 
-    def __init__(self):
+    def __init__(self, **kwargs):
         pass
 
     @abstractmethod
     def to_internal_format(self, message_str):
         """ Maps a custom format (serialized) to internal format
         """
-        
+
     @abstractmethod
     def to_external_format(self, message):
         """ Maps to internal format to external format (serialized)

araldo/marshalling/default_marshalling.py

 
 JSONPICKLE_TIMESTAMP_FORMAT = "%Y-%m-%d_%H:%M:%S.%f"
 
-class DefaultMarshalling(MarshallingBase):
+
+class Marshalling(MarshallingBase):
     """ Marhals/unmarshals to/from JSON
     """
-    def __init__(self):
+    def __init__(self, **kwargs):
         MarshallingBase.__init__(self)
 
+    @staticmethod
+    def plugin_id():
+        """ :return: ID of plugin
+        """
+        return "marshal-default"
+
     def to_internal_format(self, message_str):
-        obj =  json.loads(message_str)
+        obj = json.loads(message_str)
         # fix date
         obj["creation_timestamp"] = datetime.strptime(
             obj["creation_timestamp"], JSONPICKLE_TIMESTAMP_FORMAT)

araldo/plugin_manager.py

 from gevent.queue import Queue
 from araldo.util import dump_obj
 
+
 class PluginManager:
     """ Manages loading and instantiation of plugins
     """
 
     def load_plugins(self):
         """ Load araldo plugins
+
+            *Order is important* (dependencies): e.g., marshalling must be
+            instantiated before endpoints
         """
         if self._plugins is None:
             self._log.debug("Loading araldo plugins...")
             self._plugins = defaultdict(lambda: {})
 
             self._load_plugins_for_group(
-                "araldo.endpoints.inbound", 
+                "araldo.marshalling",
+                "Marshalling")
+            self._load_plugins_for_group(
+                "araldo.endpoints.inbound",
                 "InBound")
             self._load_plugins_for_group(
-                "araldo.endpoints.outbound", 
+                "araldo.endpoints.outbound",
                 "OutBound")
         else:
             self._log.debug("Plugins were previously loaded")
         plugin_instance = plugin_class(
             name=plugin_config["name"],
             plugin_manager=self,
-            config=plugin_config, 
+            config=plugin_config,
             gevent_queue=self._gevent_queue)
         return plugin_instance
 
             The configuration file lists instances with their *unique*
             names and plugin configuration
         """
+        self.load_plugins()
         self._log.debug("Instantiating plugins")
-        self.load_plugins()
         self._plugin_instances = defaultdict(lambda: {})
-        plugin_items = self._config.settings()["plugins"].items()
-        for plugin_group_key, plugin_group in plugin_items:
+        plugins = self._config.settings()["plugins"]
+        plugin_group_keys = [
+            "araldo.marshalling",
+            "araldo.endpoints.inbound",
+            "araldo.endpoints.outbound"]
+        for plugin_group_key in plugin_group_keys:
+            plugin_group = plugins[plugin_group_key]
             self._log.debug("Plugin group '%s'" % plugin_group_key)
             for plugin_config in plugin_group:
                 self._log.debug("plugin_config: %s" % plugin_config)
                 plugin_instance = self._instantiate_plugin(
-                    plugin_group_key, 
+                    plugin_group_key,
                     plugin_config)
                 plugin_name = plugin_config["name"]
                 self.add_plugin_instance(
-                    plugin_group_key, 
-                    plugin_name, 
+                    plugin_group_key,
+                    plugin_name,
                     plugin_instance)
         self._log.debug(
-            "Plugin instances: %s",  
+            "Plugin instances: %s",
             self._plugin_instances)
 """ Routing of messages from inbound to outbound endpoints
-""" 
+"""
 import logging
 import gevent
 
         self._outbound = self._plugin_manager.plugin_instances()[
             "araldo.endpoints.outbound"]
 
-
     def routes(self):
         """ :return: configured routes
         :rtype:  {str: [str]}
         """
         return self._routes
 
-
     def route(self, origin, message):
         """ Send message via all configured outgoing endpoints
         """
 from araldo.plugin_manager import PluginManager
 
 
-
 def start_server(
         port,
         queue,
     signal.signal(signal.SIGTERM, sig_handler)
     signal.signal(signal.SIGQUIT, sig_handler)
 
+
 def setup_plugins(logger, config):
     """ Load and instantiate plugins
     """
     plugin_manager = PluginManager(config)
     plugin_instances = plugin_manager.plugin_instances()
     inbound_handlers = plugin_instances["araldo.endpoints.inbound"]
-    logger.debug("Starting %d inbound handlers", len(inbound_handlers)) 
+    logger.debug("Starting %d inbound handlers", len(inbound_handlers))
     for handler in inbound_handlers.values():
         handler.start()
     return plugin_manager
     queue = plugin_manager.gevent_queue()
     setup_sending(config, queue, plugin_manager)
     start_server(
-        config.settings()["global"]["port"], 
+        config.settings()["global"]["port"],
         queue,
         plugin_manager)
 
 """ Miscellaneous utilities for application and testing
-""" 
+"""
 from StringIO import StringIO
 import pprint
 
+
 def dump_obj(obj, logger, description=""):
     """ Dump a pretty-printed python object to log
     """
     else:
         description_str = ""
     logger.debug("%s%s", description_str, stream.getvalue())
-

test/endpoints_test.py

     return Queue()
 
 
+@pytest.fixture
+def marshal_default():
+    return Mock()
+
+
+@pytest.fixture
+def plugin_manager(marshal_default):
+    plugin_manager = Mock()
+    plugin_manager.plugin_instances.return_value = {
+        "araldo.marshalling": {
+            "marshal-default": marshal_default
+        }
+    }
+    return plugin_manager
+
+
+@pytest.fixture
+def outbound_derived():
+    class C(OutBoundBase):
+        pass
+    return C
+
+
 def test_plugin_exception():
     exception = PluginException("testmessage")
     assert "testmessage" == str(exception)
 
 
-def test_inbound_instantiation(queue):
+def test_inbound_instantiation(queue, plugin_manager, marshal_default):
     class C(InBoundBase):
         pass
-    plugin_manager = Mock()
     conf = {}
     c = C(
         name="testname",
     assert conf == c.config()
     assert queue == c.gevent_queue
     assert plugin_manager == c.plugin_manager()
+    assert c.marshalling() == marshal_default
 
 
-def test_outbound_instantiation():
-    class C(OutBoundBase):
-        pass
-    plugin_manager = Mock()
+def test_outbound_instantiation(
+        outbound_derived,
+        plugin_manager,
+        marshal_default):
     conf = {}
-    c = C(
+    c = outbound_derived(
         name="testname",
+        # use default marshalling
         plugin_manager=plugin_manager,
         config=conf)
-    assert conf == c.config()
-    assert "testname" == c.name()
-    assert plugin_manager == c.plugin_manager()
+    assert c.config() == conf
+    assert c.name() == "testname"
+    assert c.plugin_manager() == plugin_manager
+    assert c.marshalling() == marshal_default
+
+
+def test_outbound_instantiation_explicit_marshalling(
+        outbound_derived,
+        plugin_manager):
+    conf = {}
+    test_marshal = Mock()
+    plugin_instances = plugin_manager.plugin_instances()
+    marshalling_instances = plugin_instances["araldo.marshalling"]
+    marshalling_instances["marshal-test"] = test_marshal
+    c = outbound_derived(
+        name="testname",
+        marshalling="marshal-test",
+        plugin_manager=plugin_manager,
+        config=conf)
+    assert c.config() == conf
+    assert c.name() == "testname"
+    assert c.plugin_manager() == plugin_manager
+    assert c.marshalling() == test_marshal

test/marshalling_test.py

 import json
 from datetime import datetime
 from araldo.marshalling.default_marshalling import(
-    DefaultMarshalling, JSONPICKLE_TIMESTAMP_FORMAT)
+    Marshalling, JSONPICKLE_TIMESTAMP_FORMAT)
 from araldo.message import Message
 
 #TIMESTAMP_FORMAT = "%Y%m%d_%H%M%S"
 
 @pytest.fixture
 def default_marshalling():
-    return DefaultMarshalling()
+    return Marshalling()
 
 
 @pytest.fixture

testfiles/conf1.yaml

     port: 54321
 
 plugins:
+  araldo.marshalling:
+    - name: marshal-default
+      id: marshal-default
   araldo.endpoints.inbound:
     - name: redis_in_1
       id: inbound-redis