Commits

Ginés Martínez Sánchez committed 10bfa5c Draft

developing websocket protocol

  • Participants
  • Parent commits de3e5da

Comments (0)

Files changed (3)

File ginsfsm/circular_fifo.py

 # -*- coding: utf-8 -*-
-""" FIFO queue implemented with a Circular buffer.
+""" FIFO queue implemented with a CircularFIFO buffer.
     It's not thread-safe.
 """
 from ginsfsm.utils import string_to_bytearray
 
 
 class CircularFullBufferError(Exception):
-    """ Raised when Circular have problems."""
+    """ Raised when CircularFIFO have problems."""
 
 
-class Circular(object):
+class CircularFIFO(object):
     def __init__(self, size=DEFAULT_CIRCULAR_SIZE):
-        """ Circular buffer.
+        """ CircularFIFO buffer.
         """
         self.size = size
         self.start = 0

File ginsfsm/protocols/sockjs/server/c_websocket.py

 import logging
 
 import ginsfsm.escape
+from ginsfsm.gobj import GObj
 from ginsfsm.c_timer import GTimer
 from ginsfsm.compat import binary_type
-from ginsfsm.gobj import GObj
-from ginsfsm.gmsg import (
-    GMsg,
-    gmsg_remove,
-)
+from ginsfsm.circular_fifo import CircularFIFO
+from ginsfsm.buffers import OverflowableBuffer
 
 STRUCT_2BYTES = struct.Struct("!H")
 STRUCT_8BYTES = struct.Struct("!Q")
 OPCODE_CONTROL_PONG = 0x0A
 
 
-class FRAME_HEADER_S(object):
-    """ Websocket frame header
+class FrameHead(object):
+    """ Websocket frame head.
 
-    This class controls the first two bytes of header:
+    This class analize the first two bytes of the header:
 
       0                   1
       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
         self.h_mask = 0  # Set to 1 a masking key is present
         self.h_payload_len = 0
 
-    def reset_nth(self):
-        """ READ: Reset variables for a new read: network to host
+    def prepare_new_frame(self):
+        """Reset variables for a new read.
         """
-        # variables to help encode/decode frame
+        # state of frame
+        self.busy = True
         self.complete = False
         self.error = False
-        self.must_close = False
+
+        # must do
         self.must_read_2_extended_payload_length = False
         self.must_read_8_extended_payload_length = False
         self.must_read_masking_key = False
         self.must_read_payload_data = False
+        self.must_close = False
+
+        # information of frame
         self.masking_key = 0
         self.frame_length = 0
 
-    def decode_nth_header(self, data):
-        """ READ: Decode the two bytes header.
+    def decode_head(self, data):
+        """ READ: Decode the two bytes head.
         """
         if len(data) != 2:
             raise RuntimeError('ERROR decode_ntoh_header needs 2 bytes')
         self.h_mask = byte2 & 0x80
         self.h_payload_len = byte2 & 0x7f
 
-    def analize_nth_header(self):
-        """ READ: Analize the two bytes header and set the next steps.
-        """
+        # analize
+
         if self.h_mask:
             # must read 4 bytes of masking key
             self.must_read_masking_key = True
         if ln == 0:
             pass  # no data to read
         elif ln < 126:
+            # Got all we need to read data
             self.frame_length = self.h_payload_len
             self.must_read_payload_data = True
         elif ln == 126:
             # must read 2 bytes of extended payload length
             self.must_read_2_extended_payload_length = True
-        elif ln == 127:
+        else:  # ln == 127:
             # must read 8 bytes of extended payload length
             self.must_read_8_extended_payload_length = True
 
-    def decode_nth_extended_length(self, data):
-        """ READ: Decode the two bytes header.
+    def decode_extended_length(self, data):
+        """ READ: Decode 2/8 bytes of extended payload length.
         """
         if self.must_read_2_extended_payload_length:
+            # Read 2 bytes of extended payload length
             if len(data) != 2:
-                raise RuntimeError('ERROR decode_nth_extended_length 2 bytes')
+                raise RuntimeError(
+                    'ERROR decode_extended_length needs 2 bytes')
             self.frame_length = STRUCT_2BYTES.unpack(data)[0]
+            self.must_read_2_extended_payload_length = False
 
-        if self.must_read_8_extended_payload_length:
+        elif must_read_8_extended_payload_length:
+            # Read 8 bytes of extended payload length
             if len(data) != 8:
-                raise RuntimeError('ERROR decode_nth_extended_length 8 bytes')
+                raise RuntimeError(
+                    'ERROR decode_extended_length needs 8 bytes')
             self.frame_length = STRUCT_8BYTES.unpack(data)[0]
+            self.must_read_8_extended_payload_length = False
 
-        if self.frame_length:
-            self.must_read_payload_data = True
-
-    def decode_nth_masking_key(self, data):
-        """ READ: Decode the 4 masking-key bytes.
+    def decode_masking_key(self, data):
+        """ READ: Decode the 4 bytes of masking key.
         """
         if len(data) != 4:
-            raise RuntimeError('ERROR decode_nth_masking_key 4 bytes')
+            raise RuntimeError('ERROR decode_masking_key needs 4 bytes')
         self.masking_ley = data
+        self.must_read_masking_key = False
 
     # TODO: WRITE
     def reset_htn(self):
         return self.packer.size
 
 
-def analize_header(gmsg, iam_server):
-    """ Get and check the header from gmsg.
+def analyze_header(gmsg, iam_server):
+    """ Analyze and process the header.
     """
-    hd = FRAME_HEADER_S()
+    hd = FrameHead()
     data = gmsg.getdata(hd.size)
     if data is None:
         # there is no enough data
     """ Timeout frame start.
         Too much time waiting the frame header.
     """
-    self.remove_gmsgs()
+    self.frame_failed()
     self.gsock.mt_drop()
 
 
     """ Timeout frame start.
         Too much time waiting the frame header.
     """
-    self.remove_gmsgs()
+    self.frame_failed()
     self.gsock.mt_drop()
 
 
 def ac_disconnected(self, event):
-    """ Parter has disconnected.
+    """ Partner has disconnected.
     """
+    if self.cur_frame.busy:
+        self.frame_failed()
+    #TODO: avisa a la aplicación: on_close
 
 
 def ac_process_frame_header(self, event):
     """ Processing the header.
     """
-    gmsg = self.rx_circular
-    if not gmsg:
-        gmsg = self.rx_circular = GMsg(1024)
     if event.data:
-        gmsg.putdata(event.data)
+        # save the incoming data
+        self.circular.putdata(event.data)
 
-    #
-    # Have we got already a frame header struct?
-    #
-    hd = self.rx_frame_header
-    if hd is None:
-        #
-        # Read and analize the two first header bytes.
-        # Reject it if there is no at list 2 bytes.
-        #
-        hd = analize_header(gmsg, self.iam_server)
-        if hd.close or hd.error:
-            self.remove_gmsgs()
-            self.gsock.mt_drop()
-            return
-        if not hd.complete:
-            return
-        self.rx_frame_header = hd
+    # better work with local variables: more rapid.
+    cur_frame = self.cur_frame
+    circular = self.circular
 
-    #
-    #  Calculate the frame lenght
-    #
-    ln = hd.h_payload_len
+    if not cur_frame.busy:
+        # waiting the first two byte's head
+        if circular.busy_space < 2:
+            return  # wait more data
 
-    if ln < 126:
-        # we've got the length.
-        # next: read 4 bytes of masking key if iam_server
-        self.frame_length = hd.h_payload_len
-        print ">>>> frame_length", self.frame_length
-        if self.frame_length > 0:
-            print ">>>>>>>>>> PAYLOAD GMSG"
-            self.rx_data_buffer = GMsg(self.frame_length)
-        if self.iam_server:
-            self.set_new_state('ST_WAITING_HEADER_MASKING_KEY')
-        else:
-            self.set_new_state('ST_WAITING_PAYLOAD_DATA')
+        print "NEW Frame"
+        # we've got enough data! Start a new frame
+        cur_frame.prepare_new_frame()  # `busy` flag is set.
+        data = circular.getdata(2)
+        cur_frame.decode_head(data)
 
-    elif ln == 126:
-        # next: read 2 bytes of extended payload length
-        # next: read 4 bytes of masking key if iam_server
-        self.set_new_state('ST_WAITING_HEADER_EXTENDED_LENGTH')
+    # processing extended header
+    if cur_frame.must_read_2_extended_payload_length:
+        if circular.busy_space < 2:
+            return  # wait more data
+        data = circular.getdata(2)
+        cur_frame.decode_extended_length(data)  # flag is set.
 
-    elif ln == 127:
-        # next: read 8 bytes of extended payload length
-        # next: read 4 bytes of masking key if iam_server
-        self.set_new_state('ST_WAITING_HEADER_EXTENDED_LENGTH')
+    if cur_frame.must_read_8_extended_payload_length:
+        if circular.busy_space < 8:
+            return  # wait more data
+        data = circular.getdata(8)
+        cur_frame.decode_extended_length(data)  # flag is set.
 
+    if cur_frame.must_read_masking_key:
+        if circular.busy_space < 4:
+            return  # wait more data
+        data = circular.getdata(4)
+        cur_frame.decode_masking_key(data)  # flag is set
+
+    if cur_frame.must_read_payload_data:
+        if circular.busy_space < cur_frame.frame_lenght:
+            return  # wait more data
+        # TODO save in OverflowableBuffer buffer!
+        # TODO: change the state!
+        cur_frame.decode_data(data)  # flag is set
+
+    if cur_frame.must_close:
+        self.gsock.mt_drop()
+
+    cur_frame.busy = False
+    self.process_frame()
+    if cur_frame.complete:
+        self.process_message()
+
+
+    self.set_new_state('ST_WAITING_PAYLOAD_DATA')
     self.send_event(self, 'EV_RX_DATA', data=None)
 
-    if self.iam_server:
-        self.set_new_state('ST_WAITING_HEADER_MASKING_KEY')
-        self.send_event(self, 'EV_RX_DATA', data=None)
-    else:
-        data = gmsg.getdata(gmsg.bytesleft())
-        self.set_new_state('ST_WAITING_PAYLOAD_DATA')
-        self.send_event(self, 'EV_RX_DATA', data=data)
-
 
 def ac_process_payload_data(self, event):
     """ Get payload data
     """
     print "GETTINNNG PAYLOAD"
-    gmsg = self.rx_circular
+    gmsg = self.circular
     if event.data:
         gmsg.putdata(event.data)
 
-    rx_data_buffer = self.rx_data_buffer
-    if rx_data_buffer is None:
+    data_bf = self.data_bf
+    if data_bf is None:
         # frame with lenght 0
         print "CONTROLLLL"
         self.flush_current_frame()
         return
 
-    free = rx_data_buffer.free_space()
+    free = data_bf.free_space()
     print "FREEEE", free
     if free > 0:
         data = gmsg.getdata(free)
         if data is None:
             # wait: need more data
             return
-        rx_data_buffer.putdata(data)
+        data_bf.putdata(data)
 
     self.flush_current_frame()
 
     'gsock': [None, None, 0, None, "GSock connection."],
     'request': [None, None, 0, None, "websocket request."],
     'iam_server': [bool, True, 0, None, "What side? server or client."],
+
+    # A tempfile should be created if the pending output is larger than
+    # outbuf_overflow, which is measured in bytes. The default is 1MB.  This
+    # is conservative.
+    'outbuf_overflow': [int, 1048576, 0, None, ""],
+
     'subscriber': [None, None, 0, None,
         "subcriber of all output-events."
         "Default is ``None``, i.e., the parent"
     KEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
 
     def __init__(self):
+        """ I used Cicular FIFO buffer to save the received data.
+            I pull the bytes needed for each operation.
+            The buffer is fixed size and by default of 8K bytes,
+            and it intend be as rapid as possible.
+            It's enough for do analysis of the header.
+            The payload data are saved in a OverflowableBuffer buffer,
+            a high capacity buffer.
+        """
         GObj.__init__(self, GWEBSOCKET_FSM, GWEBSOCKET_GCONFIG)
-        self.rx_frame_header = None
-        self.rx_circular = None
-        self.rx_data_buffer = None
-        self.masking_key = None
+        self.circular = CircularFIFO()  # always the same.
+        self.cur_frame = FrameHead()  # always the same
+        self.data_bf = OverflowableBuffer(self.outbuf_overflow)
 
     def start_up(self):
         """ Initialization zone.
         """
         return None
 
-    def remove_gmsgs(self):
-        self.remove_header_gmsg()
-        self.remove_data_gmsg()
+    def flush_current_frame(self):
+        hd = self.cur_frame
+        print "==============> OPCODE", hd.h_opcode
+        self.cur_frame = None
 
-    def remove_header_gmsg(self):
-        if self.rx_circular:
-            gmsg_remove(self.rx_circular)
-            self.rx_circular = None
-
-    def remove_data_gmsg(self):
-        if self.rx_data_buffer:
-            gmsg_remove(self.rx_data_buffer)
-            self.rx_data_buffer = None
-
-    def flush_current_frame(self):
-        hd = self.rx_frame_header
-        print "==============> OPCODE", hd.h_opcode
-        self.rx_frame_header = None
-
-        gmsg = self.rx_data_buffer
+        gmsg = self.data_bf
         if gmsg:
             gmsg.reset_rd()
             ln = gmsg.bytesleft()

File ginsfsm/tests/test_circular_fifo.py

 import unittest
 from ginsfsm.circular_fifo import (
-    Circular,
+    CircularFIFO,
     CircularFullBufferError,
 )
 
 
 class TestCircular(unittest.TestCase):
     def setUp(self):
-        self.circular = Circular(5)
+        self.circular = CircularFIFO(5)
 
     def test_getdata_putdata_without_overwrite(self):
         data = self.circular.getdata(1)
         self.assertEqual(count, 5)
 
 if __name__ == "__main__":
-    circular = Circular(5)
+    circular = CircularFIFO(5)
 
     circular.putdata("123")
     print circular