Source

araldo-websocket / araldo_websocket / endpoints / websocket_endpoint.py

Full commit
""" araldo Endpoint for communication with WebSocket (client side)
"""
import gevent
from gevent import monkey
monkey.patch_all()
from websocket import (
    WebSocketConnectionClosedException,
    create_connection,
    enableTrace,
)
from araldo.endpoints import EndPointBase


class EndPoint(EndPointBase):  # pylint: disable-msg=R0904
    """ EndPoint implementation for connecting to a WebSocket server

        Acts as the client side of the connection: received messages are
        unmarshalled and put on the gevent queue, outgoing messages are
        marshalled and written to the same connection.
    """
    def __init__(self, **kwargs):
        """ Creates an endpoint instance for relaying messages received
            from a WebSocket server
            :param config: configuration object; the entries
                ``websocket_url`` and ``trace`` are read from it
            :param gevent_queue: queue for forwarding incoming messages
                (not read here; passed through to EndPointBase)
        """
        config = kwargs["config"]
        websocket_url = config["websocket_url"]
        do_trace = config["trace"]
        enableTrace(do_trace)
        self._websocket = create_connection(websocket_url)
        try:
            EndPointBase.__init__(self, **kwargs)
        except Exception:
            # Do not leak the already opened connection when the base
            # class refuses the arguments.
            self._websocket.close()
            raise

    @staticmethod
    def plugin_id():
        """ :return: ID of plugin
        """
        return "endpoint-websocketclient"

    def _get_next_message(self):
        """ Receive a single message from the WebSocket server
            and enqueue it

            The message is converted with ``self._marshalling`` and put on
            ``self.gevent_queue`` as a ``(endpoint name, message)`` tuple.
            Both attributes are presumably set up by EndPointBase.
            Errors from receiving or unmarshalling propagate to the caller.
        """
        message_str = self._websocket.recv()
        message = self._marshalling.to_internal_format(message_str)
        self._logger.debug(
            "unmarshalled: %s",
            message)
        self.gevent_queue.put((self.name(), message))

    def _run(self):  # pylint: disable-msg=E0202
        """ Enter gevent loop to receive messages from WebSocket server
            and push them to gevent queue

            A message that cannot be processed is logged and discarded.
            The loop ends once the connection is reported closed: every
            further receive would fail immediately, so continuing would
            only spin without ever delivering a message.
        """
        while True:
            try:
                self._get_next_message()
            except WebSocketConnectionClosedException as error:
                self._logger.error(
                    "WebSocket connection closed; stopping receiver (%s)",
                    error)
                break
            except Exception as error:  # pylint: disable-msg=W0703
                self._logger.debug(
                    "Unable to unmarshal message; discarding (%s)",
                    error)
            # Yield to other greenlets between two messages.
            gevent.sleep(0)

    def check(self):
        """ implementation of abstract method

            :raises NotImplementedError: always; no check is provided
                for this endpoint
        """
        raise NotImplementedError()

    def send(self, message):
        """ Marshal a message and send it to the WebSocket server
            :param message: message in internal format; converted with
                ``self._marshalling.to_external_format`` before sending
        """
        message_str = self._marshalling.to_external_format(message)
        self._logger.debug("message_str: %s", message_str)
        self._websocket.send(message_str)