Commits

Ginés Martínez Sánchez committed 063c132 Draft

developing websocket

Comments (0)

Files changed (4)

ginsfsm/circular_fifo.py

 """ FIFO queue implemented with a CircularFIFO buffer.
     It's not thread-safe.
 """
-from ginsfsm.utils import string_to_bytearray
-
 DEFAULT_CIRCULAR_SIZE = 8192
 
 
             self.data,
         )
 
-    def putdata(self, data):
-        """ WRITING to gmsg.
-            Add 'ln' bytes to the message tail.
+    def putdata(self, bf, ln=0):
+        """ WRITING: add `ln` bytes of `bf` to cicular fifo.
+            If `ln` is 0 then put all data.
+            If `ln` is greater than data len, then `ln` is limited to data len.
             Return number of written bytes.
         """
-        data = string_to_bytearray(data)
-        length = ln = len(data)
-        if ln > self.free_space():
+        if ln <= 0:
+            ln = len(bf)
+        if ln > len(bf):
+            ln = len(bf)
+
+        if ln == 0:
+            return 0
+
+        data = bf[0:ln]
+
+        writted = ln = len(data)
+        if ln > self.free_space:
             raise CircularFullBufferError(
                 "ERROR full buffer, not space for %d bytes" % (ln,))
 
             # put the rest at the left zone
             self.data[0:ln] = data[right_len:]
 
-        return length
+        return writted
 
     def getdata(self, ln=0):
         """ READING : Pull 'ln' bytes.
         right_len = self.size - self.start  # right zone (from start to end)
         if ln <= right_len:
             # enough with the right zone
-            data = self.data[self.start:ln]
+            data = self.data[self.start: self.start + ln]
             self.count -= ln
             self.start += ln
         else:
             # get all the right zone
-            data = self.data[self.start:self.size]
+            data = self.data[self.start: self.start + self.size]
             self.count -= ln  # decrement now!
             ln -= self.size - self.start
 
 
         return data
 
+    @property
     def busy_space(self):
         """READING: Return number of bytes pending of reading
         """
         return self.count
 
+    @property
     def free_space(self):
         """ WRITTING: Return number of free byte space to write.
         """

ginsfsm/protocols/sockjs/server/c_sockjs_websocket.py

 from ginsfsm.protocols.wsgi.webob.websocket_response import WebsocketResponse
 from ginsfsm.deferred import Deferred
 
+
 #----------------------------------------------------------------#
 #                   WebSocketHandler
 #----------------------------------------------------------------#
         """Sends the given message to the client of this Web Socket."""
         self.ws_connection.write_message(message, binary)
 
+    def close(self):
+        self.gsock.mt_drop()
 
 #----------------------------------------------------------------#
 #                   GWebsocket GClass
     def on_message(self, oevent):
         message = oevent.data
         # SockJS requires that empty messages should be ignored
-        print "MMMMMMMMMMMMMM", message
+        print "MMMMMMMMMMMMMM", message, message.__class__
         if not message or not self.session:
             return
 
             logging.exception('WebSocket')
 
             # Close session on exception
-            #self.session.close()
+            self.session.close()
 
             # Close running connection
-            self.abort_connection()
+            self.gsock.mt_drop()
 
     def on_close(self, oevent):
         # Close session if websocket connection was closed

ginsfsm/protocols/sockjs/server/c_websocket.py

 import logging
 
 import ginsfsm.escape
+from ginsfsm.gmsg import GMsg
 from ginsfsm.gobj import GObj
 from ginsfsm.c_timer import GTimer
 from ginsfsm.compat import binary_type
 class FrameHead(object):
     """ Websocket frame head.
 
-    This class analize the first two bytes of the header:
+    This class analize the first two bytes of the header.
+    The maximum size of a frame header is 14 bytes.
 
       0                   1
       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
 
     packer = struct.Struct('BB')
 
-    def __init__(self):
+    def __init__(self, circular):
         # Information of the first two bytes header
         self.h_fin = 0  # final fragment in a message
         self.h_reserved_bits = 0
         self.h_opcode = 0
         self.h_mask = 0  # Set to 1 a masking key is present
         self.h_payload_len = 0
+        self.circular = circular
+        self.busy = False  # in half of header
+        self.header_complete = False  # Set True when header is completed
+        self.error = False
 
     def prepare_new_frame(self):
         """Reset variables for a new read.
         """
         # state of frame
-        self.busy = True
-        self.complete = False
+        self.busy = True  # in half of header
+        self.header_complete = False  # Set True when header is completed
         self.error = False
 
         # must do
         self.must_read_8_extended_payload_length = False
         self.must_read_masking_key = False
         self.must_read_payload_data = False
-        self.must_close = False
 
         # information of frame
         self.masking_key = 0
             # must read 8 bytes of extended payload length
             self.must_read_8_extended_payload_length = True
 
+        print "===> MUST (%x, %x) len %d, extended_len2 %s, extended_len8 %s, exmasking-key %s" %(
+            byte1, byte2,
+            self.frame_length,
+            self.must_read_2_extended_payload_length,
+            self.must_read_8_extended_payload_length,
+            self.must_read_masking_key)
+
     def decode_extended_length(self, data):
         """ READ: Decode 2/8 bytes of extended payload length.
         """
             self.frame_length = STRUCT_2BYTES.unpack(data)[0]
             self.must_read_2_extended_payload_length = False
 
-        elif must_read_8_extended_payload_length:
+        elif self.must_read_8_extended_payload_length:
             # Read 8 bytes of extended payload length
             if len(data) != 8:
                 raise RuntimeError(
         """
         if len(data) != 4:
             raise RuntimeError('ERROR decode_masking_key needs 4 bytes')
-        self.masking_ley = data
+        self.masking_key = data
         self.must_read_masking_key = False
 
+    def consume(self, bf):
+        """ Consume input data to get and analyze the frame header.
+            Return the consumed size.
+        """
+        # better work with local variables: more rapid.
+        circular = self.circular
+        total_consumed = 0
+        if not self.busy:
+            # waiting the first two byte's head
+            available = circular.busy_space + len(bf)
+            if available < 2:
+                # save the remaining data
+                consumed = circular.putdata(bf)
+                total_consumed += consumed
+                return total_consumed  # wait more data
+
+            needed = 2 - circular.busy_space
+            if needed > 0:
+                consumed = circular.putdata(bf, needed)
+                bf = bf[consumed:]
+                total_consumed += consumed
+
+            print "NEW Frame"
+            # we've got enough data! Start a new frame
+            self.prepare_new_frame()  # `busy` flag is set.
+            data = circular.getdata(2)
+            self.decode_head(data)
+
+        # processing extended header
+        if self.must_read_2_extended_payload_length:
+            available = circular.busy_space + len(bf)
+            if available < 2:
+                # save the remaining data
+                consumed = circular.putdata(bf)
+                total_consumed += consumed
+                return total_consumed  # wait more data
+
+            needed = 2 - circular.busy_space
+            if needed > 0:
+                consumed = circular.putdata(bf, needed)
+                bf = bf[consumed:]
+                total_consumed += consumed
+
+            data = circular.getdata(2)
+            self.decode_extended_length(data)  # flag is set.
+
+        if self.must_read_8_extended_payload_length:
+            available = circular.busy_space + len(bf)
+            if available < 8:
+                # save the remaining data
+                consumed = circular.putdata(bf)
+                total_consumed += consumed
+                return total_consumed  # wait more data
+
+            needed = 8 - circular.busy_space
+            if needed > 0:
+                consumed = circular.putdata(bf, needed)
+                bf = bf[consumed:]
+                total_consumed += consumed
+
+            data = circular.getdata(8)
+            self.decode_extended_length(data)  # flag is set.
+
+        if self.must_read_masking_key:
+            available = circular.busy_space + len(bf)
+            if available < 4:
+                # save the remaining data
+                consumed = circular.putdata(bf)
+                total_consumed += consumed
+                return total_consumed  # wait more data
+
+            needed = 4 - circular.busy_space
+            if needed > 0:
+                consumed = circular.putdata(bf, needed)
+                bf = bf[consumed:]
+                total_consumed += consumed
+
+            data = circular.getdata(4)
+            self.decode_masking_key(data)  # flag is set
+
+        self.header_complete = True
+        return total_consumed
+
     # TODO: WRITE
     def reset_htn(self):
         """ WRITE: Reset variables for a new write: host to network
         # variables to help encode/decode frame
         self.complete = False
         self.error = False
-        self.must_close = False
         self.must_write_extended_payload_length = False
         self.must_write_masking_key = False
         self.must_write_payload_data = False
     """ Timeout frame start.
         Too much time waiting the frame header.
     """
-    self.frame_failed()
-    self.gsock.mt_drop()
+    logging.error("ERROR websocket TIMEOUT waiting HEADER")
+    self.close()
 
 
 def ac_timeout_waiting_payload_data(self, event):
     """ Timeout frame start.
         Too much time waiting the frame header.
     """
-    self.frame_failed()
-    self.gsock.mt_drop()
+    logging.error("ERROR websocket TIMEOUT waiting PAYLOAD data")
+    self.close()
 
 
 def ac_disconnected(self, event):
     """ Partner has disconnected.
     """
-    if self.cur_frame.busy:
-        self.frame_failed()
-    #TODO: avisa a la aplicación: on_close
+    self.closed = True
+    self.close()
 
 
 def ac_process_frame_header(self, event):
     """ Processing the header.
     """
-    if event.data:
-        # save the incoming data
-        self.circular.putdata(event.data)
+    cur_frame = self.cur_frame
+    bf = event.data
+    while bf:
+        consumed = cur_frame.consume(bf)
+        if cur_frame.error:
+            self.close()  # on error do break the connection
+            return
+        bf = bf[consumed:]
 
-    # better work with local variables: more rapid.
-    cur_frame = self.cur_frame
-    circular = self.circular
-
-    if not cur_frame.busy:
-        # waiting the first two byte's head
-        if circular.busy_space < 2:
-            return  # wait more data
-
-        print "NEW Frame"
-        # we've got enough data! Start a new frame
-        cur_frame.prepare_new_frame()  # `busy` flag is set.
-        data = circular.getdata(2)
-        cur_frame.decode_head(data)
-
-    # processing extended header
-    if cur_frame.must_read_2_extended_payload_length:
-        if circular.busy_space < 2:
-            return  # wait more data
-        data = circular.getdata(2)
-        cur_frame.decode_extended_length(data)  # flag is set.
-
-    if cur_frame.must_read_8_extended_payload_length:
-        if circular.busy_space < 8:
-            return  # wait more data
-        data = circular.getdata(8)
-        cur_frame.decode_extended_length(data)  # flag is set.
-
-    if cur_frame.must_read_masking_key:
-        if circular.busy_space < 4:
-            return  # wait more data
-        data = circular.getdata(4)
-        cur_frame.decode_masking_key(data)  # flag is set
-
-    if cur_frame.must_read_payload_data:
-        if circular.busy_space < cur_frame.frame_lenght:
-            return  # wait more data
-        # TODO save in OverflowableBuffer buffer!
-        # TODO: change the state!
-        cur_frame.decode_data(data)  # flag is set
-
-    if cur_frame.must_close:
-        self.gsock.mt_drop()
-
-    cur_frame.busy = False
-    self.process_frame()
-    if cur_frame.complete:
-        self.process_message()
-
-
-    self.set_new_state('ST_WAITING_PAYLOAD_DATA')
-    self.send_event(self, 'EV_RX_DATA', data=None)
+        if cur_frame.header_complete:
+            print "HEAD COMPLETEDDD"
+            if cur_frame.must_read_payload_data:
+                # Creat a new buffer for payload data
+                self.loaded_data = 0
+                self.set_new_state('ST_WAITING_PAYLOAD_DATA')
+                self.send_event(self, 'EV_RX_DATA', data=bf)
+                return
+            else:
+                self.frame_completed()
 
 
 def ac_process_payload_data(self, event):
     """ Get payload data
     """
-    print "GETTINNNG PAYLOAD"
-    gmsg = self.circular
-    if event.data:
-        gmsg.putdata(event.data)
+    bf = event.data
+    bf_len = len(bf)
+    if bf_len == 0:
+        return
+    loaded_data = self.loaded_data
+    frame_length = self.cur_frame.frame_length
+    to_add = frame_length - loaded_data
+    if to_add < bf_len:
+        # use partial buffer, remain must be another new frame
+        data = bf[0:to_add]
+        bf = bf[to_add:]
+        self.frame_gmsg.putdata(data)
+        loaded_data += to_add
+    else:
+        self.frame_gmsg.putdata(bf)
+        loaded_data += bf_len
+        bf = None
 
-    data_bf = self.data_bf
-    if data_bf is None:
-        # frame with lenght 0
-        print "CONTROLLLL"
-        self.flush_current_frame()
-        return
-
-    free = data_bf.free_space()
-    print "FREEEE", free
-    if free > 0:
-        data = gmsg.getdata(free)
-        if data is None:
-            # wait: need more data
-            return
-        data_bf.putdata(data)
-
-    self.flush_current_frame()
+    self.loaded_data = loaded_data
+    if loaded_data == frame_length:
+        self.frame_completed()
+        self.set_new_state('ST_WAITING_HEADER')
+        if bf:
+            self.send_event(self, 'EV_RX_DATA', data=bf)
 
 
 GWEBSOCKET_FSM = {
     ),
     'state_list': (
         'ST_WAITING_HEADER',
-        'ST_WAITING_HEADER_EXTENDED_LENGTH',
-        'ST_WAITING_HEADER_MASKING_KEY',
         'ST_WAITING_PAYLOAD_DATA',
     ),
     'machine': {
             ('EV_RX_DATA',      ac_process_frame_header,            None),
         ),
 
-'ST_WAITING_PAYLOAD_DATA':
+        'ST_WAITING_PAYLOAD_DATA':
         (
             ('EV_DISCONNECTED', ac_disconnected,                    None),
             ('EV_TIMEOUT',      ac_timeout_waiting_payload_data,    None),
     # A tempfile should be created if the pending output is larger than
     # outbuf_overflow, which is measured in bytes. The default is 1MB.  This
     # is conservative.
+    # It's too used for maximum size of GMsg (buffer for frame payload data)
     'outbuf_overflow': [int, 1048576, 0, None, ""],
 
     'subscriber': [None, None, 0, None,
             a high capacity buffer.
         """
         GObj.__init__(self, GWEBSOCKET_FSM, GWEBSOCKET_GCONFIG)
-        self.circular = CircularFIFO()  # always the same.
-        self.cur_frame = FrameHead()  # always the same
-        self.data_bf = OverflowableBuffer(self.outbuf_overflow)
+        self.circular = CircularFIFO(14)  # always the same.
+        self.cur_frame = FrameHead(self.circular)  # always the same
+        self.closed = False  # channel closed
+        self.on_close_broadcasted = False  # event on_close already broadcasted
+        self.message_header = None
+        self.frame_gmsg = GMsg(1024, self.outbuf_overflow)
+        self.message_buffer = None
 
     def start_up(self):
         """ Initialization zone.
         """
         return None
 
-    def flush_current_frame(self):
-        hd = self.cur_frame
-        print "==============> OPCODE", hd.h_opcode
-        self.cur_frame = None
+    def close(self):
+        if not self.closed:
+            self.closed = True
+            self.gsock.mt_drop()
 
-        gmsg = self.data_bf
-        if gmsg:
-            gmsg.reset_rd()
+        if not self.on_close_broadcasted:
+            self.on_close_broadcasted = True
+            self.gaplic.add_callback(self.broadcast_event, 'EV_ON_CLOSE')
+
+    def frame_completed(self):
+        """ Process the completed frame.
+        """
+        print "FRAMEEEE COMPLETEDDD"
+        cur_frame = self.cur_frame
+        gmsg = self.frame_gmsg
+        gmsg.reset_rd()
+
+        unmasked = None
+        if cur_frame.frame_length:
             ln = gmsg.bytesleft()
             unmasked = gmsg.getdata(ln)
-            h_mask = self.masking_key
-            for i in xrange(ln):
-                unmasked[i] = unmasked[i] ^ h_mask[i % 4]
+            if cur_frame.h_mask:
+                h_mask = cur_frame.masking_key
+                for i in xrange(ln):
+                    unmasked[i] = unmasked[i] ^ h_mask[i % 4]
 
-            if hd.h_fin:
-                print "<====== MESSAGE", unmasked
-                self.handle_message(hd.h_opcode, unmasked)
+        if cur_frame.h_fin:
+            # last frame of message
+            if self.message_buffer:
+                self.message_buffer.append(unmasked)
+                message = self.message_buffer.get()
+                operation = self.message_header.h_opcode
+            else:
+                message = unmasked
+                operation = cur_frame.h_opcode
 
-        self.remove_data_gmsg()
-        self.set_new_state('ST_WAITING_HEADER')
-        self.post_event(self, 'EV_RX_DATA', data=None)
+            if operation == OPCODE_CONTINUATION_FRAME:
+                print "CONTINUATION FRAME"
+            elif operation == OPCODE_TEXT_FRAME:
+                if message:
+                    message = message.decode("utf-8")
+                    print "TEXT FRAME", message, message.__class__
+                    self.gaplic.add_callback(
+                        self.broadcast_event,
+                        'EV_ON_MESSAGE',
+                        data=message
+                    )
+            elif operation == OPCODE_BINARY_FRAME:
+                if message:
+                    print "BINARY FRAME", message
+                    self.gaplic.add_callback(
+                        self.broadcast_event,
+                        'EV_ON_MESSAGE',
+                        data=unmasked
+                    )
+            elif operation == OPCODE_CONTROL_CLOSE:
+                print "CONTROL CLOSE"
+                self.close()
+            elif operation == OPCODE_CONTROL_PING:
+                print "CONTROL PING"
+            elif operation == OPCODE_CONTROL_PONG:
+                print "CONTROL PONG"
+
+            self.message_header = None
+            self.message_buffer = None
+
+        else:
+            # Message with several frames
+            if not self.message_header:
+                self.message_header = cur_frame
+            if not self.message_buffer:
+                self.message_buffer = OverflowableBuffer(self.outbuf_overflow)
+            self.message_buffer.append(unmasked)
+
+        gmsg.reset_wr()  # Reset buffer for next frame
+        cur_frame.busy = False
 
     def handle_message(self, h_opcode, data):
         if h_opcode == OPCODE_TEXT_FRAME:

ginsfsm/tests/test_circular_fifo.py

         data = self.circular.getdata(1)
         self.assertEqual(data, b'')
 
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 5)
 
         count = self.circular.putdata("123")
         self.assertEqual(count, 3)
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 2)
 
         count = self.circular.putdata("ab")
         self.assertEqual(count, 2)
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 0)
 
         self.assertRaises(CircularFullBufferError,
     def test_getdata_putdata_with_overwrite(self):
         count = self.circular.putdata("123")
         self.assertEqual(count, 3)
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 2)
 
         self.assertRaises(CircularFullBufferError,
 
         data = self.circular.getdata(2)
         self.assertEqual(data, b"12")
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 4)
-        count = self.circular.busy_space()
+        count = self.circular.busy_space
         self.assertEqual(count, 1)
 
         count = self.circular.putdata("abcd")
 
         data = self.circular.getdata(0)
         self.assertEqual(data, b"3abcd")
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 5)
 
         count = self.circular.putdata("zxcvb")
         self.assertEqual(count, 5)
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 0)
 
         data = self.circular.getdata(4)
         self.assertEqual(data, b"zxcv")
-        count = self.circular.free_space()
+        count = self.circular.free_space
         self.assertEqual(count, 4)
 
         data = self.circular.getdata(3)
-        self.assertEqual(data, b"")
-        count = self.circular.free_space()
+        self.assertEqual(data, b"b")
+        count = self.circular.free_space
         self.assertEqual(count, 5)
 
+    def test_getdata_putdata_with_ln(self):
+        s = bytearray(['1', '2', '3'])
+        count = self.circular.putdata(s, 2)
+        self.assertEqual(count, 2)
+        count = self.circular.free_space
+        self.assertEqual(count, 3)
+
+        s = "abc"
+        count = self.circular.putdata(s, 5)
+        self.assertEqual(count, 3)
+        count = self.circular.free_space
+        self.assertEqual(count, 0)
+
+        data = self.circular.getdata()
+        self.assertEqual(data, b"12abc")
+
 if __name__ == "__main__":
     circular = CircularFIFO(5)