Commits

Ginés Martínez Sánchez  committed a96752e Draft

asynchronous control of response from Pyramid wsgi application DONE!

  • Participants
  • Parent commits cf5c33e

Comments (0)

Files changed (5)

File ginsfsm/protocols/http/common/response.py

 
         if content_length_header is None and self.content_length is not None:
             content_length_header = str(self.content_length)
-            print "YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO"
             self.response_headers.append(
                 ('Content-Length', content_length_header)
                 )
         self.request._close()
         self.flush()
 
-    def flush(self):
+    def flush(self, callback=None):
         """ Flush output buffer.
+        TODO: posibilita que se ejecute un callback al recibir
+        el evento EV_TRANSMIT_READY
         """
         self.channel.send_event(self.gsock, 'EV_FLUSH_OUTPUT_DATA')
+        if callback:
+            # TODO: must be executed at on event WRITE
+            # instead next pooling cycle.
+            self.channel.gaplic.add_callback(callback)
 
     def write(self, data):
         """ Write data to output buffer.

File ginsfsm/protocols/http/server/c_http_clisrv.py

 
 class ResponseInterrupt(Exception):
     """ To use when the response is asynchronous
-        or infinite until top level wants finish it.
+        or infinite until top level wants to finish it.
     """
-    def __init__(self, reference_request):
-        Exception.__init__(self)
-        self.reference_request = reference_request
 
 
 def ac_disconnected(self, event):
     data = event.data
     if not data:
         return
-    new_request = self.current_request
+    new_request = self.parsing_request
     while data:
         if new_request is None:
             new_request = HTTPRequestParser(self)
 
         if new_request.completed:
             # The new_request (with the body) is ready to use.
-            self.current_request = None
+            self.parsing_request = None
             if not new_request.empty:
                 self.enqueue_request(new_request)
             new_request = None
         else:
-            self.current_request = new_request
+            self.parsing_request = new_request
         if n >= len(data):
             break
         data = data[n:]
         logging.error("ERROR response doesn't mach responding request %s" %
             response.request.path)
 
-    if self.interrupted_response:
-        logging.error("ERROR interrupted_response is BUSY, of %s" %
+    if self.responding_response:
+        logging.error("ERROR responding_response is BUSY, of %s" %
             response.request.path)
+    self.responding_response = response
 
     response.start()
     try:
         response.execute()
-    except ResponseInterrupt as e:
+    except ResponseInterrupt:
         """ Response is asynchronous or infinite.
             Don't clear the current responding_request.
         """
-        if e.reference_request != self.responding_request:
-            logging.error("ERROR response doesn't mach responding request %s" %
-                response.request.path)
-        self.interrupted_response = response
         self.stop_responseless_timer()  # TODO: do some ping-alive
         return
 
 
     def __init__(self):
         GObj.__init__(self, GHTTPCLISRV_FSM, GHTTPCLISRV_GCONFIG)
-        self.current_request = None  # A request parser instance
-        self.responding_request = None  # resquest waiting top response
-        self.interrupted_response = None  # current async or infinite response.
+        self.parsing_request = None  # A request parser instance
+        self.responding_request = None  # request waiting a top response
+        self.responding_response = None  # current response being responding
         self.dl_requests = deque()  # requests queue
         self.sent_continue = False  # used as a latch after sending 100continue
 
         """ Write data to output buffer.
             To supply asynchronous access to high level.
         """
-        if not self.interrupted_response:
-            logging.error("ERROR channel.write() with no interrupted_response")
+        if not self.responding_response:
+            logging.error("ERROR channel.write() with no responding_response")
             return
-        self.interrupted_response(data)
+        self.responding_response.write(data)
 
-    def flush(self):
+    def flush(self, callback=None):
         """ Flush output buffer.
             To supply asynchronous access to high level.
         """
-        if not self.interrupted_response:
-            logging.error("ERROR channel.flush() with no interrupted_response")
+        if not self.responding_response:
+            logging.error("ERROR channel.flush() with no responding_response")
             return
-        self.flush()
+        self.responding_response.flush(callback)
 
     def finish(self, response=None):
         """ Finishes this response,
             and ending the HTTP request.
         """
         if response is None:
-            if not self.interrupted_response:
-                logging.error("ERROR channel.finish() with no interrupted_response")
+            if not self.responding_response:
+                logging.error("ERROR channel.finish()"
+                              " with no responding_response")
                 return
-            response = self.interrupted_response
+            response = self.responding_response
         response.finish()
         self.stop_responseless_timer()
         self.responding_request = None
+        self.responding_response = None
 
         if response.close_on_finish:
             # ignore all enqueued requests.

File ginsfsm/protocols/sockjs/server/basehandler.py

         self.context = context
         self.request = request
         super(BaseHandler, self).__init__()
-        self.ginsfsm_request = None
-        self.ginsfsm_response = None
+        self.ginsfsm_channel = None
 
     def set_response(self, response):
         """ response can be request.response for static urls
         """Finishes this response, ending the HTTP request."""
         #self._log_disconnect()
         #super(BaseHandler, self).finish()
-        if self.ginsfsm_response:
-            self.ginsfsm_response.finish()
+        self.ginsfsm_channel.finish()
 
     def safe_finish(self):
         """ Finish session.
         If it will blow up - connection was set to Keep-Alive and
         client dropped connection, ignore any IOError or socket error."""
-        self.finish()
+        try:
+            self.finish()
+        except:
+            pass
 
     def write(self, data):
         """ Write data to the network.
             Must be overridden in your application
         """
-        if self.ginsfsm_response:
-            self.ginsfsm_response.write(data)
+        self.ginsfsm_channel.write(data)
 
     def flush(self, include_footers=False, callback=None):
         """Flushes the current output buffer to the network.
         if another flush occurs before the previous flush's callback
         has been run, the previous callback will be discarded.
         """
-        if self.ginsfsm_response:
-            self.ginsfsm_response.flush()
-        if callback:
-            # TODO: must be executed at on event WRITE
-            # instead next pooling cycle.
-            self.context.sockjs_server.gaplic.add_callback(callback)
+        self.ginsfsm_channel.flush(callback)
 
 
 class PreflightHandler(BaseHandler):

File ginsfsm/protocols/sockjs/server/transports/xhrstreaming.py

     def post(self):
         #response = self.set_response(self.request.response)
         response = self.set_response(self)  # Response is self.
-        self.headers = ()  # important trick: clean default webob headers.
+        response.headers = ()  # important trick: clean default webob headers.
 
         self.sid = self.context.parent.re_matched_name  # session_id
         if self.context.sockjs_server.per_user:
         response.content_type = 'application/javascript'
         response.charset = 'UTF-8'
 
-        self.ginsfsm_request = self.request.environ['ginsfsm.request']
-        self.ginsfsm_response = self.request.environ['ginsfsm.response']
+        self.ginsfsm_channel = self.request.environ['ginsfsm.channel']
 
         return self  # Webob will execute __call__
 
     def __call__(self, environ, start_response):
         start_response(self.status, self._abs_headerlist(environ))
-        self.ginsfsm_response.write('h' * 2048 + '\n')
-        self.ginsfsm_response.flush()
+        self.ginsfsm_channel.write('h' * 2048 + '\n')
+        self.ginsfsm_channel.flush()
 
         if not self._attach_session(self.sid, False):
             self.finish()
             return ()
 
         if self.session:
-            self.ginsfsm_response.flush()
+            self.ginsfsm_channel.flush()
             #self.session.flush()
-        raise ResponseInterrupt(self.ginsfsm_request)
+        raise ResponseInterrupt()
 
     def send_pack(self, message, binary=False):
         if binary:

File ginsfsm/protocols/wsgi/common/wsgi_response.py

 
         environ = {}
         # ginsfsm variables to do asynchronous response
+        # you can access directly to http channel with functions:
+        #   write(data):        Write data to output buffer.
+        #   flush(callback):    Flush output buffer to socket,
+        #                       and call callback when transmit ready event.
+        #   finish()            Finish http response.
         environ['ginsfsm.channel'] = channel
-        environ['ginsfsm.request'] = request
-        environ['ginsfsm.response'] = self
 
         environ['REQUEST_METHOD'] = request.command.upper()
         environ['SERVER_PORT'] = str(wsgi_server.effective_port)