Source

araldo-websocket / araldo_websocket / endpoints / websocket_endpoint.py

""" araldo Endpoint for communication with WebSocket (client side)
"""
import gevent
from gevent import monkey
monkey.patch_all()
from websocket import create_connection, enableTrace
from araldo.endpoints import EndPointBase


# Pause, in seconds, passed to gevent.sleep() after a failed attempt to
# open the WebSocket connection.
RECONNECT_DELAY = 5


class EndPoint(EndPointBase):  # pylint: disable-msg=R0904
    """ EndPoint implementation for connecting to a WebSocket server

        Acts as a WebSocket *client*: messages received from the server
        are unmarshalled and put on the gevent queue, and messages handed
        to :meth:`send` are marshalled and written to the server.
    """
    def __init__(self, **kwargs):
        """ Creates an endpoint instance for relaying messages received
            from a WebSocket server
            :param config: configuration object; the keys
                ``"websocket_url"`` and ``"trace"`` are read from it
            :param gevent_queue: queue for forwarding incoming messages

            A first connection attempt is made immediately.  If it fails
            the instance is still created (without a connection) and the
            receive loop in :meth:`_run` keeps trying to connect.
        """
        config = kwargs["config"]
        self._websocket_url = config["websocket_url"]
        # Defined up front so that a failed first connection attempt
        # leaves a well-defined "not connected" state (None) instead of
        # a missing attribute.
        self._websocket = None
        do_trace = config["trace"]
        enableTrace(do_trace)
        EndPointBase.__init__(self, **kwargs)
        self._connect_websocket()

    def _connect_websocket(self):
        """ Try once to open a connection to the configured WebSocket URL

            On success the new connection replaces the current one and the
            replaced connection (if any) is closed so it does not leak.
            On failure the error is logged, the current connection is left
            untouched and the greenlet sleeps for RECONNECT_DELAY seconds.
            No retry happens here; the caller is expected to call again.
        """
        try:
            self._logger.debug(
                "Connecting to WebSocket '%s'", self._websocket_url)
            connection = create_connection(self._websocket_url)
        except Exception as error:  # pylint: disable=broad-except
            self._logger.debug(
                "Reconnection attempt failed (%s); waiting & retrying...",
                error)
            gevent.sleep(RECONNECT_DELAY)
            return

        # Swap only after the new connection exists, then release the
        # socket that was replaced.
        previous, self._websocket = self._websocket, connection
        if previous is not None:
            try:
                previous.close()
            except Exception as error:  # pylint: disable=broad-except
                # Best effort: the old connection is usually broken
                # already, so a failing close() must not stop us.
                self._logger.debug(
                    "Ignoring error while closing replaced WebSocket (%s)",
                    error)

    @staticmethod
    def plugin_id():
        """ :return: ID of plugin
        """
        return "endpoint-websocketclient"

    def _get_next_message(self):
        """ Receive a single message from the WebSocket server
            and enqueue it

            A payload that cannot be unmarshalled is logged and dropped;
            the connection is kept.  Errors raised while receiving (or
            while enqueueing) propagate to the caller.
        """
        message_str = self._websocket.recv()
        try:
            message = self._marshalling.to_internal_format(message_str)
        except Exception as error:  # pylint: disable=broad-except
            # A single bad payload is not a connection problem.
            self._logger.debug(
                "Unable to unmarshal message; discarding (%s)",
                error)
            return
        self._logger.debug(
            "unmarshalled: %s",
            message)
        self.gevent_queue.put((self.name(), message))

    def _run(self):  # pylint: disable-msg=E0202
        """ Enter infinite gevent loop to receive messages from
            WebSocket server and push it to gevent queue

            Any error escaping :meth:`_get_next_message` (including the
            AttributeError raised while no connection exists yet) is
            logged, followed by a short pause and a reconnection attempt.
        """
        while True:
            try:
                self._get_next_message()
            except Exception as error:  # pylint: disable=broad-except
                self._logger.debug(
                    "Receiving from WebSocket failed (%s)",
                    error)
                gevent.sleep(1)  # avoid tight loop

                self._logger.debug("Trying to reconnect...")
                self._connect_websocket()

            gevent.sleep(0)  # yield to other greenlets

    def check(self):
        """ implementation of abstract method

            :raises NotImplementedError: always
        """
        raise NotImplementedError()

    def send(self, message):
        """ Marshal a message and send it to the WebSocket server
            :param message: message in internal format

            Errors are not handled here: exceptions from marshalling or
            from the underlying connection propagate to the caller, and
            an AttributeError is raised if no connection was ever
            established.
        """
        message_str = self._marshalling.to_external_format(message)
        self._logger.debug("message_str: %s", message_str)
        self._websocket.send(message_str)