1. artgins
  2. ginsfsm

Source

ginsfsm / ginsfsm / protocols / sockjs / server / c_transport_websocket.py

The default branch has multiple heads

# -*- coding: utf-8 -*-
"""
    Sockjs Websocket transport implementation
"""
import logging
import random

from pyramid.view import view_config

from ginsfsm.protocols.sockjs.server.proto import json_decode
from ginsfsm.gobj import GObj
from ginsfsm.protocols.sockjs.server.c_websocket import GWebSocket
from ginsfsm.protocols.sockjs.server.session import BaseSession
from ginsfsm.protocols.sockjs.server.session import ConnectionInfo
from ginsfsm.protocols.wsgi.webob.websocket_response import WebsocketResponse
from ginsfsm.deferred import Deferred


#----------------------------------------------------------------#
#                   WebSocketHandler
#----------------------------------------------------------------#
class WebSocketHandler(object):
    """ Mixin to adapt GWebSocket and Sockjs session handler
    """

    def __init__(self, context, request):
        self.context = context
        self.request = request
        channel = request.environ['ginsfsm.channel']
        self.gsock = channel.gsock

    def get_conn_info(self):
        """Return `ConnectionInfo` object from current transport"""
        return ConnectionInfo(
            self.request.remote_addr,  # remote_ip
            self.request.cookies,
            self.request.params,  # arguments
            self.request.headers,
            self.request.path
        )

    def _execute(self):
        #
        #   Accept the connection
        #
        self.ws_connection = self.context.gaplic.create_gobj(
            None,
            GWebSocket,
            None,           # No parent, Iam a Mixin!
            request=self.request,
            gsock=self.gsock,
        )
        self.ws_connection.delete_all_subscriptions()
        # da error al suscribir, no es un gobj, como hago para las callback?
        deferred_open = Deferred(0, self.open)
        deferred_message = Deferred(0, self.on_message)
        deferred_close = Deferred(0, self.on_close)
        self.ws_connection.subscribe_event('EV_ON_OPEN', deferred_open)
        self.ws_connection.subscribe_event('EV_ON_MESSAGE', deferred_message)
        self.ws_connection.subscribe_event('EV_ON_CLOSE', deferred_close)

        #
        #   This will execute ResponseInterrupt.
        #   Now Pyramid has nothing to do.
        #   All server/client dialog will be done by GWebSocket.
        #
        response = WebsocketResponse(self.context, self.request)
        return response

    def write_message(self, message, binary=False):
        """Sends the given message to the client of this Web Socket."""
        self.ws_connection.write_message(message, binary)

    def close(self):
        self.gsock.mt_drop()

#----------------------------------------------------------------#
#                   GWebsocket GClass
#                   /websocket
#----------------------------------------------------------------#
GWEBSOCKET_GCONFIG = {
    'sockjs_server': [None, None, 0, None, ""],
}


class GWebsocket(GObj):
    """  GWebsocket GObj.
    """
    def __init__(self):
        GObj.__init__(self, {}, GWEBSOCKET_GCONFIG)

    def start_up(self):
        """ Initialization zone.
        """


#----------------------------------------------------------------#
#                   GWebsocket Views
#                   /websocket
#----------------------------------------------------------------#
@view_config(
    context=GWebsocket,
    name='',
    attr='execute',
    #request_method='GET',
)
class WebsocketTransport(WebSocketHandler):
    name = 'websocket'

    def __init__(self, context, request):
        super(WebsocketTransport, self).__init__(
            context,
            request,
        )
        self.session = None
        self.active = True

    def execute(self):
        self.sid = self.context.parent.re_matched_name
        response = self._execute()
        return response

    def open(self, event):
        # Stats

        # Disable nagle
        #if self.server.settings['disable_nagle']:
        #    self.gsock.socket.setsockopt(
        #        socket.SOL_TCP, socket.TCP_NODELAY, 1)

        # Handle session
        self.session = self.context.sockjs_server.create_session(
            self.sid,
            register=False
        )

        if not self.session.set_handler(self):
            self.close()
            return

        self.session.verify_state()

        if self.session:
            self.session.flush()

    def _detach(self):
        if self.session is not None:
            self.session.remove_handler(self)
            self.session = None

    def on_message(self, event):
        message = event.data
        # SockJS requires that empty messages should be ignored
        if not message or not self.session:
            return

        try:
            msg = json_decode(message)

            if isinstance(msg, list):
                self.session.on_messages(msg)
            else:
                self.session.on_messages((msg,))
        except Exception:
            logging.exception('ERROR WebSocket json_decode')

            # Close session on exception
            #self.session.close()

            # Close running connection
            self.gsock.mt_drop()

    def on_close(self, event):
        # Close session if websocket connection was closed
        if self.session is not None:
            # Stats

            # Detach before closing session
            session = self.session
            self._detach()
            session.close()

    def send_pack(self, message, binary=False):
        # Send message
        try:
            self.write_message(message, binary)
        except IOError:
            self.context.sockjs_server.gaplic.add_callback(self.on_close)

    def session_closed(self):
        # If session was closed by the application, terminate websocket
        # connection as well.
        try:
            self.close()
        except IOError:
            pass
        finally:
            self._detach()

    def auto_decode(self):
        return False


#----------------------------------------------------------------#
#                   RawWebsocket Session
#----------------------------------------------------------------#
class RawSession(BaseSession):
    """ Raw session without any sockjs protocol encoding/decoding.
        Simply works as a proxy between `SockJSConnection` class
        and `RawWebSocketTransport`.
    """
    def send_message(self, msg, stats=True, binary=False):
        self.handler.send_pack(msg, binary)

    def on_message(self, msg):
        self.conn.on_message(msg)


#----------------------------------------------------------------#
#                   GRawWebsocket GClass
#                   /rawwebsocket
#----------------------------------------------------------------#
GRAWWEBSOCKET_GCONFIG = {
    'sockjs_server': [None, None, 0, None, ""],
}


class GRawWebsocket(GObj):
    """  GRawWebsocket GObj.
    """
    def __init__(self):
        GObj.__init__(self, {}, GRAWWEBSOCKET_GCONFIG)

    def start_up(self):
        """ Initialization zone.
        """


#----------------------------------------------------------------#
#                   GRawWebsocket Views
#                   /rawwebsocket
#----------------------------------------------------------------#
@view_config(
    context=GRawWebsocket,
    name='',
    attr='execute',
    #request_method='GET',
)
class RawWebsocketTransport(WebSocketHandler):
    name = 'rawwebsocket'

    def __init__(self, context, request):
        super(RawWebsocketTransport, self).__init__(
            context,
            request,
        )
        self.session = None
        self.active = True

    def get(self):
        # session
        response = self.response
        session_id = self.sid = '%0.9d' % random.randint(1, 2147483647)

        manager = self.session_manager

        sid = '%0.9d' % random.randint(1, 2147483647)
        #if self.per_user:
        #    sid = (authenticated_userid(request), sid)
        """
        session = manager.get(sid, True, request=request)
        request.environ['wsgi.sockjs_session'] = session

        # websocket
        if 'HTTP_ORIGIN' in request.environ:
            return HTTPNotFound()

        res = init_websocket(request)
        if res is not None:
            return res

        return RawWebSocketTransport(session, request)
        """

    def open(self):
        # Stats

        # Disable nagle if needed
        #if self.server.settings['disable_nagle']:
        #    self.gsock.socket.setsockopt(
        #        socket.SOL_TCP, socket.TCP_NODELAY, 1)

        # Create and attach to session
        self.session = RawSession(
            self.server.get_connection_class(),
            self.server
        )
        self.session.set_handler(self)
        self.session.verify_state()

    def _detach(self):
        if self.session is not None:
            self.session.remove_handler(self)
            self.session = None

    def on_message(self, message):
        # SockJS requires that empty messages should be ignored
        if not message or not self.session:
            return

        try:
            self.session.on_message(message)
        except Exception:
            logging.exception('RawWebSocket')

            # Close running connection
            self._detach()
            self.abort_connection()

    def on_close(self):
        # Close session if websocket connection was closed
        if self.session is not None:
            # Stats
            self.server.stats.on_conn_closed()

            session = self.session
            self._detach()
            session.close()

    def send_pack(self, message, binary=False):
        # Send message
        try:
            self.write_message(message, binary)
        except IOError:
            self.server.io_loop.add_callback(self.on_close)

    def session_closed(self):
        try:
            self.close()
        except IOError:
            pass
        finally:
            self._detach()