Jeffrey Gelens avatar Jeffrey Gelens committed ff262b8 Merge

Merged

Comments (0)

Files changed (5)

 syntax: glob
 
-.pyc
+*.pyc
+*.sw?
+gevent_websocket.egg-info/*
 tests/testresults.sqlite3
Add a comment to this file

geventwebsocket/__init__.py

File contents unchanged.

geventwebsocket/handler.py

+import base64
 import re
 import struct
 from hashlib import md5, sha1
         super(WebSocketHandler, self).__init__(*args, **kwargs)
 
     def handle_one_response(self, call_wsgi_app=True):
+        if self.environ.get("HTTP_ORIGIN"):
+            self._handle_one_legacy_response()
+        elif self.environ.get("HTTP_SEC_WEBSOCKET_VERSION"):
+            version = int(self.environ.get("HTTP_SEC_WEBSOCKET_VERSION"))
+            if version is 7:
+                if not self._handle_one_version7_response():
+                    return
+            else:
+                return
+        else:
+            # not a valid websocket request
+            return super(WebSocketHandler, self).handle_one_response()
+
+        if call_wsgi_app:
+            return self.application(self.environ, self.start_response)
+        else:
+            return
+
+    def _close_connection(self, reason=None):
+        # based on gevent/pywsgi.py
+        # see http://pypi.python.org/pypi/gevent#downloads
+
+        if reason:
+            print "Closing the connection because %s!" % reason
+        if self.socket is not None:
+            try:
+                self.socket._sock.close()
+                self.socket.close()
+            except socket.error:
+                pass
+
+    def _handle_one_version7_response(self):
+        environ = self.environ
+
+        protocol, version = self.request_version.split("/")
+        key = environ.get("HTTP_SEC_WEBSOCKET_KEY")
+
+        # check client handshake for validity
+        if not environ.get("REQUEST_METHOD") == "GET":
+            # 5.2.1 (1)
+            self._close_connection()
+            return False
+        elif not protocol == "HTTP":
+            # 5.2.1 (1)
+            self._close_connection()
+            return False
+        elif float(version) < 1.1:
+            # 5.2.1 (1)
+            self._close_connection()
+            return False
+        # XXX: nobody seems to set SERVER_NAME correctly. check the spec
+        #elif not environ.get("HTTP_HOST") == environ.get("SERVER_NAME"):
+            # 5.2.1 (2)
+            #self._close_connection()
+            #return False
+        elif not key:
+            # 5.2.1 (3)
+            self._close_connection()
+            return False
+        elif len(base64.b64decode(key)) != 16:
+            # 5.2.1 (3)
+            self._close_connection()
+            return False
+
+        #TODO: compare Sec-WebSocket-Origin against self.allowed_paths
+
+        self.websocket_connection = True
+        self.websocket = WebSocketVersion7(self.socket, self.rfile, self.environ)
+        self.environ['wsgi.websocket'] = self.websocket
+
+        headers = [
+            ("Upgrade", "websocket"),
+            ("Connection", "Upgrade"),
+            ("Sec-WebSocket-Accept", base64.b64encode(sha1(key + self.GUID).digest())),
+        ]
+        self.start_response("101 Switching Protocols", headers)
+        return True
+
+    def _handle_one_legacy_response(self):
         # In case the client doesn't want to initialize a WebSocket connection
         # we will proceed with the default PyWSGI functionality.
         print self.environ.get("HTTP_CONNECTION", "").lower().split(",")
         self.init_websocket()
         self.environ['wsgi.websocket'] = self.websocket
 
-        if call_wsgi_app:
-            return self.application(self.environ, self.start_response)
+        # Detect the Websocket protocol
+        if "HTTP_SEC_WEBSOCKET_KEY1" in self.environ:
+            version = 76
         else:
-            return
+            version = 75
 
-    def init_websocket(self):
-        version = self.environ.get("HTTP_SEC_WEBSOCKET_VERSION")
-        print "VERSION", version
+        if version == 75:
+            headers = [
+                ("Upgrade", "WebSocket"),
+                ("Connection", "Upgrade"),
+                ("WebSocket-Origin", self.websocket.origin),
+                ("WebSocket-Protocol", self.websocket.protocol),
+                ("WebSocket-Location", "ws://" + self.environ.get('HTTP_HOST') + self.websocket.path),
+            ]
+            self.start_response("101 Web Socket Protocol Handshake", headers)
+        elif version == 76:
+            challenge = self._get_challenge()
+            headers = [
+                ("Upgrade", "WebSocket"),
+                ("Connection", "Upgrade"),
+                ("Sec-WebSocket-Origin", self.websocket.origin),
+                ("Sec-WebSocket-Protocol", self.websocket.protocol),
+                ("Sec-WebSocket-Location", "ws://" + self.environ.get('HTTP_HOST') + self.websocket.path),
+            ]
 
-        if self.environ.get("HTTP_ORIGIN"):
-            print "OLD ", version
-            self.websocket = WebSocketLegacy(self.socket, self.rfile, self.environ)
+            self.start_response("101 Web Socket Protocol Handshake", headers)
+            self.write(challenge)
+        else:
+            raise Exception("Version not supported")
 
-            if "HTTP_SEC_WEBSOCKET_KEY1" in self.environ:
-                self._handshake_hybi00()
-            else:
-                self._handshake_hixie75()
-
-            return True
-        else:
-            print "NEW ", version
-            self.websocket = WebSocketVersion7(self.socket, self.rfile, self.environ)
-
-            if int(version) in self.SUPPORTED_VERSIONS:
-                self._handshake_version7()
-                return True
-            else:
-                pass
-                # TODO: not support by websockets yet
-                #headers = [
-                #    ("Sec-WebSocket-Version", self.SUPPORTED_VERSION),
-                #]
-                #self.start_response("400 Bad Request", headers)
-                #self._close_connection()
-
-
-    def _handshake_hixie75(self):
-        headers = [
-            ("Upgrade", "WebSocket"),
-            ("Connection", "Upgrade"),
-            ("WebSocket-Origin", self.websocket.origin),
-            ("WebSocket-Protocol", self.websocket.protocol),
-            ("WebSocket-Location", "ws://" + self.environ.get('HTTP_HOST') + self.websocket.path),
-        ]
-        self.start_response("101 Web Socket Protocol Handshake", headers)
-
-    def _handshake_hybi00(self):
-        challenge = self._get_challenge_hybi00()
-
-        headers = [
-            ("Upgrade", "WebSocket"),
-            ("Connection", "Upgrade"),
-            ("Sec-WebSocket-Origin", self.websocket.origin),
-            ("Sec-WebSocket-Protocol", self.websocket.protocol),
-            ("Sec-WebSocket-Location", "ws://" + self.environ.get('HTTP_HOST') + self.websocket.path),
-        ]
-
-        self.start_response("101 Web Socket Protocol Handshake", headers)
-        self.write(challenge)
-
-    def _handshake_version7(self):
-        environ = self.environ
-
-        protocol, version = self.request_version.split("/")
-        key = environ.get("HTTP_SEC_WEBSOCKET_KEY").replace(" ", "")
-
-        print key
-
-        # check client handshake for validity
-        if not environ.get("REQUEST_METHOD") == "GET":
-            # 5.2.1 (1)
-            self._close_connection()
-            return False
-        elif not protocol == "HTTP":
-            # 5.2.1 (1)
-            self._close_connection()
-            return False
-        elif float(version) < 1.1:
-            # 5.2.1 (1)
-            self._close_connection()
-            return False
-        # XXX: nobody seems to set SERVER_NAME correctly. check the spec
-        #elif not environ.get("HTTP_HOST") == environ.get("SERVER_NAME"):
-             # 5.2.1 (2)
-             #self._close_connection()
-             #return False
-        elif not key:
-            # 5.2.1 (3)
-            self._close_connection()
-            return False
-        elif len(b64decode(key)) != 16:
-            # 5.2.1 (3)
-            self._close_connection()
-            return False
-
-        headers = [
-            ("Upgrade", "websocket"),
-            ("Connection", "Upgrade"),
-            ("Sec-WebSocket-Accept", b64encode(sha1(key + self.GUID).digest())),
-        ]
-        self.start_response("101 Switching Protocols", headers)
-        print headers
-        return True
-
-
-
-    def _handshake_hybi06(self):
-        raise Exception("Version not yet supported")
-        challenge = self._get_challange_hybi06()
-        headers = [
-            ("Upgrade", "WebSocket"),
-            ("Connection", "Upgrade"),
-            ("Sec-WebSocket-Accept", challenge),
-        ]
-        self.start_response("101 Switching Protocols", headers)
-        self.write(challenge)
-
-    def _close_connection(self, reason=None):
-        # based on gevent/pywsgi.py
-        # see http://pypi.python.org/pypi/gevent#downloads
-
-        if reason:
-            print "Closing the connection because %s!" % reason
-        if self.socket is not None:
-            try:
-                self.socket._sock.close()
-                self.socket.close()
-            except socket.error:
-                pass
-
-
-    def upgrade_allowed(self):
+    def accept_upgrade(self):
         """
         Returns True if request is allowed to be upgraded.
         If self.allowed_paths is non-empty, self.environ['PATH_INFO'] will

geventwebsocket/websocket.py

 import struct
 
 
+import struct
+
 class WebSocket(object):
     pass
 
     MASK = int("10000000", 2)
     PAYLOAD = int("01111111", 2)
 
-    OPCODE_FRAG = 0x0
+    OPCODE_CONTINUATION = 0x0
     OPCODE_TEXT = 0x1
     OPCODE_BINARY = 0x2
     OPCODE_CLOSE = 0x8
         """
 
         while True:
-            payload = ''
             if self.websocket_closed:
                 return None
 
-            opcode_octet, length_octet = struct.unpack('!BB', self._read_from_socket(2))
+            payload = ""
+            first_byte, second_byte = struct.unpack('!BB', self._read_from_socket(2))
 
-            if self.RSV & opcode_octet:
-                self.close(self.REASON_PROTOCOL_ERROR, 'Reserved bits cannot be set')
-                return None
+            fin = (first_byte >> 7) & 1
+            rsv1 = (first_byte >> 6) & 1
+            rsv2 = (first_byte >> 5) & 1
+            rsv3 = (first_byte >> 4) & 1
+            opcode = first_byte & 0xf
 
-            opcode = opcode_octet & self.OPCODE
-            is_final_frag = (self.FIN & opcode_octet) != 0
+            # frame-fin = %x0 ; more frames of this message follow
+            #           / %x1 ; final frame of this message
+            if fin not in (0, 1):
+                raise ProtocolException("")
 
-            if self._is_opcode_invalid(opcode):
-                self.close(self.REASON_PROTOCOL_ERROR, 'Invalid opcode %x' % opcode)
-                return None
+            # frame-rsv1 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
+            # frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
+            # frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
+            if rsv1 or rsv2 or rsv3:
+                raise ProtocolException('Reserved bits cannot be set')
 
-            if not is_final_frag and self.OPCODE_CLOSE <= opcode <= self.OPCODE_PONG:
-                self.close(self.REASON_PROTOCOL_ERROR, 'Control frames cannot be fragmented')
-                return None
+            #if self._is_invalid_opcode(opcode):
+            #    raise ProtocolException('Invalid opcode %x' % opcode)
 
-            if len(self._fragments) > 0 and not is_final_frag and opcode != self.OPCODE_FRAG:
-                self.close(self.REASON_PROTOCOL_ERROR,
-                        'Received new fragment frame with non-zero opcode')
-                return None
+            # control frames cannot be fragmented
+            if opcode > 0x7 and fin == 0:
+                raise ProtocolException('Control frames cannot be fragmented')
 
-            if len(self._fragments) > 0 and is_final_frag and (
-                    self.OPCODE_TEXT <= opcode <= self.OPCODE_BINARY):
-                self.close(self.REASON_PROTOCOL_ERROR,
-                        'Received new unfragmented data frame during fragmented message')
-                return None
+            #if len(self._fragments) > 0 and not is_fin and opcode != self.OPCODE_FRAG:
+            #    self.close(self.REASON_PROTOCOL_ERROR,
+            #            'Received new fragment frame with non-zero opcode')
 
-            if not self.MASK & length_octet:
-                self.close(self.REASON_PROTOCOL_ERROR, 'MASK must be set')
-                return None
+            #if len(self._fragments) > 0 and is_fin and (
+            #        self.OPCODE_TEXT <= opcode <= self.OPCODE_BINARY):
+            #    self.close(self.REASON_PROTOCOL_ERROR,
+            #            'Received new unfragmented data frame during fragmented message')
 
-            length_code = length_octet & self.PAYLOAD
+            mask = (second_byte >> 7) & 1
+            payload_length = (second_byte) & 0x7f
 
-            if length_code >= self.LEN_16 and (self.OPCODE_CLOSE <= opcode <= self.OPCODE_PONG):
-                self.close(self.REASON_PROTOCOL_ERROR,
-                        'Control frame payload cannot be larger than 125 bytes')
-                return None
+            #if not self.MASK & length_octet:
+            #    self.close(self.REASON_PROTOCOL_ERROR, 'MASK must be set')
 
-            if length_code < self.LEN_16:
-                length = length_code
-            elif length_code == self.LEN_16:
+            #length_code = length_octet & self.PAYLOAD
+
+            # Control frames MUST have a payload length of 125 bytes or less
+            if opcode > 0x7 and payload_length > 125:
+                raise FrameTooLargeException("Control frame payload cannot be larger than 125 bytes")
+
+            if payload_length < 126:
+                length = payload_length
+            if payload_length == 126:
                 length = struct.unpack('!H', self._read_from_socket(2))[0]
-            elif length_code == self.LEN_64:
+            elif payload_length == 127:
                 length = struct.unpack('!Q', self._read_from_socket(8))[0]
             else:
-                raise Exception('Calculated invalid length')
+                raise ProtocolException('Calculated invalid length')
 
-            mask_octets = struct.unpack('!BBBB', self._read_from_socket(4))
-            masked_payload = self._read_from_socket(length)
+            payload = ""
 
-            payload = ''
+            if mask:
+                masking_key = struct.unpack('!BBBB', self._read_from_socket(4))
+                masked_payload = self._read_from_socket(length)
 
-            j = 0
-            for c in masked_payload:
-                # TODO: optimize me? http://www.skymind.com/~ocrow/python_string/
-                payload += chr(ord(c) ^ mask_octets[j])
-                j = (j + 1) % 4
+                masked_payload = bytearray(masked_payload)
+                key = map(ord, masking_key)
+
+                for i in range(len(masked_payload)):
+                    masked_payload[i] = masked_payload[i] ^ key[i%4]
+
+                payload = masked_payload
+
 
             if opcode == self.OPCODE_TEXT:
-                payload = payload.decode('utf-8')
+                self._fragments.append(payload.decode("utf-8", "replace"))
+
+            elif opcode == self.OPCODE_BINARY:
+                self._fragments.append(payload)
+
+            elif opcode == self.OPCODE_CONTINUATION:
+                if len(self._fragments) != 0:
+                    raise ProtocolException("Cannot continue a non started message")
+
+            if opcode == self.OPCODE_TEXT:
+                    self._fragments.append(payload.decode("utf-8", "replace"))
+                else:
+                    self._fragments.append(payload)
+
             elif opcode == self.OPCODE_CLOSE:
                 if length >= 2:
                     reason, message = struct.unpack('!H%ds' % (length - 2), payload)
                     return (self.OPCODE_CLOSE, (reason, message))
                 else:
                     return None
+
             elif opcode == self.OPCODE_PING:
                 self.send(payload, opcode=self.OPCODE_PONG)
+
                 if not self.compatibility_mode:
                     return (self.OPCODE_PING, payload)
                 else:
                     continue
+
             elif opcode == self.OPCODE_PONG:
                 if not self.compatibility_mode:
                     return (self.OPCODE_PONG, payload)
                 else:
                     continue
 
-            if is_final_frag:
+            if fin == 1:
                 if len(self._fragments) > 0:
-                    opcode = self._original_opcode
-                    self._original_opcode = -1
-                    payload = ''.join(self._fragments) + payload
+                    msg = ''.join(self._fragments)
                     self._fragments = []
                 if not self.compatibility_mode:
-                    return (opcode, payload)
+                    return (opcode, msg)
                 else:
-                    return payload
-            else:
-                if len(self._fragments) == 0:
-                    self._original_opcode = opcode
-                self._fragments.append(payload)
+                    return msg
 
     def _encode_text(self, s):
         if isinstance(s, unicode):
             # this can't really happen, but for correctness sake...
             raise Exception('Message is too long')
 
-        print preamble, message
-
         self.socket.sendall(preamble + message)
 
     def close(self, reason, message):

tests/test__websocket.py

 from gevent import monkey
 monkey.patch_all(thread=False)
 
+import base64
+import struct
 import sys
 import greentest
 import gevent
+import gevent.local
 from gevent import socket
 from geventwebsocket.handler import WebSocketHandler
-
+from geventwebsocket.websocket import WebSocketVersion7
 
 CONTENT_LENGTH = 'Content-Length'
 CONN_ABORTED_ERRORS = []
 
 socket.socket.makefile = makefile
 
+import gevent.coros
 class TestCase(greentest.TestCase):
     __timeout__ = 5
+    _testlock = gevent.coros.Semaphore(1)
 
     def get_wsgi_module(self):
         from gevent import pywsgi
         return pywsgi
 
     def init_server(self, application):
-        self.server = self.get_wsgi_module().WSGIServer(('127.0.0.1', 0),
+        self.local = gevent.local.local()
+        self.local.server = self.get_wsgi_module().WSGIServer(('127.0.0.1', 0),
             application, handler_class=WebSocketHandler)
 
     def setUp(self):
         application = self.application
         self.init_server(application)
-        self.server.start()
-        self.port = self.server.server_port
+        self.local.server.start()
+        self.port = self.local.server.server_port
         greentest.TestCase.setUp(self)
 
-
     def tearDown(self):
         greentest.TestCase.tearDown(self)
         timeout = gevent.Timeout.start_new(0.5)
         try:
-            self.server.stop()
+            self.local.server.stop()
         finally:
             timeout.cancel()
 
     def connect(self):
         return socket.create_connection(('127.0.0.1', self.port))
 
-
 class TestWebSocket(TestCase):
     message = "\x00Hello world\xff"
 
 
         fd.close()
 
+class TestWebSocketVersion7(TestCase):
+
+    GOOD_HEADERS = "" \
+        "GET /echo HTTP/1.1\r\n" \
+        "Host: localhost\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+    def application(self, environ, start_response):
+        if environ['PATH_INFO'] == "/echo":
+            try:
+                ws = environ['wsgi.websocket']
+                self.ws = ws
+                ws.compatibility_mode = False
+            except KeyError:
+                start_response("400 Bad Request", [])
+                return []
+
+            while not ws.websocket_closed:
+                gevent.sleep()
+            self.close_connection = True
+            return None
+
+    def test_bad_handshake_method(self):
+        fd = self.connect().makefile(bufsize=1)
+        closed = False
+        headers = "" \
+        "POST /echo HTTP/1.1\r\n" \
+        "Host: localhost\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+        fd.write(headers)
+        try:
+            response = read_http(fd, code=101, reason="Web Socket Protocol Handshake")
+        except ConnectionClosed:
+            closed = True
+
+        assert closed, "Failed to abort connection with bad method"
+        fd.close()
+
+    def test_bad_handshake_version(self):
+        fd = self.connect().makefile(bufsize=1)
+        closed = False
+        headers = "" \
+        "GET /echo HTTP/1.0\r\n" \
+        "Host: localhost\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+        fd.write(headers)
+        try:
+            response = read_http(fd)
+        except ConnectionClosed:
+            closed = True
+
+        assert closed, "Failed to abort connection with bad version"
+        fd.close()
+
+    #XXX: this tests the check for SERVER_NAME != HTTP_HOST, which is commented
+    #     out until I figure out why SERVER_NAME is not set in pyramid.
+    """
+    def test_bad_handshake_host(self):
+        fd = self.connect().makefile(bufsize=1)
+        closed = False
+        headers = "" \
+        "GET /echo HTTP/1.1\r\n" \
+        "Host: example.com\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+        fd.write(headers)
+        try:
+            response = read_http(fd)
+        except ConnectionClosed:
+            closed = True
+
+        assert closed, "Failed to abort connection with bad Host"
+        fd.close()
+    """
+
+    def test_bad_handshake_no_key(self):
+        fd = self.connect().makefile(bufsize=1)
+        closed = False
+        headers = "" \
+        "GET /echo HTTP/1.1\r\n" \
+        "Host: localhost\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+        fd.write(headers)
+        try:
+            response = read_http(fd)
+        except ConnectionClosed:
+            closed = True
+
+        assert closed, "Failed to abort connection with no Sec-WebSocket-Key"
+        fd.close()
+
+    def test_bad_handshake_short_key(self):
+        fd = self.connect().makefile(bufsize=1)
+        closed = False
+        headers = "" \
+        "GET /echo HTTP/1.1\r\n" \
+        "Host: localhost\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Key: " + base64.b64encode('too short') + "\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+        fd.write(headers)
+        try:
+            response = read_http(fd)
+        except ConnectionClosed:
+            closed = True
+
+        assert closed, "Failed to abort connection with key that is too short"
+        fd.close()
+
+    def test_bad_handshake_long_key(self):
+        fd = self.connect().makefile(bufsize=1)
+        closed = False
+        headers = "" \
+        "GET /echo HTTP/1.1\r\n" \
+        "Host: localhost\r\n" \
+        "Upgrade: WebSocket\r\n" \
+        "Connection: Upgrade\r\n" \
+        "Sec-WebSocket-Key: " + base64.b64encode('too long. too long. too long') + "\r\n" \
+        "Sec-WebSocket-Origin: http://localhost\r\n" \
+        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
+        "Sec-WebSocket-Version: 7\r\n" \
+        "\r\n"
+
+        fd.write(headers)
+        try:
+            response = read_http(fd)
+        except ConnectionClosed:
+            closed = True
+
+        assert closed, "Failed to abort connection with key that is too long"
+        fd.close()
+
+    def test_good_handshake(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        response = read_http(fd, code=101, reason="Switching Protocols")
+        response.assertHeader("Upgrade", "websocket")
+        response.assertHeader("Connection", "Upgrade")
+        response.assertHeader("Sec-WebSocket-Accept", "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=")
+
+        fd.close();
+
+    def test_send_short_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason="Switching Protocols")
+
+        msg = 'Hello, websocket'
+        self.ws.send(msg)
+
+        preamble = fd.read(2)
+        opcode, length = struct.unpack('!BB', preamble)
+
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 1, 'Opcode must be 0x1'
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length == len(msg), 'Wrong length %d, expected %d' % (length, len(msg))
+
+        rxd_msg = fd.read(length).decode('utf-8', 'replace')
+        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close()
+
+    def test_send_medium_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason="Switching Protocols")
+
+        msg = 'Hello, websocket' * 8
+        self.ws.send(msg)
+
+        preamble = fd.read(4)
+        opcode, length_code, length = struct.unpack('!BBH', preamble)
+
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 1, 'Opcode must be 0x1'
+        assert (length_code & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length_code == 126, 'The length code must be 126'
+        assert length == len(msg), 'Wrong length %d, expected %d' % (length, len(msg))
+
+        rxd_msg = fd.read(length).decode('utf-8', 'replace')
+        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close()
+
+    def test_send_long_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason="Switching Protocols")
+
+        msg = 'Hello, websocket' * 4098
+        self.ws.send(msg)
+
+        preamble = fd.read(10)
+        opcode, length_code, length = struct.unpack('!BBQ', preamble)
+
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 1, 'Opcode must be 0x1'
+        assert (length_code & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length_code == 127, 'The length code must be 127'
+        assert length == len(msg), 'Wrong length %d, expected %d' % (length, len(msg))
+
+        rxd_msg = fd.read(length).decode('utf-8', 'replace')
+        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close()
+
+    def test_binary_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason="Switching Protocols")
+
+        msg = struct.pack('!BHB', 129, 23, 42)
+        self.ws.send(msg, opcode=WebSocketVersion7.OPCODE_BINARY)
+
+        frame = fd.read(6)
+        opcode, length, first, second, third = struct.unpack('!BBBHB', frame)
+
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 2, 'Opcode must be 0x2'
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length == 4, 'Wrong length %d, expected 4' % length
+        assert first == 129, 'Expected first value to be 129, but got %d' % first
+        assert second == 23, 'Expected second value to be 23, but got %d' % second
+        assert third == 42, 'Expected third value to be 42, but got %d' % third
+
+        fd.close()
+
+    def test_wait_bad_framing_reserved_bits(self):
+        for reserved_bits in xrange(1, 8):
+            fd = self.connect().makefile(bufsize=1)
+
+            fd.write(self.GOOD_HEADERS)
+            read_http(fd, code=101, reason='Switching Protocols')
+
+            expected_msg = 'Reserved bits cannot be set'
+
+            bad_opcode = \
+                    WebSocketVersion7.FIN | (reserved_bits << 4) | WebSocketVersion7.OPCODE_TEXT
+            fd.write(struct.pack('!BB', bad_opcode, int('10000000', 2)))
+
+            frame = self.ws.wait()
+            assert self.ws.websocket_closed, \
+                    'Failed to close connection when sent a frame with reserved bits set'
+
+            preamble = fd.read(2)
+
+            opcode, length = struct.unpack('!BB', preamble)
+            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+            reason = fd.read(2)
+            reason = struct.unpack('!H', reason)[0]
+            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+            fd.close();
+
+    def test_wait_bad_opcode(self):
+        bad_opcodes = range(WebSocketVersion7.OPCODE_BINARY + 1, WebSocketVersion7.OPCODE_CLOSE)
+        bad_opcodes += range(WebSocketVersion7.OPCODE_PONG + 1, 2**4)
+        for bad_opcode in bad_opcodes:
+            fd = self.connect().makefile(bufsize=1)
+
+            fd.write(self.GOOD_HEADERS)
+            read_http(fd, code=101, reason='Switching Protocols')
+
+            expected_msg = 'Invalid opcode %x' % bad_opcode
+
+            bad_opcode = WebSocketVersion7.FIN | bad_opcode
+            fd.write(struct.pack('!BB', bad_opcode, int('10000000', 2)))
+
+            frame = self.ws.wait()
+            assert self.ws.websocket_closed, \
+                    'Failed to close connection when sent a frame with unsupported opcode'
+
+            preamble = fd.read(2)
+
+            opcode, length = struct.unpack('!BB', preamble)
+            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+            reason = fd.read(2)
+            reason = struct.unpack('!H', reason)[0]
+            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+            fd.close();
+
+    def test_wait_no_mask(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        expected_msg = 'MASK must be set'
+
+        fd.write(struct.pack('!BB',
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, int('00000000', 2)))
+
+        frame = self.ws.wait()
+        assert self.ws.websocket_closed, \
+                'Failed to close connection when sent a frame with MASK not set'
+
+        preamble = fd.read(2)
+
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+        reason = fd.read(2)
+        reason = struct.unpack('!H', reason)[0]
+        assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+        rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def _get_payload(self, mask, msg):
+        mask_octets = struct.unpack('!BBBB', struct.pack('!L', mask))
+        msg = unicode(msg).encode('utf-8')
+        result = ''
+        j = 0
+        for c in msg:
+            result += chr(ord(c) ^ mask_octets[j])
+            j = (j + 1) % 4
+        return result
+
+    def test_wait_short_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = 'Hello, websocket'
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        opcode, rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_med_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = 'Hello, websocket' * 8
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBHL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | 126, length, mask, encoded_msg))
+
+        opcode, rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_long_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = 'Hello, websocket' * 8 #4098
+        mask = 42
+        encoded_msg = self._get_payload(42, msg)
+        length = len(encoded_msg)
+        payload = struct.pack('!BBQL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | 127, length, mask, encoded_msg)
+        fd.write(payload)
+
+        self.ws._waiter = 'test'
+        opcode, rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_control_frames_of_unusual_size(self):
+        opcodes = [WebSocketVersion7.OPCODE_CLOSE,
+                WebSocketVersion7.OPCODE_PING,
+                WebSocketVersion7.OPCODE_PONG]
+        expected_msg = 'Control frame payload cannot be larger than 125 bytes'
+        for test_opcode in opcodes:
+            msg = ''
+            mask = 0x42424242
+            encoded_msg = self._get_payload(mask, msg)
+
+            fd = self.connect().makefile(bufsize=1)
+
+            fd.write(self.GOOD_HEADERS)
+            read_http(fd, code=101, reason='Switching Protocols')
+
+            fd.write(struct.pack('!BBHL',
+                WebSocketVersion7.FIN | test_opcode, 
+                WebSocketVersion7.MASK | 126, 0, mask))
+
+            frame = self.ws.wait()
+            assert self.ws.websocket_closed, \
+                    'Failed to close connection when sent a frame of unusual size'
+
+            preamble = fd.read(2)
+
+            opcode, length = struct.unpack('!BB', preamble)
+            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+            reason = fd.read(2)
+            reason = struct.unpack('!H', reason)[0]
+            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+            fd.close();
+
+            fd = self.connect().makefile(bufsize=1)
+
+            fd.write(self.GOOD_HEADERS)
+            read_http(fd, code=101, reason='Switching Protocols')
+
+            fd.write(struct.pack('!BBQL',
+                WebSocketVersion7.FIN | test_opcode, 
+                WebSocketVersion7.MASK | 127, length, mask))
+
+            frame = self.ws.wait()
+            assert self.ws.websocket_closed, \
+                    'Failed to close connection when sent a frame of unusual size'
+
+            preamble = fd.read(2)
+
+            opcode, length = struct.unpack('!BB', preamble)
+            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+            reason = fd.read(2)
+            reason = struct.unpack('!H', reason)[0]
+            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+            fd.close();
+
+    def test_wait_close_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = 'Hello, websocket'
+        mask = 0x42424242
+        reason = 1000
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg) + 2
+        fd.write(struct.pack('!BBLH%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_CLOSE,
+            WebSocketVersion7.MASK | length, mask, reason ^ (mask >> 16), encoded_msg))
+
+        rxd_msg = self.ws.wait()
+        assert self.ws.websocket_closed, 'Did not close connection when sent a close frame'
+        assert rxd_msg == (WebSocketVersion7.OPCODE_CLOSE, (reason, msg)), \
+                'Wrong result "%s"' % (rxd_msg,)
+
+        preamble = fd.read(2)
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_CLOSE, \
+                'Opcode must be %x' % WebSocketVersion7.OPCODE_CLOSE
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length == 2, 'The payload length must be 2, but got %d' % length
+        
+        reason = struct.unpack('!H', fd.read(2))[0]
+        assert reason == 1000, 'The reason must be 1000, but got %d' % reason
+
+        fd.close();
+
+    def test_wait_close_frame_no_payload(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = ''
+        mask = 0x42424242
+        encoded_msg = self._get_payload(mask, msg)
+        length = 0
+        fd.write(struct.pack('!BBL',
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_CLOSE,
+            WebSocketVersion7.MASK | length, mask))
+
+        rxd_msg = self.ws.wait()
+        assert self.ws.websocket_closed, \
+                'Did not close connection when sent a close frame with no payload'
+        assert rxd_msg == (WebSocketVersion7.OPCODE_CLOSE, (None, None)), \
+                'Wrong result "%s"' % (rxd_msg,)
+
+        preamble = fd.read(2)
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_CLOSE, \
+                'Opcode must be %x' % WebSocketVersion7.OPCODE_CLOSE
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length == 2, 'The payload length must be 2, but got %d' % length
+        
+        reason = struct.unpack('!H', fd.read(2))[0]
+        assert reason == WebSocketVersion7.REASON_NORMAL, \
+                'The reason must be 1000, but got %d' % reason
+
+        fd.close();
+
+    def test_wait_ping_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = 'Hello, websocket'
+        mask = 0x42424242
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_PING,
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a ping frame'
+        assert rxd_msg == (WebSocketVersion7.OPCODE_PING, msg.encode('utf-8')), \
+                'Wrong result "%s"' % (rxd_msg,)
+
+        preamble = fd.read(2)
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_PONG, \
+                'Opcode must be %x' % WebSocketVersion7.OPCODE_PONG
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length == len(encoded_msg), 'Wrong payload length. Got %d' % length
+
+        txd_msg = fd.read(length)
+        assert txd_msg == msg.encode('utf-8'), 'Wrong message "%s", expected "%s"' % (txd_msg, msg)
+
+        fd.close();
+
+    def test_wait_pong_frame(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        msg = 'Hello, websocket'
+        mask = 0x42424242
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_PONG,
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a pong frame'
+        assert rxd_msg == (WebSocketVersion7.OPCODE_PONG, msg.encode('utf-8')), \
+                'Wrong result "%s"' % (rxd_msg,)
+
+        fd.close();
+
+    def test_wait_fragmented_control_frame(self):
+        opcodes = [WebSocketVersion7.OPCODE_CLOSE,
+                WebSocketVersion7.OPCODE_PING,
+                WebSocketVersion7.OPCODE_PONG]
+        expected_msg = 'Control frames cannot be fragmented'
+        for test_opcode in opcodes:
+            msg = ''
+            mask = 0x42424242
+            encoded_msg = self._get_payload(mask, msg)
+
+            fd = self.connect().makefile(bufsize=1)
+
+            fd.write(self.GOOD_HEADERS)
+            read_http(fd, code=101, reason='Switching Protocols')
+
+            fd.write(struct.pack('!BBL', test_opcode, WebSocketVersion7.MASK | 0, mask))
+
+            frame = self.ws.wait()
+            assert self.ws.websocket_closed, \
+                    'Failed to close connection when sent a fragmented control frame'
+
+            preamble = fd.read(2)
+
+            opcode, length = struct.unpack('!BB', preamble)
+            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+            reason = fd.read(2)
+            reason = struct.unpack('!H', reason)[0]
+            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+            fd.close();
+
+    def test_wait_unfinished_fragmented_message(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        expected_msg = 'Received new unfragmented data frame during fragmented message'
+
+        msg = 'Hello, '
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        frame = self.ws.wait()
+        assert self.ws.websocket_closed, \
+                'Failed to close connection when sent new unfragmented ' + \
+                'data frame during fragmented message'
+
+        preamble = fd.read(2)
+
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+        reason = fd.read(2)
+        reason = struct.unpack('!H', reason)[0]
+        assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+        rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_bad_fragment_opcode(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        expected_msg = 'Received new fragment frame with non-zero opcode'
+
+        msg = 'Hello, '
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        frame = self.ws.wait()
+        assert self.ws.websocket_closed, \
+                'Failed to close connection when sent fragment with non-zero opcode'
+
+        preamble = fd.read(2)
+
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+
+        reason = fd.read(2)
+        reason = struct.unpack('!H', reason)[0]
+        assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
+
+        rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
+        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_two_fragments(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        expected_msg = ''
+
+        msg = 'Hello, '
+        expected_msg += msg
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        msg = 'websocket'
+        expected_msg += msg
+        mask = 23
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        opcode, rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_three_fragments(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        expected_msg = ''
+
+        msg = 'Hello, '
+        expected_msg += msg
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        msg = 'websocket'
+        expected_msg += msg
+        mask = 1337
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_FRAG, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        msg = '!'
+        expected_msg += msg
+        mask = 23
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        opcode, rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    def test_wait_control_frame_during_fragmented_message(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        expected_msg = ''
+
+        msg = 'Hello, '
+        expected_msg += msg
+        mask = 42
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.OPCODE_TEXT, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        ping_msg = 'Marco!'
+        mask = 0x42424242
+        encoded_ping_msg = self._get_payload(mask, ping_msg)
+        length = len(encoded_ping_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_PING,
+            WebSocketVersion7.MASK | length, mask, encoded_ping_msg))
+
+        msg = 'websocket'
+        expected_msg += msg
+        mask = 23
+        encoded_msg = self._get_payload(mask, msg)
+        length = len(encoded_msg)
+        fd.write(struct.pack('!BBL%ds' % length,
+            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
+            WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+        rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a ping frame'
+        assert rxd_msg == (WebSocketVersion7.OPCODE_PING, ping_msg.encode('utf-8')), \
+                'Wrong result "%s"' % (rxd_msg,)
+
+        preamble = fd.read(2)
+        opcode, length = struct.unpack('!BB', preamble)
+        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
+        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_PONG, \
+                'Opcode must be %x' % WebSocketVersion7.OPCODE_PONG
+        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
+        assert length == len(encoded_ping_msg), 'Wrong payload length. Got %d' % length
+
+        txd_msg = fd.read(length)
+        assert txd_msg == ping_msg.encode('utf-8'), \
+                'Wrong message "%s", expected "%s"' % (txd_msg, ping_msg)
+
+        opcode, rxd_msg = self.ws.wait()
+        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+        assert rxd_msg == expected_msg, \
+                'Wrong message "%s", expected "%s"' % (rxd_msg, expected_msg)
+
+        fd.close();
+
+    def test_wait_two_fragmented_messages(self):
+        fd = self.connect().makefile(bufsize=1)
+
+        fd.write(self.GOOD_HEADERS)
+        read_http(fd, code=101, reason='Switching Protocols')
+
+        for x in xrange(0, 2):
+            expected_msg = ''
+
+            msg = 'Hello, '
+            expected_msg += msg
+            mask = 42
+            encoded_msg = self._get_payload(mask, msg)
+            length = len(encoded_msg)
+            fd.write(struct.pack('!BBL%ds' % length,
+                WebSocketVersion7.OPCODE_TEXT, 
+                WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+            msg = 'websocket %d' % x
+            expected_msg += msg
+            mask = 23
+            encoded_msg = self._get_payload(mask, msg)
+            length = len(encoded_msg)
+            fd.write(struct.pack('!BBL%ds' % length,
+                WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
+                WebSocketVersion7.MASK | length, mask, encoded_msg))
+
+            opcode, rxd_msg = self.ws.wait()
+            assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
+            assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
+            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
+
+        fd.close();
+
+    """
+    TODO: write fragmentation test cases:
+          - too large of a message
+            - make this a config attribute that applies to individual messages as well
+    """
+
 if __name__ == '__main__':
     greentest.main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.