1. artgins
  2. ginsfsm

Commits

Ginés Martínez Sánchez  committed fdceb5a Draft

refactoring sockjs

  • Participants
  • Parent commits b261cac
  • Branches default

Comments (0)

Files changed (13)

File docs/TODO.rst

View file
  • Ignore whitespace
+Este es un mix, que usado desde pyramid,
+le permite meter eventos en nuestro gobj-ecosistema.
+
+Los mixes son cajas que comunican el mundo exterior con el gobj-ecosistema.
+EL mundo exterior llama a funciones del mix, que este internamente convierte
+en eventos que se envian a nuestro gobj-ecosistema.
+Los mixes son de solo-entrada.
+Hacia fuera se podría dar información, que no eventos,
+pero tendría que ser recogida en modo poll.
+MEJORA: Bueno, tb se pueden usar callbacks para informar al mundo exterior
+en tiempo real de las informaciones cambiantes.
+
+TODO: tb debería permitir recibir eventos del gobj-ecosistema y
+procesarlo a conveniencia.
+Como va a recibir eventos si no es un gobj? No se puede.
+La solución es integrar este código en el GObj correspondiente.
+MEJORA: sí se puede, subscribiendose a un evento con callback!

File ginsfsm/protocols/http/common/response.py

View file
  • Ignore whitespace
 .. autoclass:: HttpResponse
     :members:
 
-Este es un mix, que usado desde pyramid,
-le permite meter eventos en nuestro gobj-ecosistema.
-
-Los mixes son cajas que comunican el mundo exterior con el gobj-ecosistema.
-EL mundo exterior llama a funciones del mix, que este internamente convierte
-en eventos que se envian a nuestro gobj-ecosistema.
-Los mixes son de solo-entrada.
-Hacia fuera se podría dar información, que no eventos,
-pero tendría que ser recogida en modo poll.
-MEJORA: Bueno, tb se pueden usar callbacks para informar al mundo exterior
-en tiempo real de las informaciones cambiantes.
-
-TODO: tb debería permitir recibir eventos del gobj-ecosistema y
-procesarlo a conveniencia.
-Como va a recibir eventos si no es un gobj? No se puede.
-La solución es integrar este código en el GObj correspondiente.
+This gobj can be used as mix.
 
 """
 
 import logging
-import socket
 import time
 
 from ginsfsm.compat import (

File ginsfsm/protocols/sockjs/server/basehandler.py

View file
  • Ignore whitespace
 
 from pyramid.response import Response
 
+from ginsfsm.protocols.wsgi.webob.async_response import AsyncResponse
+
 #----------------------------------------------------------------#
 #                   Base Views
 #----------------------------------------------------------------#
 CACHE_TIME = 31536000
 
 
-class BaseHandler(Response):
-    # same as initialize() of Tornado
-    def __init__(self, context, request):
+class BaseHandler(object):
+    def __init__(self, context, request, async_response=False):
         self.context = context
         self.request = request
-        super(BaseHandler, self).__init__()
-        self.ginsfsm_channel = None
-        self.response = self
+        self.async_response = async_response
+        if async_response:
+            self.response = AsyncResponse()
+        else:
+            self.response = Response()
 
     # Various helpers
     def set_header(self, name, value):
         """Sets the given response header name and value.
-
-        If a datetime is given, we automatically format it according to the
-        HTTP specification. If the value is not a string, we convert it to
-        a string. All header values are then encoded as UTF-8.
         """
-        # self._headers[name] = self._convert_header_value(value)  de tornado
         self.response.headerlist.append((name, value))
 
     def set_status(self, code):
         self.response.status = code
 
     def set_body(self, body):
+        if self.async_response:
+            raise Exception('You cannot write to body in async response')
         self.response.body = body
 
     def enable_cache(self):
         self.response.set_cookie('JSESSIONID', cv)
         return ('Set-Cookie', self.response.headers['Set-Cookie'])
 
-    def finish(self):
-        """Finishes this response, ending the HTTP request."""
-        #self._log_disconnect()
-        #super(BaseHandler, self).finish()
-        self.ginsfsm_channel.finish()
-
-    def safe_finish(self):
-        """ Finish session.
-        If it will blow up - connection was set to Keep-Alive and
-        client dropped connection, ignore any IOError or socket error."""
-        try:
-            self.finish()
-        except:
-            pass
-
     def write(self, data):
-        """ Write data to the network.
-            Must be overridden in your application
+        """ Write data.
+            It will use the configurated response: async or sync.
         """
-        self.ginsfsm_channel.write(data)
+        self.response.write(data)
 
     def flush(self, include_footers=False, callback=None):
         """Flushes the current output buffer to the network.
         if another flush occurs before the previous flush's callback
         has been run, the previous callback will be discarded.
         """
-        self.ginsfsm_channel.flush(callback)
+        if self.async_response:
+            self.response.flush(callback)
+
+    def finish(self):
+        """Finishes this response, ending the HTTP request."""
+        if self.async_response:
+            self.response.finish()
+
+    def safe_finish(self):
+        """ Finish session. Ignore any error.
+        """
+        if self.async_response:
+            self.response.safe_finish()
 
 
 class PreflightHandler(BaseHandler):
     """CORS preflight handler"""
 
-    def __init__(self, context, request):
-        BaseHandler.__init__(self, context, request)
+    def __init__(self, context, request, async_response=False):
+        BaseHandler.__init__(self, context, request, async_response)
 
     def options(self, *args, **kwargs):
         """XHR cross-domain OPTIONS handler"""
-        #response = self.set_response(self.request.response)
-        response = self.response = self.request.response
-        # we override __call__ of self in XhrStreamingTransport!!
-        # y por eso falla cuando se llama a options desde esa clase
-        # No uses aquí una herededada de self (webob response)
+        # Force a clean sync Response.
+        # Several views point here: both async and sync responses.
+        response = self.response = Response()
 
         self.enable_cache()
         self.handle_session_cookie()
             allowed_methods = getattr(self, 'access_methods', 'OPTIONS, POST')
             self.set_header('Access-Control-Allow-Methods', allowed_methods)
             self.set_header('Allow', allowed_methods)
-
             self.set_status(204)
         else:
             # Set forbidden
             self.set_status(403)
 
-        # self.finish() TODO: how tell http-server to finish connection?
         return response
 
     def preflight(self):

File ginsfsm/protocols/sockjs/server/c_sockjs_server.py

View file
  • Ignore whitespace
 class GreetingsHandler(BaseHandler):
     """SockJS greetings page handler"""
     def get(self):
-        #response = self.set_response(self.request.response)
-        response = self
+        response = self.response
         self.enable_cache()
         response.content_type = 'text/plain'
         response.charset = 'UTF-8'

File ginsfsm/protocols/sockjs/server/session.py

View file
  • Ignore whitespace
 
     def delayed_close(self):
         """Delayed close - won't close immediately, but on next ioloop tick."""
+        print "DELAYEDDDDDDDDDDD CLOSE"
         self.state = CLOSING
         self.server.gaplic.add_callback(self.close)
 
         if not forced and self.handler is not None and not self.is_closed:
             self.promote()
         else:
+            print "CLOSE ON ON_DELETEEEEEEEEEEE"
             self.close()
 
     # Add session
         """Flush message queue if there's an active connection running"""
         self._pending_flush = False
 
-        if self.handler is None or not self.handler.active or not self.send_queue:
+        if self.handler is None or \
+                not self.handler.active or \
+                not self.send_queue:
             return
 
         self.handler.send_pack('a[%s]' % self.send_queue)

File ginsfsm/protocols/sockjs/server/static.py

View file
  • Ignore whitespace
         self.access_methods = 'OPTIONS, GET'
 
     def get(self):
-        #response = self.set_response(self.request.response)
-        response = self
+        response = self.response
         response.content_type = 'application/json'
         response.charset = 'UTF-8'
         self.preflight()
 class IFrameHandler(BaseHandler):
     """SockJS IFrame page handler"""
     def get(self):
-        #response = self.set_response(self.request.response)
-        response = self
+        response = self.response
         data = IFRAME_TEXT % self.context.sockjs_server.sockjs_url
         hsh = hashlib.md5(data).hexdigest()
 

File ginsfsm/protocols/sockjs/server/transports/pollingbase.py

View file
  • Ignore whitespace
 from ginsfsm.protocols.sockjs.server.transports import base
 
 
-class PollingTransportBase(basehandler.PreflightHandler, base.BaseTransportMixin):
+class PollingTransportBase(basehandler.PreflightHandler,
+                           base.BaseTransportMixin):
     """Polling transport handler base class"""
-    def __init__(self, context, request):
-        super(PollingTransportBase, self).__init__(context, request)
+    def __init__(self, context, request, async_response=False):
+        super(PollingTransportBase, self).__init__(
+            context,
+            request,
+            async_response
+        )
         self.session = None
         self.active = True
 

File ginsfsm/protocols/sockjs/server/transports/streamingbase.py

View file
  • Ignore whitespace
 
 class StreamingTransportBase(pollingbase.PollingTransportBase):
     def __init__(self, context, request):
-        super(StreamingTransportBase, self).__init__(context, request)
+        super(StreamingTransportBase, self).__init__(context, request, True)
 
         self.amount_limit = self.context.sockjs_server.response_limit
 

File ginsfsm/protocols/sockjs/server/transports/xhr.py

View file
  • Ignore whitespace
     name = 'xhr'
 
     def post(self):
-        #response = self.set_response(self.request.response)
-        #response = self.set_response(self)  # Response is self.
-        response = self
-        # Start response
+        response = self.response
         self.preflight()
         self.handle_session_cookie()
         self.disable_cache()
             # instead of propagating error all way up.
 
             # TODO: ERROR self.session.delayed_close()
-            pass
+            raise
 
 #----------------------------------------------------------------#
 #                   GXhrSend GClass
 )
 class XhrSendHandler(pollingbase.PollingTransportBase):
     def post(self):
-        #response = self.set_response(self.request.response)
-        #response = self.set_response(self)  # Response is self.
-        response = self
+        response = self.response
 
         self.sid = self.context.parent.re_matched_name  # session_id
         if self.context.sockjs_server.per_user:

File ginsfsm/protocols/sockjs/server/transports/xhrstreaming.py

View file
  • Ignore whitespace
     name = 'xhr_streaming'
 
     def post(self):
-        #response = self.set_response(self.request.response)
-        #response = self.set_response(self)  # Response is self.
-        response = self
-        response.headers = ()  # important trick: clean default webob headers.
+        response = self.response
 
         self.sid = self.context.parent.re_matched_name  # session_id
         if self.context.sockjs_server.per_user:
         self.disable_cache()
         response.content_type = 'application/javascript'
         response.charset = 'UTF-8'
-
-        self.ginsfsm_channel = self.request.environ['ginsfsm.channel']
-
-        return self  # Webob will execute __call__
-
-    def __call__(self, environ, start_response):
-        start_response(self.status, self._abs_headerlist(environ))
-        self.ginsfsm_channel.write('h' * 2048 + '\n')
-        self.ginsfsm_channel.flush()
+        response.write('h' * 2048 + '\n')
 
         if not self._attach_session(self.sid, False):
             self.finish()
             return ()
 
+        """
+
         if self.session:
             self.ginsfsm_channel.flush()
             #self.session.flush()
-        raise ResponseInterrupt()
+        """
+
+        return response
 
     def send_pack(self, message, binary=False):
         if binary:
             raise Exception('binary not supported for XhrStreamingTransport')
 
         print ">>>>>>>>>>>>>>>>>>>>>>> SEND_PACK:", message
-        self.active = False  # TODO si chequeo active fallan los tests
+        # self.active = False  # TODO si chequeo active fallan los tests
+        # se vuelve a poner active a true al ejecutar send_complete
+        # parece como un flow control
 
         try:
             self.notify_sent(len(message))
-
             self.write(message + '\n')
             self.flush(callback=self.send_complete)
         except:
+            raise
             # If connection dropped, make sure we close offending session instead
             # of propagating error all way up.
+            print "??????????????????????????'"
             self.session.delayed_close()
             self._detach()

File ginsfsm/protocols/wsgi/common/wsgi_response.py

View file
  • Ignore whitespace
 
         environ = {}
         # ginsfsm variables to do asynchronous response
-        # you can access directly to http channel with functions:
+        # you can access directly to http channel with the next functions:
         #   write(data):        Write data to output buffer.
         #   flush(callback):    Flush output buffer to socket,
         #                       and call callback when transmit ready event.

File ginsfsm/protocols/wsgi/webob/__init__.py

View file
  • Ignore whitespace
+"""
+Derived WebOb/Pyramid Response to do asynchronous responses.
+"""

File ginsfsm/protocols/wsgi/webob/async_response.py

View file
  • Ignore whitespace
+# -*- encoding: utf-8 -*-
+"""
+    AsyncResponse -  an asynchronous WebOb Response over ginsfsm.
+"""
+import logging
+
+from ginsfsm.protocols.http.server.c_http_clisrv import ResponseInterrupt
+
+try:
+    from pyramid.response import Response
+except ImportError:  # pragma: no cover
+    try:
+        from webob import Response
+    except ImportError:  # pragma: no cover
+        raise Exception('async_response is depending of Pyramid or WebOb')
+
+
+class AsyncResponse(Response):
+    def __init__(self, **kw):
+        super(AsyncResponse, self).__init__(**kw)
+        self.ginsfsm_channel = None
+
+        # Important trick to remove Content-Length.
+        del self.app_iter
+        # WARNING: you cannot use body, because it sets Content-Length.
+
+    def __call__(self, environ, start_response):
+        "Override WSGI call"
+        self.ginsfsm_channel = environ['ginsfsm.channel']
+
+        start_response(self.status, self._abs_headerlist(environ))
+
+        # send the current body to the network
+        for chunk in self.app_iter:
+            self.write(chunk)
+            self.flush()
+        del self.app_iter
+
+        raise ResponseInterrupt()
+
+    def write(self, data):
+        """ Write data to http channel.
+        """
+        if self.ginsfsm_channel:
+            self.ginsfsm_channel.write(data)
+        else:
+            super(AsyncResponse, self).write(data)
+
+    def flush(self, callback=None):
+        """Flushes the current output buffer to the network.
+
+        The ``callback`` argument, if given, can be used for flow control:
+        it will be run when all flushed data has been written to the socket.
+        Note that only one flush callback can be outstanding at a time;
+        if another flush occurs before the previous flush's callback
+        has been run, the previous callback will be discarded.
+        """
+        if self.ginsfsm_channel:
+            self.ginsfsm_channel.flush(callback)
+        else:
+            logging.error('ERROR async Flush before set ginsfsm_channel')
+
+    def finish(self):
+        """Finishes this response, ending the HTTP request."""
+        if self.ginsfsm_channel:
+            self.ginsfsm_channel.finish()
+        else:
+            logging.error('ERROR async FINISH before set ginsfsm_channel')
+
+    def safe_finish(self):
+        """ Finish session.
+        If it will blow up - connection was set to Keep-Alive and
+        client dropped connection, ignore any IOError or socket error."""
+        try:
+            self.finish()
+        except:
+            pass