Commits

Ginés Martínez Sánchez  committed 36ee51a Draft

XhrStreaming sockjs test passed!, but need clear the code

  • Participants
  • Parent commits 4cbefdc

Comments (0)

Files changed (9)

File ginsfsm/gobj.py

             if gobj.re_name:
                 if gobj._re_compiled_name.match(name) is not None:
                     # this gobj has been got as re_matched_name.
-                    print "matched------------->", name, "-", gobj.re_name
                     gobj.re_matched_name = name
                     return gobj
             else:

File ginsfsm/protocols/http/common/response.py

 import logging
 import socket
 import time
-from ginsfsm.deferred import DeferredInterrupt
 
 from ginsfsm.compat import (
     tobytes,
 class HttpResponse(object):
     def __init__(self, request):
         self.request = request
+        self.channel = request.channel
         self.gsock = request.channel.gsock
-        self.channel = request.channel
         self.response_headers = []
         version = request.version
         if version not in ('1.0', '1.1'):
         self.complete = False
         self.chunked_response = False
 
-    def service(self):
-        try:
-            try:
-                self.start()
-                self.execute()
-                self.finish()
-            except socket.error:
-                self.close_on_finish = True
-                raise  # ??? if self.gsock.adj.log_socket_errors:
-        except DeferredInterrupt:
-            raise
-        finally:
-            pass
+    def execute(self):
+        """ Execute the response.
+            Must be overridden.
+        """
 
     def build_response_header(self):
         version = self.version
         else:
             response_headers.append(('Via', ident))
         if not date_header:
+            if not self.start_time:
+                self.start_time = time.time()
             response_headers.append(('Date', build_http_date(self.start_time)))
 
         first_line = 'HTTP/%s %s' % (self.version, self.status)
         self.start_time = time.time()
 
     def finish(self):
+        """ Finishes this response,
+            flushing output buffer,
+            and ending the HTTP request.
+        """
         if not self.wrote_header:
             self.write(b'')
         if self.chunked_response:
         if self.close_on_finish:
             self.gsock.close_when_flushed = True
         self.request._close()
+        self.flush()
+
+    def flush(self):
+        """ Flush output buffer.
+        """
         self.channel.send_event(self.gsock, 'EV_FLUSH_OUTPUT_DATA')
 
     def write(self, data):
+        """ Write data to output buffer.
+        """
         if not self.complete:
             raise RuntimeError('start_response was not called before body '
                                'written')

File ginsfsm/protocols/http/server/c_http_clisrv.py

 GObj :class:`GHttpCliSrv`
 =========================
 
+Http Channel.
+
 .. autoclass:: GHttpCliSrv
     :members:
 
 from ginsfsm.gobj import GObj
 from ginsfsm.protocols.http.common.parser import HTTPRequestParser
 from ginsfsm.protocols.http.common.utilities import InternalServerError
-from ginsfsm.protocols.http.common.response import HttpErrorResponse
+from ginsfsm.protocols.http.common.response import (
+    HttpResponse,
+    HttpErrorResponse,
+)
 
 
-class LateResponseInterrupt(Exception):
-    """ Use when the response is asynchronous.
+class ResponseInterrupt(Exception):
+    """ To use when the response is asynchronous
+        or infinite until top level wants finish it.
     """
-    def __init__(self, request_reference):
+    def __init__(self, reference_request):
         Exception.__init__(self)
-        self.request_reference = request_reference
-
-
-class InfiniteResponseInterrupt(Exception):
-    """ Use when the response is infinite.
-    """
-    def __init__(self, request_reference):
-        Exception.__init__(self)
-        self.request_reference = request_reference
+        self.reference_request = reference_request
 
 
 def ac_disconnected(self, event):
 
 
 def ac_http_request(self, event):
+    """ Internal event.
+        The request is saved in self.responding_request.
+    """
     self.stop_inactivity_timer()
 
     if self.responding_request.error:
         response = HttpErrorResponse(self.responding_request)
-        response.service()
-        self.clear_request_queue()
+        response.execute()
+        self.finish(response)
         return
 
     self.start_responseless_timer()
+    self.set_new_state('ST_WAIT_RESPONSE')
     # TODO: in stratus environment, we need to inform of who srvcli is.
     self.broadcast_event(
         'EV_HTTP_REQUEST',
 
 def ac_http_response(self, event):
     response = event.response
+    if not isinstance(response, HttpResponse):
+        logging.error("ERROR response doesn't mach HttpResponse %s" %
+            response.request.path)
+
     if response.request != self.responding_request:
         logging.error("ERROR response doesn't mach responding request %s" %
             response.request.path)
 
+    if self.interrupted_response:
+        logging.error("ERROR interrupted_response is BUSY, of %s" %
+            response.request.path)
+
+    response.start()
     try:
-        response.service()
-    except LateResponseInterrupt as e:
-        """ Response is asynchronous.
-            Request will be response later.
+        response.execute()
+    except ResponseInterrupt as e:
+        """ Response is asynchronous or infinite.
             Don't clear the current responding_request.
         """
-        if e.request_reference != self.responding_request:
+        if e.reference_request != self.responding_request:
             logging.error("ERROR response doesn't mach responding request %s" %
                 response.request.path)
-
-    except InfiniteResponseInterrupt as e:
-        """ Response is infinite.
-            Request will be response with chunks.
-            Don't clear the current responding_request.
-        """
-        if e.request_reference != self.responding_request:
-            logging.error("ERROR response doesn't mach responding request %s" %
-                response.request.path)
+        self.interrupted_response = response
+        self.stop_responseless_timer()  # TODO: do some ping-alive
+        return
 
     except:
         logging.exception('Exception when serving %s' % response.request.path)
             request = HTTPRequestParser(self)
             request.error = InternalServerError(body)
             response = HttpErrorResponse(request)
-            response.service()
+            response.execute()
         else:
             response.close_on_finish = True
 
-    self.stop_responseless_timer()
-    self.responding_request = None
-
-    if response.close_on_finish:
-        self.clear_request_queue()
-        return
-
-    # pull the request queue
-    self.post_event(self, 'EV_DEQUEUE_REQUEST')
+    self.finish(response)
 
 
 def ac_transmit_ready(self, event):
     request = HTTPRequestParser(self)
     request.error = InternalServerError(body)
     response = HttpErrorResponse(request)
-    response.service()
-    self.clear_request_queue()
+    response.execute()
+    self.finish(response)
 
 
 GHTTPCLISRV_FSM = {
             ('EV_INACTIVITY_TIMEOUT',   ac_inactivity_timeout,      None),
             ('EV_RX_DATA',              ac_rx_data,                 None),
             ('EV_DEQUEUE_REQUEST',      ac_dequeue_request,         None),
-            ('EV_HTTP_REQUEST',         ac_http_request,   'ST_WAIT_RESPONSE'),
+            ('EV_HTTP_REQUEST',         ac_http_request,            None),
         ),
         'ST_WAIT_RESPONSE':
         (
             ('EV_DISCONNECTED',         ac_disconnected,            None),
-            ('EV_RESPONSELESS_TIMEOUT', ac_responseless_timeout,    None),
+            ('EV_RESPONSELESS_TIMEOUT', ac_responseless_timeout,    'ST_IDLE'),
             ('EV_RX_DATA',              ac_rx_data,                 None),
             ('EV_DEQUEUE_REQUEST',      None,                       None),
             ('EV_HTTP_RESPONSE',        ac_http_response,           'ST_IDLE'),
         GObj.__init__(self, GHTTPCLISRV_FSM, GHTTPCLISRV_GCONFIG)
         self.current_request = None  # A request parser instance
         self.responding_request = None  # resquest waiting top response
+        self.interrupted_response = None  # current async or infinite response.
         self.dl_requests = deque()  # requests queue
         self.sent_continue = False  # used as a latch after sending 100continue
 
             'EV_SET_TIMER',
             seconds=-1
         )
+
+    def write(self, data):
+        """ Write data to output buffer.
+            To supply asynchronous access to high level.
+        """
+        if not self.interrupted_response:
+            logging.error("ERROR channel.write() with no interrupted_response")
+            return
+        self.interrupted_response(data)
+
+    def flush(self):
+        """ Flush output buffer.
+            To supply asynchronous access to high level.
+        """
+        if not self.interrupted_response:
+            logging.error("ERROR channel.flush() with no interrupted_response")
+            return
+        self.flush()
+
+    def finish(self, response=None):
+        """ Finishes this response,
+            flushing output buffer,
+            and ending the HTTP request.
+        """
+        if response is None:
+            if not self.interrupted_response:
+                logging.error("ERROR channel.finish() with no interrupted_response")
+                return
+            response = self.interrupted_response
+        response.finish()
+        self.stop_responseless_timer()
+        self.responding_request = None
+
+        if response.close_on_finish:
+            # ignore all enqueued requests.
+            self.clear_request_queue()
+            return
+
+        # pull the request queue
+        self.post_event(self, 'EV_DEQUEUE_REQUEST')

File ginsfsm/protocols/sockjs/server/basehandler.py

         self.context = context
         self.request = request
         super(BaseHandler, self).__init__()
+        self.ginsfsm_request = None
+        self.ginsfsm_response = None
 
     def set_response(self, response):
         """ response can be request.response for static urls
         self.response = response
         return response
 
-    def finish(self):
-        """Finishes this response, ending the HTTP request."""
-        #self._log_disconnect()
-        #super(BaseHandler, self).finish()
-
     # Various helpers
     def set_header(self, name, value):
         """Sets the given response header name and value.
         self.response.set_cookie('JSESSIONID', cv)
         return ('Set-Cookie', self.response.headers['Set-Cookie'])
 
+    def finish(self):
+        """Finishes this response, ending the HTTP request."""
+        #self._log_disconnect()
+        #super(BaseHandler, self).finish()
+        if self.ginsfsm_response:
+            self.ginsfsm_response.finish()
+
     def safe_finish(self):
-        """Finish session. If it will blow up - connection was set to Keep-Alive and
+        """ Finish session.
+        If it will blow up - connection was set to Keep-Alive and
         client dropped connection, ignore any IOError or socket error."""
         self.finish()
 
         """ Write data to the network.
             Must be overridden in your application
         """
-        raise NotImplementedError()
+        if self.ginsfsm_response:
+            self.ginsfsm_response.write(data)
 
     def flush(self, include_footers=False, callback=None):
         """Flushes the current output buffer to the network.
         if another flush occurs before the previous flush's callback
         has been run, the previous callback will be discarded.
         """
+        if self.ginsfsm_response:
+            self.ginsfsm_response.flush()
         if callback:
             # TODO: must be executed at on event WRITE
             # instead next pooling cycle.

File ginsfsm/protocols/sockjs/server/c_session.py

 
 
 class GSockjsSession(GObj):
-    """  GSockjsSession GObj.
+    """  GSockjsSession GObj aka GServerID.
     This gobj treats the {server_id} segment of the url.
     The main reason for this segment is to make it easier
     to configure load balancer

File ginsfsm/protocols/sockjs/server/c_sockjs_server.py

             session_id,
             self.disconnect_delay
         )
-
+        print "CREATE SESSION: id=%s, s=%r" % (session_id, s,)
         if register:
             self._sessions.add(s)
 
         `session_id`
             Session id
         """
-        return self._sessions.get(session_id)
+        s = self._sessions.get(session_id)
+        print "GET SESSION: id=%s, s=%r" % (session_id, s,)
+        return s
 
     def get_connection_class(self):
         """Return associated connection class"""

File ginsfsm/protocols/sockjs/server/transports/xhr.py

 
             print "zzzzz", message
             self.add_body(message + '\n')
-            self.flush(callback=self.send_complete)
+            self.context.gaplic.add_callback(self.send_complete)
+            #self.flush(callback=self.send_complete)
         except:
             # If connection dropped, make sure we close offending session
             # instead of propagating error all way up.
-            self.session.delayed_close()
 
+            # TODO: ERROR self.session.delayed_close()
+            pass
 
 #----------------------------------------------------------------#
 #                   GXhrSend GClass

File ginsfsm/protocols/sockjs/server/transports/xhrstreaming.py

 from pyramid.security import authenticated_userid
 
 from ginsfsm.gobj import GObj
+from ginsfsm.protocols.http.server.c_http_clisrv import ResponseInterrupt
 from ginsfsm.protocols.sockjs.server.transports import streamingbase
 
 #----------------------------------------------------------------#
     name = 'xhr_streaming'
 
     def post(self):
+        #response = self.set_response(self.request.response)
         response = self.set_response(self)  # Response is self.
         self.headers = ()  # important trick: clean default webob headers.
 
-        self.sid = self.context.re_matched_name  # session_id
+        self.sid = self.context.parent.re_matched_name  # session_id
         if self.context.sockjs_server.per_user:
             self.sid = (authenticated_userid(self.request), self.sid)
 
-        #try:
+        #try: esto es de pyramid_sockjs
         #    session = manager.get(sid, create, request=request)
         #except KeyError:
         #    return HTTPNotFound(headers=(session_cookie(request),))
         self.disable_cache()
         response.content_type = 'application/javascript'
         response.charset = 'UTF-8'
+
+        self.ginsfsm_request = self.request.environ['ginsfsm.request']
+        self.ginsfsm_response = self.request.environ['ginsfsm.response']
+
         return self  # Webob will execute __call__
 
     def __call__(self, environ, start_response):
-        write = self.write = start_response(
-            self.status, self._abs_headerlist(environ))
-        write('h' * 2048 + '\n')
-        self.flush()
+        start_response(self.status, self._abs_headerlist(environ))
+        self.ginsfsm_response.write('h' * 2048 + '\n')
+        self.ginsfsm_response.flush()
 
         if not self._attach_session(self.sid, False):
             self.finish()
             return ()
 
         if self.session:
-            self.session.flush()
-        return []
+            self.ginsfsm_response.flush()
+            #self.session.flush()
+        raise ResponseInterrupt(self.ginsfsm_request)
 
     def send_pack(self, message, binary=False):
         if binary:
             raise Exception('binary not supported for XhrStreamingTransport')
 
-        self.active = False
+        print ">>>>>>>>>>>>>>>>>>>>>>> SEND_PACK:", message
+        self.active = False  # TODO si chequeo active fallan los tests
 
         try:
             self.notify_sent(len(message))

File ginsfsm/protocols/wsgi/common/wsgi_response.py

         environ = {}
         # ginsfsm variables to do asynchronous response
         environ['ginsfsm.channel'] = channel
-        environ['ginsfsm.gaplic'] = channel.gaplic
         environ['ginsfsm.request'] = request
+        environ['ginsfsm.response'] = self
 
         environ['REQUEST_METHOD'] = request.command.upper()
         environ['SERVER_PORT'] = str(wsgi_server.effective_port)