Commits

Ginés Martínez Sánchez committed de3e5da Draft

developing websocket protocol

  • Participants
  • Parent commits 90786a3

Comments (0)

Files changed (1)

ginsfsm/protocols/sockjs/server/c_websocket.py

     GMsg,
     gmsg_remove,
 )
-from ginsfsm.utils import (
-    string_to_pack,
-)
 
 STRUCT_2BYTES = struct.Struct("!H")
-STRUCT_4BYTES = struct.Struct("!Q")
+STRUCT_8BYTES = struct.Struct("!Q")
 STRUCT_BB = struct.Struct("BB")
 STRUCT_BBH = struct.Struct("!BBH")
 STRUCT_BBQ = struct.Struct("!BBQ")
 
     def __init__(self):
         # Information of the first two bytes header
-        self.fin = 0  # final fragment in a message
-        self.reserved_bits = 0
-        self.opcode = 0
-        self.mask = 0  # Set to 1 a  masking key is present in masking-key
-        self.payload_len = 0
+        self.h_fin = 0  # final fragment in a message
+        self.h_reserved_bits = 0
+        self.h_opcode = 0
+        self.h_mask = 0  # Set to 1 a masking key is present
+        self.h_payload_len = 0
 
+    def reset_nth(self):
+        """ READ: Reset variables for a new read: network to host
+        """
         # variables to help encode/decode frame
         self.complete = False
         self.error = False
         self.must_close = False
-        self.must_read_extended_payload_length = False
+        self.must_read_2_extended_payload_length = False
+        self.must_read_8_extended_payload_length = False
         self.must_read_masking_key = False
         self.must_read_payload_data = False
         self.masking_key = 0
-        self.payload_data_length = 0
+        self.frame_length = 0
 
-    def network_to_host(self, data):
+    def decode_nth_header(self, data):
+        """ READ: Decode the two bytes header.
+        """
         if len(data) != 2:
-            raise RuntimeError('ERROR ntoh needs two bytes.')
+            raise RuntimeError('ERROR decode_ntoh_header needs 2 bytes')
         byte1, byte2 = self.packer.unpack(bytes(data))
 
         # decod byte1
-        self.fin = byte1 & 0x80
-        self.reserved_bits = byte1 & 0x70
-        self.opcode = byte1 & 0x0f
+        self.h_fin = byte1 & 0x80
+        self.h_reserved_bits = byte1 & 0x70
+        self.h_opcode = byte1 & 0x0f
 
         # decod byte2
-        self.mask = byte2 & 0x80
-        self.payload_len = byte2 & 0x7f
+        self.h_mask = byte2 & 0x80
+        self.h_payload_len = byte2 & 0x7f
 
-    def host_to_network(self, fin, opcode, mask, payload_len):
-        self.fin = fin
-        self.reserved_bits = 0  # by the moment it's ignored.
-        self.opcode = opcode
-        self.mask = mask
-        self.payload_len = payload_len
+    def analize_nth_header(self):
+        """ READ: Analize the two bytes header and set the next steps.
+        """
+        if self.h_mask:
+            # must read 4 bytes of masking key
+            self.must_read_masking_key = True
 
-        if self.fin:
-            byte1 = (self.opcode & 0x0f) | 0x80
+        ln = self.h_payload_len
+        if ln == 0:
+            pass  # no data to read
+        elif ln < 126:
+            self.frame_length = self.h_payload_len
+            self.must_read_payload_data = True
+        elif ln == 126:
+            # must read 2 bytes of extended payload length
+            self.must_read_2_extended_payload_length = True
+        elif ln == 127:
+            # must read 8 bytes of extended payload length
+            self.must_read_8_extended_payload_length = True
+
+    def decode_nth_extended_length(self, data):
+        """ READ: Decode the two bytes header.
+        """
+        if self.must_read_2_extended_payload_length:
+            if len(data) != 2:
+                raise RuntimeError('ERROR decode_nth_extended_length 2 bytes')
+            self.frame_length = STRUCT_2BYTES.unpack(data)[0]
+
+        if self.must_read_8_extended_payload_length:
+            if len(data) != 8:
+                raise RuntimeError('ERROR decode_nth_extended_length 8 bytes')
+            self.frame_length = STRUCT_8BYTES.unpack(data)[0]
+
+        if self.frame_length:
+            self.must_read_payload_data = True
+
+    def decode_nth_masking_key(self, data):
+        """ READ: Decode the 4 masking-key bytes.
+        """
+        if len(data) != 4:
+            raise RuntimeError('ERROR decode_nth_masking_key 4 bytes')
+        self.masking_ley = data
+
+    # TODO: WRITE
+    def reset_htn(self):
+        """ WRITE: Reset variables for a new write: host to network
+        """
+        # variables to help encode/decode frame
+        self.complete = False
+        self.error = False
+        self.must_close = False
+        self.must_write_extended_payload_length = False
+        self.must_write_masking_key = False
+        self.must_write_payload_data = False
+        self.masking_key = 0
+        self.frame_length = 0
+
+    def encode_htn_header(self, h_fin, h_opcode, h_mask, h_payload_len):
+        """ WRITE: Encode the two bytes header.
+        """
+        self.h_fin = h_fin
+        self.h_reserved_bits = 0  # by the moment it's ignored.
+        self.h_opcode = h_opcode
+        self.h_mask = h_mask
+        self.h_payload_len = h_payload_len
+
+        if self.h_fin:
+            byte1 = (self.h_opcode & 0x0f) | 0x80
         else:
-            byte1 = self.opcode
+            byte1 = self.h_opcode
 
-        byte2 = self.
-        if self.mask:
+        byte2 = self.xxx
+        if self.h_mask:
             byte2 = byte2 & 0x80
 
         trama = self.packer.pack(byte1, byte2)
         return bytearray(trama)
 
     def yyy(self):
-        ln = self.frame_len
+        byte1 = 0
+        ln = self.frame_length
         if ln < 126:
             # next: write 4 bytes of masking key IF client
-            byte2 = self.frame_len
+            byte2 = self.frame_length
         elif ln <= 0xFFFF:
             # next: write 4 bytes of masking key IF client
             # next: write 2 bytes of extended payload length
             # next: write 4 bytes of masking key IF client
             # next: write 8 bytes of extended payload length
             byte2 = 127
-        if self.mask:
+        if self.h_mask:
             byte2 = byte2 & 0x80
 
         trama = self.packer.pack(byte1, byte2)
         return bytearray(trama)
 
-    def xxx(self, ln):
-        if ln < 126:
-            # next: read 4 bytes of masking key
-            self.frame_len = self.payload_len
-        elif ln == 126:
-            # next: read 2 bytes of extended payload length
-            # next: read 4 bytes of masking key
-            self.frame_len = 0
-        elif ln == 127:
-            # next: read 8 bytes of extended payload length
-            # next: read 4 bytes of masking key
-            self.frame_len = 0
-
-
     @property
     def size(self):
         return self.packer.size
         return hd
     hd.network_to_host(data)
     hd.complete = True
-    if hd.reserved_bits:
+    if hd.h_reserved_bits:
         # client is using as-yet-undefined extensions; abort
         hd.error = True
         logging.error(
-            "ERROR websocket header: using as-yet-undefined reserved_bits")
+            "ERROR websocket header: using as-yet-undefined h_reserved_bits")
         return hd
 
     if hd.opcode_is_control:
         #
-        #   Control opcode
+        #   Control h_opcode
         #
-        control = hd.opcode
+        control = hd.h_opcode
         if control == OPCODE_CONTROL_CLOSE:
             hd.close = True
             pass
             pass
         elif control == OPCODE_CONTROL_PONG:
             pass
-        if hd.payload_len >= 126:
+        if hd.h_payload_len >= 126:
             # control frames must have payload < 126; abort
             hd.error = True
             logging.error("ERROR websocket header: too big control frame payloda")
     else:
         #
-        #   Data opcode
+        #   Data h_opcode
         #
-        if iam_server and not hd.mask:
+        if iam_server and not hd.h_mask:
             # Unmasked frame; abort
             hd.error = True
             logging.error("ERROR websocket header: iam_server but unmasked frame")
     """
 
 
-def ac_process_frame_header_start(self, event):
-    """ Process two first bytes frame start.
+def ac_process_frame_header(self, event):
+    """ Processing the header.
     """
-    gmsg = self.rx_gmsg
+    gmsg = self.rx_circular
     if not gmsg:
-        gmsg = self.rx_gmsg = GMsg(1024)
+        gmsg = self.rx_circular = GMsg(1024)
     if event.data:
         gmsg.putdata(event.data)
 
     #
     # Have we got already a frame header struct?
     #
-    hd = self.frame_header
+    hd = self.rx_frame_header
     if hd is None:
         #
         # Read and analize the two first header bytes.
             return
         if not hd.complete:
             return
-        self.frame_header = hd
+        self.rx_frame_header = hd
 
     #
     #  Calculate the frame lenght
     #
-    ln = hd.payload_len
+    ln = hd.h_payload_len
 
     if ln < 126:
         # we've got the length.
         # next: read 4 bytes of masking key if iam_server
-        self.frame_len = hd.payload_len
-        print ">>>> frame_len", self.frame_len
-        if self.frame_len > 0:
+        self.frame_length = hd.h_payload_len
+        print ">>>> frame_length", self.frame_length
+        if self.frame_length > 0:
             print ">>>>>>>>>> PAYLOAD GMSG"
-            self.data_gmsg = GMsg(self.frame_len)
+            self.rx_data_buffer = GMsg(self.frame_length)
         if self.iam_server:
             self.set_new_state('ST_WAITING_HEADER_MASKING_KEY')
         else:
 
     self.send_event(self, 'EV_RX_DATA', data=None)
 
-
-def ac_process_header_extended_length(self, event):
-    """
-    """
-    gmsg = self.rx_gmsg
-    if event.data:
-        gmsg.putdata(event.data)
-
-    hd = self.frame_header
-    ln = hd.payload_len
-    if ln == 126:
-        # read 2 bytes of extended payload length
-        # next: read 4 bytes of masking key if iam_server
-        data = gmsg.getdata(2)
-        if data is None:
-            # wait: there is no enough data
-            return
-        self.frame_len = data
-
-    elif ln == 127:
-        # read 8 bytes of extended payload length
-        # next: read 4 bytes of masking key if iam_server
-        data = gmsg.getdata(8)
-        if data is None:
-            # wait: there is no enough data
-            return
-        self.frame_len = data
-
-    else:
-        logging.error("ERROR payload length: %d", ln)
-        self.remove_gmsgs()
-        self.gsock.mt_drop()
-        return
-
-    self.data_gmsg = GMsg(self.frame_len)
-
     if self.iam_server:
         self.set_new_state('ST_WAITING_HEADER_MASKING_KEY')
         self.send_event(self, 'EV_RX_DATA', data=None)
         self.send_event(self, 'EV_RX_DATA', data=data)
 
 
-def ac_process_header_masking_key(self, event):
-    """ Get the 4 bytes of masking key
-    """
-    gmsg = self.rx_gmsg
-    if event.data:
-        gmsg.putdata(event.data)
-    if not self.iam_server:
-        self.set_new_state('ST_WAITING_PAYLOAD_DATA')
-        self.send_event(self, 'EV_RX_DATA', data=None)
-        return
-
-    data = gmsg.getdata(4)
-    if data is None:
-        # wait: there is no enough data
-        return
-    self.masking_key = data
-
-    self.set_new_state('ST_WAITING_PAYLOAD_DATA')
-    self.send_event(self, 'EV_RX_DATA', data=None)
-
-
 def ac_process_payload_data(self, event):
     """ Get payload data
     """
     print "GETTINNNG PAYLOAD"
-    gmsg = self.rx_gmsg
+    gmsg = self.rx_circular
     if event.data:
         gmsg.putdata(event.data)
 
-    data_gmsg = self.data_gmsg
-    if data_gmsg is None:
+    rx_data_buffer = self.rx_data_buffer
+    if rx_data_buffer is None:
         # frame with lenght 0
         print "CONTROLLLL"
         self.flush_current_frame()
         return
 
-    free = data_gmsg.free_space()
+    free = rx_data_buffer.free_space()
     print "FREEEE", free
     if free > 0:
         data = gmsg.getdata(free)
         if data is None:
             # wait: need more data
             return
-        data_gmsg.putdata(data)
+        rx_data_buffer.putdata(data)
 
     self.flush_current_frame()
 
         (
             ('EV_DISCONNECTED', ac_disconnected,                    None),
             ('EV_TIMEOUT',      ac_timeout_waiting_frame_header,    None),
-            ('EV_RX_DATA',      ac_process_frame_header_start,      None),
+            ('EV_RX_DATA',      ac_process_frame_header,            None),
         ),
 
-        'ST_WAITING_HEADER_EXTENDED_LENGTH':
-        (
-            ('EV_DISCONNECTED', ac_disconnected,                    None),
-            ('EV_TIMEOUT',      ac_timeout_waiting_frame_header,    None),
-            ('EV_RX_DATA',      ac_process_header_extended_length,  None),
-        ),
-        'ST_WAITING_HEADER_MASKING_KEY':
-        (
-            ('EV_DISCONNECTED', ac_disconnected,                    None),
-            ('EV_TIMEOUT',      ac_timeout_waiting_frame_header,    None),
-            ('EV_RX_DATA',      ac_process_header_masking_key,      None),
-        ),
-
-        'ST_WAITING_PAYLOAD_DATA':
+'ST_WAITING_PAYLOAD_DATA':
         (
             ('EV_DISCONNECTED', ac_disconnected,                    None),
             ('EV_TIMEOUT',      ac_timeout_waiting_payload_data,    None),
 
     def __init__(self):
         GObj.__init__(self, GWEBSOCKET_FSM, GWEBSOCKET_GCONFIG)
-        self.frame_header = None
-        self.rx_gmsg = None
-        self.data_gmsg = None
+        self.rx_frame_header = None
+        self.rx_circular = None
+        self.rx_data_buffer = None
         self.masking_key = None
 
     def start_up(self):
         self.remove_data_gmsg()
 
     def remove_header_gmsg(self):
-        if self.rx_gmsg:
-            gmsg_remove(self.rx_gmsg)
-            self.rx_gmsg = None
+        if self.rx_circular:
+            gmsg_remove(self.rx_circular)
+            self.rx_circular = None
 
     def remove_data_gmsg(self):
-        if self.data_gmsg:
-            gmsg_remove(self.data_gmsg)
-            self.data_gmsg = None
+        if self.rx_data_buffer:
+            gmsg_remove(self.rx_data_buffer)
+            self.rx_data_buffer = None
 
     def flush_current_frame(self):
-        hd = self.frame_header
-        print "==============> OPCODE", hd.opcode
-        self.frame_header = None
+        hd = self.rx_frame_header
+        print "==============> OPCODE", hd.h_opcode
+        self.rx_frame_header = None
 
-        gmsg = self.data_gmsg
+        gmsg = self.rx_data_buffer
         if gmsg:
             gmsg.reset_rd()
             ln = gmsg.bytesleft()
             unmasked = gmsg.getdata(ln)
-            mask = self.masking_key
+            h_mask = self.masking_key
             for i in xrange(ln):
-                unmasked[i] = unmasked[i] ^ mask[i % 4]
+                unmasked[i] = unmasked[i] ^ h_mask[i % 4]
 
-            if hd.fin:
+            if hd.h_fin:
                 print "<====== MESSAGE", unmasked
-                self.handle_message(hd.opcode, unmasked)
+                self.handle_message(hd.h_opcode, unmasked)
 
         self.remove_data_gmsg()
         self.set_new_state('ST_WAITING_HEADER')
         self.post_event(self, 'EV_RX_DATA', data=None)
 
-    def handle_message(self, opcode, data):
-        if opcode == OPCODE_TEXT_FRAME:
+    def handle_message(self, h_opcode, data):
+        if h_opcode == OPCODE_TEXT_FRAME:
             if True:  # TODO self._auto_decode:
                 data = data.decode("utf-8")
             self.gaplic.add_callback(
                 'EV_ON_MESSAGE',
                 data=data
             )
-        elif opcode == OPCODE_BINARY_FRAME:
+        elif h_opcode == OPCODE_BINARY_FRAME:
             self.gaplic.add_callback(
                 self.broadcast_event,
                 'EV_ON_MESSAGE',
                 data=data
             )
-        elif opcode == OPCODE_CONTROL_CLOSE:
+        elif h_opcode == OPCODE_CONTROL_CLOSE:
             self.gsock.mt_drop()
-        elif opcode == OPCODE_CONTROL_PING:
+        elif h_opcode == OPCODE_CONTROL_PING:
             pass
-        elif opcode == OPCODE_CONTROL_PONG:
+        elif h_opcode == OPCODE_CONTROL_PONG:
             pass
         else:
             pass
     def write_message(self, message, binary=False):
         """Sends the given message to the client of this Web Socket."""
         if binary:
-            opcode = 0x2
+            h_opcode = 0x2
         else:
-            opcode = 0x1
+            h_opcode = 0x1
         message = ginsfsm.escape.utf8(message)
         assert isinstance(message, binary_type)
-        self._write_frame(True, opcode, message)
+        self._write_frame(True, h_opcode, message)
 
-    def _write_frame(self, fin, opcode, data):
-        if fin:
-            finbit = opcode | 0x80
+    def _write_frame(self, h_fin, h_opcode, data):
+        if h_fin:
+            finbit = h_opcode | 0x80
         else:
-            finbit = opcode
+            finbit = h_opcode
 
         l = len(data)
         if l < 126: