1. artgins
  2. ginsfsm

Commits

Ginés Martínez Sánchez  committed 38c2534 Draft

refactoring sockjs work done until now

  • Participants
  • Parent commits 2ac028c
  • Branches default

Comments (0)

Files changed (7)

File ginsfsm/protocols/sockjs/server/basehandler.py

View file
         """
         self.response.write(data)
 
-    def flush(self, include_footers=False, callback=None):
+    def flush(self, callback=None):
         """Flushes the current output buffer to the network.
 
         The ``callback`` argument, if given, can be used for flow control:
         """
         if self.async_response:
             self.response.flush(callback)
+        else:
+            if callback:
+                self.context.gaplic.add_callback(callback)
 
     def finish(self):
         """Finishes this response, ending the HTTP request."""
         """ Finish session. Ignore any error.
         """
         if self.async_response:
-            self.response.safe_finish()
+            try:
+                self.response.finish()
+            except:
+                pass
 
 
 class PreflightHandler(BaseHandler):
     """CORS preflight handler"""
 
     def __init__(self, context, request, async_response=False):
-        BaseHandler.__init__(self, context, request, async_response)
+        super(PreflightHandler, self).__init__(
+            context, request, async_response
+        )
 
     def options(self, *args, **kwargs):
         """XHR cross-domain OPTIONS handler"""

File ginsfsm/protocols/sockjs/server/c_session.py

View file
 
 """
 from ginsfsm.gobj import GObj
-from ginsfsm.protocols.sockjs.server.transports.xhrstreaming import (
+from ginsfsm.protocols.sockjs.server.c_xhr import (
+    GXhrSend,
+    GXhrPolling,
     GXhrStreaming,
 )
-from ginsfsm.protocols.sockjs.server.transports.xhr import (
-    GXhrPolling,
-    GXhrSend,
-)
 
 #----------------------------------------------------------------#
 #                   GSockjsSession GClass

File ginsfsm/protocols/sockjs/server/c_xhr.py

View file
+# -*- coding: utf-8 -*-
+"""
+    Xhr implementation
+"""
+import logging
+
+from pyramid.view import view_config
+from pyramid.httpexceptions import HTTPNotFound, HTTPServerError
+
+from ginsfsm.gobj import GObj
+from ginsfsm.protocols.sockjs.server import proto
+from ginsfsm.protocols.sockjs.server.basehandler import PreflightHandler
+from ginsfsm.protocols.sockjs.server.session import ConnectionInfo
+
+#----------------------------------------------------------------#
+#                   GXhrSend GClass
+#                   /xhr_send
+#----------------------------------------------------------------#
+GXHRSEND_GCONFIG = {
+    'sockjs_server': [None, None, 0, None, ""],
+}
+
+
+class GXhrSend(GObj):
+    """  GXhr GObj.
+    """
+    def __init__(self):
+        GObj.__init__(self, {}, GXHRSEND_GCONFIG)
+
+    def start_up(self):
+        """ Initialization zone.
+        """
+
+
+#----------------------------------------------------------------#
+#                   GXhrSend Views
+#                   /xhr_send
+#----------------------------------------------------------------#
+@view_config(
+    context=GXhrSend,
+    name='',
+    attr='options',
+    request_method='OPTIONS',
+)
+@view_config(
+    context=GXhrSend,
+    name='',
+    attr='post',
+    request_method='POST',
+)
+class XhrSendHandler(PreflightHandler):
+    def post(self):
+        response = self.response
+
+        # Start response
+        self.preflight()
+        self.handle_session_cookie()
+        self.disable_cache()
+
+        session_id = self.sid = self.context.parent.re_matched_name
+        session = self.context.sockjs_server.get_session(session_id)
+        if session is None:
+            return HTTPNotFound('Session not found')
+
+        data = self.request.body_file.read()
+        if not data:
+            return HTTPServerError("Payload expected.")
+
+        try:
+            messages = proto.json_decode(data)
+        except:
+            return HTTPServerError("Broken JSON encoding.")
+
+        try:
+            session.on_messages(messages)
+        except Exception:
+            logging.exception('XHR incoming')
+            return HTTPServerError('XHR incoming')
+
+        self.set_status(204)
+        self.response.content_type = 'text/plain'
+        self.response.charset = 'UTF-8'
+        return response
+
+
+#----------------------------------------------------------------#
+#                   GXhrPolling GClass
+#                   /xhr
+#----------------------------------------------------------------#
+GXHRPOLLING_GCONFIG = {
+    'sockjs_server': [None, None, 0, None, ""],
+}
+
+
+class GXhrPolling(GObj):
+    """  GXhrPolling GObj.
+    """
+    def __init__(self):
+        GObj.__init__(self, {}, GXHRPOLLING_GCONFIG)
+
+    def start_up(self):
+        """ Initialization zone.
+        """
+
+
+#----------------------------------------------------------------#
+#                   GXhrPolling Views
+#                   /xhr
+#----------------------------------------------------------------#
+@view_config(
+    context=GXhrPolling,
+    name='',
+    attr='options',
+    request_method='OPTIONS',
+)
+@view_config(
+    context=GXhrPolling,
+    name='',
+    attr='post',
+    request_method='POST',
+)
+class XhrPollingTransport(PreflightHandler):
+    """xhr-polling transport implementation"""
+    name = 'xhr'
+    session = None
+
+    def __init__(self, context, request):
+        super(XhrPollingTransport, self).__init__(
+            context,
+            request,
+            False
+        )
+        self.session = None
+        self.active = True
+
+    def post(self):
+        response = self.response
+
+        # Start response
+        self.preflight()
+        self.handle_session_cookie()
+        self.disable_cache()
+
+        session_id = self.sid = self.context.parent.re_matched_name
+
+        # Get or create session without starting heartbeat
+        session = self.context.sockjs_server.get_session(self.sid)
+        if session is None:
+            session = self.context.sockjs_server.create_session(session_id)
+        if session is None:
+            return HTTPServerError("ERROR creating session.")
+
+        # Try to attach to the session
+        if not session.set_handler(self, False):
+            return response
+        self.session = session
+        # Verify if session is properly opened
+        session.verify_state()
+
+        # Might get already detached because connection was closed in on_open
+        if not self.session:
+            return response
+
+        if not self.session.send_queue:
+            self.session.start_heartbeat()
+        else:
+            self.session.flush()
+        return response
+
+    def send_pack(self, message, binary=False):
+        if binary:
+            raise Exception('binary not supported for XhrPollingTransport')
+
+        if not self.active:
+            return
+        self.active = False
+
+        try:
+            self.response.content_type = 'application/javascript'
+            self.response.charset = 'UTF-8'
+
+            print "======> SEND_PACK SYNC", message
+            self.write(message + '\n')
+            self.flush()
+            self.send_complete()
+        except:
+            # If connection dropped, make sure we close offending session
+            # instead of propagating error all way up.
+            if self.session:
+                self.session.delayed_close()
+
+    def send_complete(self):
+        if self.session:  # detach session
+            self.session.remove_handler(self)
+            self.session = None
+        self.active = True
+
+    def session_closed(self):
+        """Called by the session when it was closed"""
+        if self.session:  # detach session
+            self.session.remove_handler(self)
+            self.session = None
+
+    def get_conn_info(self):
+        """Return `ConnectionInfo` object from current transport"""
+        return ConnectionInfo(
+            self.request.remote_addr,  # remote_ip
+            self.request.cookies,
+            self.request.params,  # arguments
+            self.request.headers,
+            self.request.path
+        )
+
+#----------------------------------------------------------------#
+#                   GXhrStreaming GClass
+#                   /xhr_streaming
+#----------------------------------------------------------------#
+GXHRSTREAMING_GCONFIG = {
+    'sockjs_server': [None, None, 0, None, ""],
+}
+
+
+class GXhrStreaming(GObj):
+    """  GStreaming GObj.
+    """
+    def __init__(self):
+        GObj.__init__(self, {}, GXHRSTREAMING_GCONFIG)
+
+    def start_up(self):
+        """ Initialization zone.
+        """
+
+
+#----------------------------------------------------------------#
+#                   GXhrStreaming Views
+#----------------------------------------------------------------#
+#                   /xhr_streaming
+@view_config(
+    context=GXhrStreaming,
+    name='',
+    attr='options',
+    request_method='OPTIONS',
+)
+@view_config(
+    context=GXhrStreaming,
+    name='',
+    attr='post',
+    request_method='POST',
+)
+class XhrStreamingTransport(PreflightHandler):
+    name = 'xhr_streaming'
+    session = None
+
+    def __init__(self, context, request):
+        super(XhrStreamingTransport, self).__init__(
+            context,
+            request,
+            True  # Asynchronous response!
+        )
+        self.session = None
+        self.active = True
+        self.amount_limit = self.context.sockjs_server.response_limit
+
+    def post(self):
+        response = self.response
+
+        session_id = self.sid = self.context.parent.re_matched_name
+
+        # Start response
+        self.preflight()
+        self.handle_session_cookie()
+        self.disable_cache()
+
+        response.content_type = 'application/javascript'
+        response.charset = 'UTF-8'
+        response.write('h' * 2048 + '\n')
+
+        # Get or create session without starting heartbeat
+        session = self.context.sockjs_server.get_session(session_id)
+        if session is None:
+            session = self.context.sockjs_server.create_session(session_id)
+        if session is None:
+            # close the session in the next cycle.
+            #self.context.gaplic.add_callback(self.xxx)
+            return response  # how inform of the error? headers has been sent.
+
+        # Try to attach to the session
+        if not session.set_handler(self, False):
+            # close the session in the next cycle.
+            #self.context.gaplic.add_callback(self.xxx)
+            return response  # how inform of the error? headers has been sent.
+        self.session = session
+        # Verify if session is properly opened
+        session.verify_state()
+
+        session.flush()
+
+        return response
+
+    def send_pack(self, message, binary=False):
+        if binary:
+            raise Exception('binary not supported for XhrStreamingTransport')
+
+        # self.active = False  # TODO si chequeo active fallan los tests
+        # se vuelve a poner active a true al ejecutar send_complete
+        # parece como un flow control
+
+        try:
+            print "======> SEND_PACK AAAASYNC", message
+            self.notify_sent(len(message))
+            self.write(message + '\n')
+            self.flush(callback=self.send_complete)
+        except:
+            # If connection dropped, make sure we close offending session
+            # instead of propagating error all way up.
+            self.session.delayed_close()
+            if self.session:  # detach session
+                self.session.remove_handler(self)
+                self.session = None
+
+    def send_complete(self):
+        """
+            Verify if connection should be closed based on amount of data
+            that was sent.
+        """
+
+        if self.should_finish():
+            if self.session:  # detach session
+                self.session.remove_handler(self)
+                self.session = None
+            self.context.gaplic.add_callback(self.safe_finish)
+        else:
+            if self.session:
+                self.session.flush()
+
+    def notify_sent(self, data_len):
+        """
+            Update amount of data sent
+        """
+        self.amount_limit -= data_len
+
+    def should_finish(self):
+        """
+            Check if transport should close long running connection after
+            sending X bytes to the client.
+
+            `data_len`
+                Amount of data that was sent
+        """
+        if self.amount_limit <= 0:
+            return True
+
+        return False
+
+    def session_closed(self):
+        """Called by the session when it was closed"""
+        if self.session:  # detach session
+            self.session.remove_handler(self)
+            self.session = None
+        self.context.gaplic.add_callback(self.safe_finish)
+
+    def get_conn_info(self):
+        """Return `ConnectionInfo` object from current transport"""
+        return ConnectionInfo(
+            self.request.remote_addr,  # remote_ip
+            self.request.cookies,
+            self.request.params,  # arguments
+            self.request.headers,
+            self.request.path
+        )

File ginsfsm/protocols/sockjs/server/session.py

View file
 
 class BaseSession(object):
     """Base session implementation class"""
+
     def __init__(self, conn, server):
         """Base constructor.
 
             msg = msg.encode('utf-8')
 
         if self._immediate_flush:
-            if self.handler and self.handler.active and not self.send_queue:
+            if self.handler and not self.send_queue:
                 # Send message right away
                 self.handler.send_pack('a[%s]' % msg)
             else:
         """Flush message queue if there's an active connection running"""
         self._pending_flush = False
 
-        if self.handler is None or \
-                not self.handler.active or \
-                not self.send_queue:
+        if self.handler is None or not self.send_queue:
             return
 
         self.handler.send_pack('a[%s]' % self.send_queue)

File ginsfsm/protocols/sockjs/server/transports/pollingbase.py

View file
 # -*- coding: utf-8 -*-
 """
-    ginsfsm.protocols.sockjs.server.transports.pollingbase
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
     Polling transports base
 """
 
     def check_xsrf_cookie(self):
         pass
 
-    def send_message(self, message, binary=False):
-        """Called by the session when some data is available"""
-        raise NotImplementedError()
-
     def session_closed(self):
         """Called by the session when it was closed"""
         self._detach()

File ginsfsm/protocols/sockjs/server/transports/xhrstreaming.py

View file
 from pyramid.security import authenticated_userid
 
 from ginsfsm.gobj import GObj
-from ginsfsm.protocols.http.server.c_http_clisrv import ResponseInterrupt
 from ginsfsm.protocols.sockjs.server.transports import streamingbase
 
 #----------------------------------------------------------------#

File ginsfsm/protocols/wsgi/webob/async_response.py

View file
         super(AsyncResponse, self).__init__(**kw)
         self.ginsfsm_channel = None
 
-        # Important trick to remove Content-Length.
-        del self.app_iter
+        del self.app_iter  # Important trick to remove Content-Length.
         # WARNING: you cannot use body, because it sets Content-Length.
 
     def __call__(self, environ, start_response):
             self.ginsfsm_channel.finish()
         else:
             logging.error('ERROR async FINISH before set ginsfsm_channel')
-
-    def safe_finish(self):
-        """ Finish session.
-        If it will blow up - connection was set to Keep-Alive and
-        client dropped connection, ignore any IOError or socket error."""
-        try:
-            self.finish()
-        except:
-            pass