Commits

Ginés Martínez Sánchez committed f7eca43 Draft

developing websocket protocol

  • Participants
  • Parent commits 9ee5ee2

Comments (0)

Files changed (7)

File ginsfsm/c_sock.py

 GObj for manage socket events.
 
 .. autoclass:: GSock
-    :members: start_up, get_next_dst
+    :members: start_up, get_next_dstRecv data
 
 """
 
     ssl = None
 
 import select
+import traceback
+import datetime
 import socket
 import time
 import errno
         self.rxed_msgs += 1
         self.rxed_bytes += ln
         if self.trace_dump:
-            logging.debug(
-                "Recv data '%s' (%d bytes)\n%s" % (
-                    self.name, ln, hexdump('<=', data)))
+            hora = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            self.logger.debug(
+                "%s - Recv data '%s' (%d bytes)\n%s" % (
+                    hora,
+                    self.name,
+                    ln,
+                    hexdump('<=', data)
+                    )
+                )
 
         if self.rx_data_event_name is not None:
-            self.broadcast_event(
-                self.rx_data_event_name,
-                gsock=self,
-                data=data,
-            )
+            try:
+                self.broadcast_event(
+                    self.rx_data_event_name,
+                    gsock=self,
+                    data=data,
+                )
+            except:
+                traceback.format_exc()
+                hora = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                self.logger.error(
+                    "%s - ERROR processing recv data '%s' (%d bytes)\n%s" % (
+                        hora,
+                        self.name,
+                        ln,
+                        hexdump('<=', data)
+                        )
+                    )
+                self.logger.error(traceback.format_exc())
 
     def readable(self):
         #"predicate for inclusion in the readable for select()"
             while outbuflen > 0:
                 chunk = outbuf.get(self.send_bytes)
                 if self.trace_dump:
+                    hora = datetime.datetime.now().strftime(
+                        "%Y-%m-%d %H:%M:%S")
                     logging.info(
-                        "Send data '%s' (%d bytes)\n%s" %
-                        (self.name, len(chunk), hexdump('=>', chunk)))
+                        "%s - Send data '%s' (%d bytes)\n%s" % (
+                            hora,
+                            self.name,
+                            len(chunk),
+                            hexdump('=>', chunk)
+                            )
+                        )
                 num_sent = self.send(chunk)
                 if num_sent:
                     outbuf.skip(num_sent, True)

File ginsfsm/deferred.py

 class DeferredList(object):
     """ List of deferred callbacks
         Add a callable (function or method).
+        The deferred callable are executed by searching his reference.
     """
     def __init__(self):
         self.callbacks = deque()

File ginsfsm/gaplic.py

 )
 from ginsfsm.deferred import (
     DeferredList,
+    Deferred,
 )
 from ginsfsm.c_sock import (
     poll_loop,
         timeout.callback = None
         self._clearTimeout(timeout)
 
-    def add_callback(self, callback):
+    def add_callback(self, func, *args, **kwargs):
         """ Compatible with tornado.io_loop
 
         Calls the given callback on the next I/O loop iteration.
         control from other threads to the IOLoop's thread.
         """
         list_empty = not self._callbacks
-        self._callbacks.append(callback)
+        deferred = Deferred(0, func, *args, **kwargs)
+        self._callbacks.append(deferred)
         if list_empty:
             if threading.current_thread().ident != self._thread_ident:
                 # If we're in the IOLoop's thread, we know it's not currently
                 # TODO: study this from tornado
                 # self._waker.wake()
 
-    def _run_callback(self, callback):
+    def _run_callback(self, deferred):
         try:
-            callback()
+            deferred()
         except Exception:
-            self.handle_callback_exception(callback)
+            self.handle_callback_exception(deferred)
 
     def handle_callback_exception(self, callback):
         """This method is called whenever a callback run by the IOLoop

File ginsfsm/gmsg.py

         # El máximo número de bytes que puede haber es 'segm_size'.
         # 'tail' no puede ser mayor que 'segm_size'
         #
-        self.head = 0  # primer byte sin procesar. Usado para LECTURA
-        self.tail = 0  # próximo byte a añadir. Usado para ESCRITURA (añadir)
-        self.fixp = 0  # fixed pointer. Usado en ESCRITURA para sobreescribir
+        self.head = 0  # primer byte sin procesar. Usado para READING
+        self.tail = 0  # próximo byte a añadir. Usado para WRITTING (añadir)
+        self.fixp = 0  # fixed pointer. Usado en WRITTING para sobreescribir
 
         #
         #  Calcula tamaño segmento
             segm = dl_next(segm)
 
     def getchar(self):
-        """ LECTURA o procesamient del mensaje.
+        """ READING o procesamient del mensaje.
             Saca un byte del message retornalo
             Retorna None si no hay más bytes
         """
         return None
 
     def check_char(self):
-        """ LECTURA o procesamient del mensaje.
+        """ READING o procesamient del mensaje.
             Consulta un byte del message y dejalo en 'c' (NO LO SACA DEL MSG)
             Retorna None si no hay mas bytes, y el caracter si lo hay.
         """
         return None
 
     def cur_seg_rd(self):
-        """  LECTURA : Devuelve segm lectura actual
+        """  READING : Devuelve segm lectura actual
         """
         if not self.rd_segm:
             return None
         return self.rd_segm.data[self.rd_segm.head:]
 
     def cur_seglen_rd(self):
-        """  LECTURA : Devuelve ln segm lectura actual
+        """  READING : Devuelve ln segm lectura actual
         """
         if not self.rd_segm:
             return None
         return self.rd_segm.tail - self.rd_segm.head
 
     def next_seg_rd(self):
-        """ LECTURA : Pon como actual el siguiente segm de lectura
+        """ READING : Pon como actual el siguiente segm de lectura
             Retorna false si no hay mas.
         """
         if not self.rd_segm:
             return False
 
     def getdata(self, ln):
-        """ LECTURA : Saca 'ln' bytes del self, moviendo los datos a 'data'
-            desde la posición actual de lectura.
-            NOO Retorna nº de bytes sacados
-            Retorna los datos
+        """ READING : Pull 'ln' bytes.
+            Read from the reading current position.
+            If there is no enough data (ln) then return None.
+            Return the pulled data or None.
         """
         if not self.rd_segm:
             return None
+        if self.bytesleft() < ln:
+            return None
 
         data = bytearray()
         while self.rd_segm and ln > 0:
         return data
 
     def subsetdata(self, ln):
-        """ LECTURA : Fija 'ln' bytes para el submensaje
+        """ READING : Fija 'ln' bytes para el submensaje
         """
         if ln < self.bytesleft():
             self.sublen = ln
         return False
 
     def subgetdata(self, ln):
-        """ LECTURA : Saca 'ln' bytes del SUB-MSG, moviendo los datos a 'data'
+        """ READING : Saca 'ln' bytes del SUB-MSG, moviendo los datos a 'data'
             desde la posición actual de lectura.
             Si 'data' es NULL simplemente se sacan los bytes,
             sin copiarlos a nada.
         return self.getdata(ln)
 
     def remove_subdata(self):
-        # LECTURA : Saca los bytes del SUB-MSG, eliminandolos
+        # READING : Saca los bytes del SUB-MSG, eliminandolos
         return self.subgetdata(self, self.sublen)
 
     def putchar(self, c):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Añade un byte al final del mensaje
             Retorna numero bytes escritos.
         """
         return self.putdata(self, c, 1)
 
     def putdata(self, data, ln=0):
-        """ ESCRITURA del mensaje.
-            Añade 'ln' bytes al final del mensaje
-            Retorna numero bytes escritos.
+        """ WRITING to gmsg.
+            Add 'ln' bytes to the message tail.
+            Return number of written bytes.
         """
+        if not data:
+            return 0
         data = string_to_bytearray(data)
         if ln == 0:
             ln = len(data)
         return True
 
     def new_segm(self):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Crea un nuevo segm para a€adir datos
             Retorna FALSE si error
         """
         return True
 
     def cur_seg_wr(self):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Retorna actual pointer segm de escritura
         """
         segm = dl_last(self.dl_segm)
             return None
 
     def cur_seglen_wr(self):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Retorna len of segm de escritura actual
         """
         segm = dl_last(self.dl_segm)
             return 0
 
     def insert_char(self, c):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Inserta al principio del mensaje
             !!! OJO QUE INSERTA EN head
             Retorna FALSE si error
         return self.insert_data(self, c, 1)
 
     def insert_data(self, data, ln=0):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Inserta 'ln' bytes al principio del mensaje
             !!!OJO QUE SE INSERTA EN HEAD
             !! NO SE PUEDEN INSERTAR MAS BYTES QUE EL TAMAÑO DEL SEGMENTO!!!
         self.fix_segm.fixp = self.fix_segm.tail
 
     def overwrite_data(self, data, ln=0):
-        """ ESCRITURA del mensaje.
+        """ WRITING to gmsg.
             Sobreescribe en la marked (fixed) position
             Retorna FALSE si error
         """
         return False  # can't override all
 
     def marked_bytes(self):
-        """ ESCRITURA: numero total de bytes escritos
+        """ WRITTING: numero total de bytes escritos
             desde la marca hasta el final
         """
         sum_len = 0
         return sum_len
 
     def bytesleft(self):
-        """LECTURA: nº bytes pendientes de procesar
+        """READING: Return number of bytes pending of reading
         """
         sum_len = 0
 
         return sum_len
 
     def totalbytes(self):
-        """ ESCRITURA: numero total de bytes escritos
+        """ WRITTING: Return total number of written bytes
         """
         sum_len = 0
 
             segm = dl_next(segm)
         return sum_len
 
+    def free_space(self):
+        """ WRITTING: Return number of free byte space to write.
+        """
+        free_bytes = self.max_size - self.totalbytes()
+        return free_bytes
+
 
 def gmsg_remove(gmsg):
     """ Elimina paquete

File ginsfsm/gobj.py

 import logging
 import re
 import ginsfsm.globals  # made it import available
-
 from ginsfsm.compat import string_types
-
 from ginsfsm.smachine import (
     SMachine,
     EventError,
     StateError,  # made it import available
     MachineError,  # made it import available
 )
-
 from ginsfsm.gconfig import (
     GConfig,
     add_gconfig,
 )
+from ginsfsm.deferred import Deferred
 
 
 class ParentError(Exception):
 
         # if destination is not None:
         if not (isinstance(destination, string_types) or
-                isinstance(destination, GObj)):
+                isinstance(destination, GObj) or
+                isinstance(destination, Deferred)
+                ):
             raise DestinationError(
-                '_event_factory() BAD TYPE destination %s in %s' %
-                (repr(destination), repr(self)))
+                '_event_factory() BAD TYPE destination %r in %r' %
+                (destination, self))
 
         if isinstance(event, Event):
             # duplicate the event
                         oevent.name = sub.change_original_event_name
 
                     ret = False
-                    if hasattr(sub, 'use_callback'):
+                    if isinstance(sub.subscriber_gobj, Deferred):
                         # outside world
-                        if hasattr(sub, 'callback_fn'):
-                            callback = sub.callback_fn
-                            if hasattr(sub, 'callback_param'):
-                                callback_param = sub.callback_param
-                                callback(callback_param)
-                            else:
-                                callback()
-                            ## TODO: callback(oevent): debería convertir
-                            # a oevent en json? pues claro que sí.
-                            # No obligues a que te conozcan
-                            # Eso sí: que acepten 1 param.
-                            # MEJORA:
-                            # Pero por otro, si ya tienen que conocer
-                            # el gobj-ecosistema para poder enviar eventos.
-                            # Si son casi hermanos, no estamos tratando
-                            # con entes de otro ecosistema físicamente externo,
-                            # entre los que sí se entiende, y no puede ser
-                            # de otra manera, que se tengan que intercambiar
-                            # los datos en formato aséptico, es decir, json.
-
-                            # Al callback no le pasamos el oevent,
-                            # él ya sabe a qué evento se ha suscrito.
-                            # Le pasamos el parámetro que él ha elegido.
-                        else:
-                            self.logger.error(
-                                "ERROR use_callback without callback_fn"
-                            )
+                        sub.subscriber_gobj()
                     else:
                         # gobj-ecosistema
                         if hasattr(sub, 'use_post_event'):
 
         if subscriber_gobj is not None:
             if not (isinstance(subscriber_gobj, string_types) or
-                    isinstance(subscriber_gobj, GObj)):
+                    isinstance(subscriber_gobj, Deferred) or
+                    isinstance(subscriber_gobj, GObj)
+                    ):
                 raise GObjError(
-                    'subcribe_event(): BAD TYPE subscriber_gobj %s in %s'
-                    % (repr(subscriber_gobj), repr(self)))
+                    'subcribe_event(): BAD TYPE subscriber_gobj %r in %r'
+                    % (subscriber_gobj, self))
 
             if isinstance(subscriber_gobj, string_types):
                 if not self.gaplic:
                 continue
             if not isinstance(name, string_types):
                 raise EventError(
-                    'subscribe_event(): event %s is not string in %s'
-                    % (repr(name), repr(self)))
+                    'subscribe_event(): event %r is not string in %r'
+                    % (name, self))
 
             if name not in output_events:
                 raise EventError(
-                    'subscribe_event(): output-event %s not defined in'
-                    ' %s' % (repr(event_name), repr(self)))
+                    'subscribe_event(): output-event %r not defined in'
+                    ' %r' % (event_name, self))
 
         existing_subs = self._find_subscription(event_name, subscriber_gobj)
         if existing_subs:

File ginsfsm/protocols/sockjs/server/c_sockjs_websocket.py

 """
 import logging
 import random
-import base64
 
 from pyramid.view import view_config
 
 from ginsfsm.protocols.sockjs.server.session import BaseSession
 from ginsfsm.protocols.sockjs.server.session import ConnectionInfo
 from ginsfsm.protocols.wsgi.webob.websocket_response import WebsocketResponse
-
+from ginsfsm.deferred import Deferred
+import ginsfsm.escape
 
 #----------------------------------------------------------------#
 #                   WebSocketHandler
         self.ws_connection = self.context.gaplic.create_gobj(
             None,
             GWebSocket,
-            None,           # Mixins has no parent!!!
+            None,           # No parent, Iam a Mixin!
             request=self.request,
             gsock=self.gsock,
         )
+        self.ws_connection.delete_all_subscriptions()
+        # da error al suscribir, no es un gobj, como hago para las callback?
+        deferred_open = Deferred(0, self.open)
+        deferred_message = Deferred(0, self.on_message)
+        deferred_close = Deferred(0, self.on_close)
+        self.ws_connection.subscribe_event('EV_ON_OPEN', deferred_open)
+        self.ws_connection.subscribe_event('EV_ON_MESSAGE', deferred_message)
+        self.ws_connection.subscribe_event('EV_ON_CLOSE', deferred_close)
 
         #
         #   This will execute ResponseInterrupt.
         response = WebsocketResponse(self.context, self.request)
         return response
 
+    def write_message(self, message, binary=False):
+        """Sends the given message to the client of this Web Socket."""
+        self.ws_connection.write_message(message, binary)
+
 
 #----------------------------------------------------------------#
 #                   GWebsocket GClass

File ginsfsm/protocols/sockjs/server/c_websocket.py

 import logging
 
 import ginsfsm.escape
+from ginsfsm.c_timer import GTimer
+from ginsfsm.compat import binary_type
 from ginsfsm.gobj import GObj
 from ginsfsm.gmsg import (
     GMsg,
     string_to_pack,
 )
 
+STRUCT_2BYTES = struct.Struct("!H")
+STRUCT_4BYTES = struct.Struct("!Q")
+STRUCT_BB = struct.Struct("BB")
+STRUCT_BBH = struct.Struct("!BBH")
+STRUCT_BBQ = struct.Struct("!BBQ")
+
+OPCODE_CONTINUATION_FRAME = 0x0
+OPCODE_TEXT_FRAME = 0x01
+OPCODE_BINARY_FRAME = 0x02
+
+OPCODE_CONTROL_CLOSE = 0x08
+OPCODE_CONTROL_PING = 0x09
+OPCODE_CONTROL_PONG = 0x0A
+
 
 class FRAME_HEADER_S(object):
     """ Websocket header: two bytes.
 
     packer = struct.Struct('BB')
 
-    def __init__(self, opcode=0, fin=0, mask=0, frame_len=0):
-        self.fin = fin
+    def __init__(self, opcode=0, final=0, mask=0, frame_len=0):
+        self.final = final
         self.reserved_bits = 0
         self.opcode = opcode
         self.opcode_is_control = opcode & 0x8
         self.mask = mask
         self.frame_len = frame_len
-        self.payload_len = frame_len  # TODO calcula los tamaños
+        self.payload_len = 0
+        self.complete = False
+        self.error = False
+        self.to_close = False
 
-    def fmt_bin_to_line(self):
-        if self.fin:
+    def host_to_network(self):
+        if self.final:
             byte1 = self.opcode | 0x80
         else:
             byte1 = self.opcode
         trama = self.packer.pack(byte1, byte2)
         return bytearray(trama)
 
-    def fmt_line_to_bin(self, data):
+    def network_to_host(self, data):
         byte1, byte2 = self.packer.unpack(bytes(data))
 
-        self.fin = byte1 & 0x80
+        self.final = byte1 & 0x80
         self.reserved_bits = byte1 & 0x70
         self.opcode = byte1 & 0xf
         self.opcode_is_control = self.opcode & 0x8
         return self.packer.size
 
 
-def gmsg_read_msg_header(gmsg, im_server=True):
-    """ check and remove header from gmsg
+def analize_header(gmsg, iam_server):
+    """ Get and check the header from gmsg.
     """
     hd = FRAME_HEADER_S()
     data = gmsg.getdata(hd.size)
-    hd.fmt_line_to_bin(data)
-
-    if (hd.msg_id & 0xFF00) != 0x0100:
-        logging.error("gmsg_read_msg_header(): ERROR, BAD GMSG_ID %X", hd.msg_id)
-        return None
-
+    if data is None:
+        # there is no enough data
+        return hd
+    hd.network_to_host(data)
+    hd.complete = True
     if hd.reserved_bits:
         # client is using as-yet-undefined extensions; abort
-        logging.error("ERROR: using as-yet-undefined reserved_bits")
-        return None
+        hd.error = True
+        logging.error(
+            "ERROR websocket header: using as-yet-undefined reserved_bits")
+        return hd
 
-    if im_server and not hd.mask:
-        # Unmasked frame; abort
-        logging.error("ERROR: unmasked frame")
-        return None
-
-    if hd.opcode_is_control and hd.payload_len >= 126:
-        # control frames must have payload < 126; abort
-        logging.error("ERROR: too big control frame payloda")
-        return None
+    if hd.opcode_is_control:
+        #
+        #   Control opcode
+        #
+        control = hd.opcode
+        if control == OPCODE_CONTROL_CLOSE:
+            hd.to_close = True
+            pass
+        elif control == OPCODE_CONTROL_PING:
+            pass
+        elif control == OPCODE_CONTROL_PONG:
+            pass
+        if hd.payload_len >= 126:
+            # control frames must have payload < 126; abort
+            hd.error = True
+            logging.error("ERROR websocket header: too big control frame payloda")
+    else:
+        #
+        #   Data opcode
+        #
+        if iam_server and not hd.mask:
+            # Unmasked frame; abort
+            hd.error = True
+            logging.error("ERROR websocket header: iam_server but unmasked frame")
 
     return hd
 
 #----------------------------------------------------#
 #               Machine Actions
 #----------------------------------------------------#
-def ac_timeout_frame_header(self, event):
+def ac_timeout_waiting_frame_header(self, event):
     """ Timeout frame start.
-        Too much time waiting the two bytes of frame start.
+        Too much time waiting the frame header.
+    """
+    self.remove_gmsgs()
+    self.gsock.mt_drop()
+
+
+def ac_timeout_waiting_payload_data(self, event):
+    """ Timeout frame start.
+        Too much time waiting the frame header.
+    """
+    self.remove_gmsgs()
+    self.gsock.mt_drop()
+
+
+def ac_disconnected(self, event):
+    """ Parter has disconnected.
     """
 
 
-def ac_process_frame_header(self, event):
-    """ Process frame start.
+def ac_process_frame_header_start(self, event):
+    """ Process two first bytes frame start.
     """
     gmsg = self.rx_gmsg
     if not gmsg:
-        gmsg = self.rx_gmsg = GMsg(1024, 1024)  # TODO: define max sizes
-    gmsg.putdata(event.data)
+        gmsg = self.rx_gmsg = GMsg(1024)
+    if event.data:
+        gmsg.putdata(event.data)
 
     #
-    #  Read the header
+    # Have we got already a frame header struct?
     #
-    hd = gmsg_read_msg_header(gmsg)  # check and remove header from gmsg
-    if not hd:
-        gmsg_remove(gmsg)
-        self.send_event(self.connex, 'EV_DROP')
+    hd = self.frame_header
+    if hd is None:
+        #
+        # Read and analize the two first header bytes.
+        # Reject it if there is no at list 2 bytes.
+        #
+        hd = analize_header(gmsg, self.iam_server)
+        if hd.to_close or hd.error:
+            self.remove_gmsgs()
+            self.gsock.mt_drop()
+            return
+        if not hd.complete:
+            return
+        self.frame_header = hd
 
+    #
+    #  Calculate the frame lenght
+    #
     ln = hd.payload_len
+
     if ln < 126:
-        # next: read 4 bytes of masking key
-        self.frame_len = self.payload_len
+        # we've got the length.
+        # next: read 4 bytes of masking key if iam_server
+        self.frame_len = hd.payload_len
+        print ">>>> frame_len", self.frame_len
+        if self.frame_len > 0:
+            print ">>>>>>>>>> PAYLOAD GMSG"
+            self.data_gmsg = GMsg(self.frame_len)
+        if self.iam_server:
+            self.set_new_state('ST_WAITING_HEADER_MASKING_KEY')
+        else:
+            self.set_new_state('ST_WAITING_PAYLOAD_DATA')
+
     elif ln == 126:
         # next: read 2 bytes of extended payload length
-        # next: read 4 bytes of masking key
-        self.frame_len = 0
+        # next: read 4 bytes of masking key if iam_server
+        self.set_new_state('ST_WAITING_HEADER_EXTENDED_LENGTH')
+
     elif ln == 127:
         # next: read 8 bytes of extended payload length
-        # next: read 4 bytes of masking key
-        self.frame_len = 0
+        # next: read 4 bytes of masking key if iam_server
+        self.set_new_state('ST_WAITING_HEADER_EXTENDED_LENGTH')
+
+    self.send_event(self, 'EV_RX_DATA', data=None)
+
+
+def ac_process_header_extended_length(self, event):
+    """
+    """
+    gmsg = self.rx_gmsg
+    if event.data:
+        gmsg.putdata(event.data)
+
+    hd = self.frame_header
+    ln = hd.payload_len
+    if ln == 126:
+        # read 2 bytes of extended payload length
+        # next: read 4 bytes of masking key if iam_server
+        data = gmsg.getdata(2)
+        if data is None:
+            # wait: there is no enough data
+            return
+        self.frame_len = data
+
+    elif ln == 127:
+        # read 8 bytes of extended payload length
+        # next: read 4 bytes of masking key if iam_server
+        data = gmsg.getdata(8)
+        if data is None:
+            # wait: there is no enough data
+            return
+        self.frame_len = data
+
+    else:
+        logging.error("ERROR payload length: %d", ln)
+        self.remove_gmsgs()
+        self.gsock.mt_drop()
+        return
+
+    self.data_gmsg = GMsg(self.frame_len)
+
+    if self.iam_server:
+        self.set_new_state('ST_WAITING_HEADER_MASKING_KEY')
+        self.send_event(self, 'EV_RX_DATA', data=None)
+    else:
+        data = gmsg.getdata(gmsg.bytesleft())
+        self.set_new_state('ST_WAITING_PAYLOAD_DATA')
+        self.send_event(self, 'EV_RX_DATA', data=data)
+
+
+def ac_process_header_masking_key(self, event):
+    """ Get the 4 bytes of masking key
+    """
+    gmsg = self.rx_gmsg
+    if event.data:
+        gmsg.putdata(event.data)
+    if not self.iam_server:
+        self.set_new_state('ST_WAITING_PAYLOAD_DATA')
+        self.send_event(self, 'EV_RX_DATA', data=None)
+        return
+
+    data = gmsg.getdata(4)
+    if data is None:
+        # wait: there is no enough data
+        return
+    self.masking_key = data
+
+    self.set_new_state('ST_WAITING_PAYLOAD_DATA')
+    self.send_event(self, 'EV_RX_DATA', data=None)
+
+
+def ac_process_payload_data(self, event):
+    """ Get payload data
+    """
+    print "GETTINNNG PAYLOAD"
+    gmsg = self.rx_gmsg
+    if event.data:
+        gmsg.putdata(event.data)
+
+    data_gmsg = self.data_gmsg
+    if data_gmsg is None:
+        # frame with lenght 0
+        print "CONTROLLLL"
+        self.flush_current_frame()
+        return
+
+    free = data_gmsg.free_space()
+    print "FREEEE", free
+    if free > 0:
+        data = gmsg.getdata(free)
+        if data is None:
+            # wait: need more data
+            return
+        data_gmsg.putdata(data)
+
+    self.flush_current_frame()
+
 
 GWEBSOCKET_FSM = {
     'event_list': (
-        'EV_RX_DATA:bottom input:top output',
-        'EV_TRANSMIT_READY:bottom input:top output',
+        'EV_ON_OPEN:top output',
+        'EV_ON_CLOSE:top output',
+        'EV_ON_MESSAGE:top output',
+        'EV_RX_DATA:bottom input',
+#        'EV_TRANSMIT_READY:bottom input',
+        'EV_DISCONNECTED:bottom input',
         'EV_SET_TIMER:bottom output',
         'EV_TIMEOUT:bottom input',
     ),
     'state_list': (
-        'ST_WAITING_FRAME_HEADER',
+        'ST_WAITING_HEADER',
+        'ST_WAITING_HEADER_EXTENDED_LENGTH',
+        'ST_WAITING_HEADER_MASKING_KEY',
+        'ST_WAITING_PAYLOAD_DATA',
     ),
     'machine': {
-        'ST_WAITING_FRAME_HEADER':
+        'ST_WAITING_HEADER':
         (
-            ('EV_TIMEOUT',        ac_timeout_frame_header,       None),
-            ('EV_RX_DATA',        ac_process_frame_header,       None),
+            ('EV_DISCONNECTED', ac_disconnected,                    None),
+            ('EV_TIMEOUT',      ac_timeout_waiting_frame_header,    None),
+            ('EV_RX_DATA',      ac_process_frame_header_start,      None),
+        ),
+
+        'ST_WAITING_HEADER_EXTENDED_LENGTH':
+        (
+            ('EV_DISCONNECTED', ac_disconnected,                    None),
+            ('EV_TIMEOUT',      ac_timeout_waiting_frame_header,    None),
+            ('EV_RX_DATA',      ac_process_header_extended_length,  None),
+        ),
+        'ST_WAITING_HEADER_MASKING_KEY':
+        (
+            ('EV_DISCONNECTED', ac_disconnected,                    None),
+            ('EV_TIMEOUT',      ac_timeout_waiting_frame_header,    None),
+            ('EV_RX_DATA',      ac_process_header_masking_key,      None),
+        ),
+
+        'ST_WAITING_PAYLOAD_DATA':
+        (
+            ('EV_DISCONNECTED', ac_disconnected,                    None),
+            ('EV_TIMEOUT',      ac_timeout_waiting_payload_data,    None),
+            ('EV_RX_DATA',      ac_process_payload_data,            None),
         ),
     }
 }
 GWEBSOCKET_GCONFIG = {
     'gsock': [None, None, 0, None, "GSock connection."],
     'request': [None, None, 0, None, "websocket request."],
+    'iam_server': [bool, True, 0, None, "What side? server or client."],
+    'subscriber': [None, None, 0, None,
+        "subcriber of all output-events."
+        "Default is ``None``, i.e., the parent"
+    ],
 }
 
 
               * ``data``: sample event attribute.
 
     """
+
     KEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
 
     def __init__(self):
         GObj.__init__(self, GWEBSOCKET_FSM, GWEBSOCKET_GCONFIG)
-        self.current_frame = None
+        self.frame_header = None
         self.rx_gmsg = None
+        self.data_gmsg = None
+        self.masking_key = None
 
     def start_up(self):
         """ Initialization zone.
         """
-        self.execute()
+        if self.subscriber is None:
+            self.subscriber = self.parent
+        self.subscribe_event(None, self.subscriber)
+        if self.gsock:
+            # We've got already the connection
+            self.execute()
+        # Setup the timers
+        self.inactivity_timer = self.create_gobj(
+            None,
+            GTimer,
+            self,
+            timeout_event_name='EV_TIMEOUT'
+        )
+        self.start_inactivity_timer()
+
+    def start_inactivity_timer(self, seconds=5):
+        self.send_event(
+            self.inactivity_timer,
+            'EV_SET_TIMER',
+            seconds=seconds
+        )
+
+    def stop_inactivity_timer(self):
+        self.send_event(
+            self.inactivity_timer,
+            'EV_SET_TIMER',
+            seconds=-1
+        )
 
     def execute(self):
         # Websocket only supports GET method
             self.gsock.mt_drop()
             return
 
-        # Connection header should be upgrade. Some proxy servers/load balancers
+        # Connection header should be upgrade.
+        # Some proxy servers/load balancers
         # might mess with it.
         headers = self.request.headers
         connection = map(lambda s: s.strip().lower(),
             selected = self.select_subprotocol(subprotocols)
             if selected:
                 assert selected in subprotocols
-                subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % selected
+                subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % (
+                    selected)
 
         self.gsock.mt_send_data(ginsfsm.escape.utf8(
             "HTTP/1.1 101 Switching Protocols\r\n"
         #
         #   Subscribe all gsock events.
         #
-        self.gsock.delete_all_subscriptions()
+        self.gsock.delete_all_subscriptions()  # TODO: igual borro delete http
         self.gsock.subscribe_event(None, self)
         # TODO: mira interactuar con los timers del canal http.
+        #self.broadcast_event('EV_ON_OPEN')
+        self.gaplic.add_callback(self.broadcast_event, 'EV_ON_OPEN')
 
     def select_subprotocol(self, subprotocols):
         """Invoked when a new WebSocket requests specific subprotocols.
         proposed subprotocols was selected.
         """
         return None
+
+    def remove_gmsgs(self):
+        self.remove_header_gmsg()
+        self.remove_data_gmsg()
+
+    def remove_header_gmsg(self):
+        if self.rx_gmsg:
+            gmsg_remove(self.rx_gmsg)
+            self.rx_gmsg = None
+
+    def remove_data_gmsg(self):
+        if self.data_gmsg:
+            gmsg_remove(self.data_gmsg)
+            self.data_gmsg = None
+
+    def flush_current_frame(self):
+        hd = self.frame_header
+        print "==============> OPCODE", hd.opcode
+        self.frame_header = None
+
+        gmsg = self.data_gmsg
+        if gmsg:
+            gmsg.reset_rd()
+            ln = gmsg.bytesleft()
+            unmasked = gmsg.getdata(ln)
+            mask = self.masking_key
+            for i in xrange(ln):
+                unmasked[i] = unmasked[i] ^ mask[i % 4]
+
+            if hd.final:
+                print "<====== MESSAGE", unmasked
+                self.handle_message(hd.opcode, unmasked)
+
+        self.remove_data_gmsg()
+        self.set_new_state('ST_WAITING_HEADER')
+        self.post_event(self, 'EV_RX_DATA', data=None)
+
+    def handle_message(self, opcode, data):
+        if opcode == OPCODE_TEXT_FRAME or opcode == OPCODE_BINARY_FRAME:
+            self.gaplic.add_callback(
+                self.broadcast_event,
+                'EV_ON_MESSAGE',
+                data=data
+            )
+        elif opcode == OPCODE_CONTROL_CLOSE:
+            self.gsock.mt_drop()
+        elif opcode == OPCODE_CONTROL_PING:
+            pass
+        elif opcode == OPCODE_CONTROL_PONG:
+            pass
+        else:
+            pass
+
+    def write_message(self, message, binary=False):
+        """Sends the given message to the client of this Web Socket."""
+        if binary:
+            opcode = 0x2
+        else:
+            opcode = 0x1
+        message = ginsfsm.escape.utf8(message)
+        assert isinstance(message, binary_type)
+        self._write_frame(True, opcode, message)
+
+    def _write_frame(self, final, opcode, data):
+        if final:
+            finbit = opcode | 0x80
+        else:
+            finbit = opcode
+
+        l = len(data)
+        if l < 126:
+            frame = STRUCT_BB.pack(finbit, l)
+        elif l <= 0xFFFF:
+            frame = STRUCT_BBH.pack(finbit, 126, l)
+        else:
+            frame = STRUCT_BBQ.pack(finbit, 127, l)
+        frame += data
+        self.gsock.mt_send_data(frame)