Jeffrey Gelens avatar Jeffrey Gelens committed 4bd0e63

Cleaned up examples and moved the autobahn testrunner.

Comments (0)

Files changed (13)

examples/echoserver.py

 
 
 def echo(environ, start_response):
-    websocket = environ.get('wsgi.websocket')
+    websocket = environ.get("wsgi.websocket")
     if websocket is None:
         return http_handler(environ, start_response)
     try:
             websocket.send(message)
         websocket.close()
     except geventwebsocket.WebSocketError, ex:
-        print '%s: %s' % (ex.__class__.__name__, ex)
+        print "%s: %s" % (ex.__class__.__name__, ex)
 
 
 def http_handler(environ, start_response):
-    if environ['PATH_INFO'].strip('/') == 'version':
-        start_response('200 OK', [])
+    if environ["PATH_INFO"].strip("/") == "version":
+        start_response("200 OK", [])
         return [agent]
-    start_response('400 Bad Request', [])
-    return ['WebSocket connection is expected here.']
+    else:
+        start_response("400 Bad Request", [])
+        return ["WebSocket connection is expected here."]
 
 
 path = os.path.dirname(geventwebsocket.__file__)
-agent = 'gevent-websocket/%s' % (geventwebsocket.__version__)
-print 'Running %s from %s' % (agent, path)
-WSGIServer(('', 7000), echo, handler_class=geventwebsocket.WebSocketHandler).serve_forever()
+agent = "gevent-websocket/%s" % (geventwebsocket.__version__)
+print "Running %s from %s" % (agent, path)
+WSGIServer(("", 8000), echo, handler_class=geventwebsocket.WebSocketHandler).serve_forever()

examples/gunicorn_websocket.py

-# -*- coding: utf-8 -
-#
-# gunicorn -k "geventwebsocket.gunicorn.workers.GeventWebSocketWorker" gunicorn_websocket:app
-
-import gevent
-
-# demo app
-import os
-import random
-def app(environ, start_response):
-    if environ['PATH_INFO'] == '/test':
-        start_response("200 OK", [('Content-Type', 'text/plain')])
-        return ["blaat"]
-    elif environ['PATH_INFO'] == "/data":
-        ws = environ['wsgi.websocket']
-        for i in xrange(10000):
-            ws.send("0 %s %s\n" % (i, random.random()))
-            gevent.sleep(1)
-    else:
-        start_response("404 Not Found", [])
-        return []
-

examples/plot_graph.html

+<!DOCTYPE html>
+<html>
+<head>
+	<!-- idea and code swiped from
+	http://assorted.svn.sourceforge.net/viewvc/assorted/real-time-plotter/trunk/src/rtp.html?view=markup -->
+	<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.1/jquery.min.js"></script>
+	<script src="http://people.iola.dk/olau/flot/jquery.flot.js"></script>
+	<script>
+	var iets = "";
+	window.onload = function() {
+		var data = {};
+		try {
+			var s = new WebSocket("ws://localhost:8000/data");
+		}
+		catch (e) {
+			var s = new MozWebSocket("ws://localhost:8000/data");
+		}
+		s.onopen = function() {
+			s.send('hi');
+		};
+		s.onmessage = function(e) {
+			var lines = e.data.split('\n');
+
+			for (var i = 0; i < lines.length - 1; i++) {
+				var parts = lines[i].split(' ');
+				var d = parts[0], x = parseFloat(parts[1]), y = parseFloat(parts[2]);
+				if (!(d in data)) data[d] = [];
+				data[d].push([x,y]);
+			}
+
+			var plots = [];
+			for (var d in data) plots.push({ data: data[d].slice(data[d].length - 200) });
+
+			$.plot($("#holder"), plots, {
+				series: { lines: { show: true, fill: true },},
+				yaxis: { min: 0 },
+			});
+			s.send('');
+		};
+	};
+	</script>
+</head>
+	<body>
+	<h3>Plot</h3>
+	<div id="holder" style="width:600px;height:300px"></div>
+	</body>
+</html>

examples/plot_graph.py

+"""
+This example generates random data and plots a graph in the browser.
+
+Run it using Gevent directly using:
+    $ python plot_grapg.py
+
+Or with an Gunicorn wrapper:
+    $ gunicorn -k "geventwebsocket.gunicorn.workers.GeventWebSocketWorker" \
+        plot_graph:app
+"""
+
+import gevent
+import os
+import random
+
+from gevent import pywsgi
+from geventwebsocket.handler import WebSocketHandler
+
+
+def handle(ws):
+    """
+    This is the websocket handler function. Note that we can dispatch based on
+    path in here, too.
+    """
+
+    if ws.path == "/echo":
+        while True:
+            m = ws.receive()
+            if m is None:
+                break
+            ws.send(m)
+
+    elif ws.path == "/data":
+        for i in xrange(10000):
+            ws.send("0 %s %s\n" % (i, random.random()))
+            gevent.sleep(0.1)
+
+
+def app(environ, start_response):
+    if environ["PATH_INFO"] == "/":
+        start_response("200 OK", [("Content-Type", "text/html")])
+        return open("plot_graph.html").readlines()
+    elif environ["PATH_INFO"] in ("/data", "/echo"):
+        handle(environ["wsgi.websocket"])
+    else:
+        start_response("404 Not Found", [])
+        return []
+
+
+if __name__ == "__main__":
+    server = pywsgi.WSGIServer(("", 8000), app,
+        handler_class=WebSocketHandler)
+    server.serve_forever()

examples/test.py

-from geventwebsocket.handler import WebSocketHandler
-from gevent import pywsgi
-import gevent
-
-
-# demo app
-import os
-import random
-def handle(ws):
-    """  This is the websocket handler function.  Note that we
-    can dispatch based on path in here, too."""
-    if ws.path == '/echo':
-        while True:
-            m = ws.receive()
-            if m is None:
-                break
-            ws.send(m)
-
-    elif ws.path == '/data':
-        for i in xrange(10000):
-            ws.send("0 %s %s\n" % (i, random.random()))
-            #print "0 %s %s\n" % (i, random.random())
-            gevent.sleep(0.1)
-
-
-def app(environ, start_response):
-    if environ['PATH_INFO'] == '/test':
-        start_response("200 OK", [('Content-Type', 'text/plain')])
-        return ["blaat"]
-    elif environ['PATH_INFO'] in ("/data", "/echo"):
-        handle(environ['wsgi.websocket'])
-    else:
-        start_response("404 Not Found", [])
-        return []
-
-
-server = pywsgi.WSGIServer(('0.0.0.0', 8000), app,
-        handler_class=WebSocketHandler)
-server.serve_forever()

examples/websocket.html

-<!DOCTYPE html>
-<html>
-<head>
-<!-- idea and code swiped from
-http://assorted.svn.sourceforge.net/viewvc/assorted/real-time-plotter/trunk/src/rtp.html?view=markup -->
-<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.1/jquery.min.js"></script>
-<script src="http://people.iola.dk/olau/flot/jquery.flot.js"></script>
-<script>
-var iets = "";
-window.onload = function() {
-    var data = {};
-	try {
-		var s = new WebSocket("ws://localhost:8000/data");
-	}
-	catch (e) {
-		var s = new MozWebSocket("ws://localhost:8000/data");
-	}
-    s.onopen = function() {
-        //alert('open');
-        s.send('hi');
-    };
-    s.onmessage = function(e) {
-
-      //alert('got ' + e.data);
-      var lines = e.data.split('\n');
-      for (var i = 0; i < lines.length - 1; i++) {
-        var parts = lines[i].split(' ');
-        var d = parts[0], x = parseFloat(parts[1]), y = parseFloat(parts[2]);
-        if (!(d in data)) data[d] = [];
-        data[d].push([x,y]);
-      }
-      var plots = [];
-      for (var d in data) plots.push( { data: data[d].slice(data[d].length - 200) } );
-      $.plot( $("#holder"), plots,
-              {
-                series: {
-                  lines: { show: true, fill: true },
-                },
-                yaxis: { min: 0 },
-              } );
-
-      s.send('');
-    };
-};
-</script>
-</head>
-<body>
-<h3>Plot</h3>
-<div id="holder" style="width:600px;height:300px"></div>
-</body>
-</html>
-

geventwebsocket/handler.py

         self.pre_start()
         environ = self.environ
         upgrade = environ.get('HTTP_UPGRADE', '').lower()
+
         if upgrade == 'websocket':
             connection = environ.get('HTTP_CONNECTION', '').lower()
             if 'upgrade' in connection:
 
     def _handle_websocket(self):
         environ = self.environ
+
         try:
             if environ.get("HTTP_SEC_WEBSOCKET_VERSION"):
                 self.close_connection = True
             elif environ.get("HTTP_ORIGIN"):
                 self.close_connection = True
                 result = self._handle_hixie()
+
             self.result = []
             if not result:
                 return
+
             self.application(environ, None)
             return []
         finally:
                 ("Connection", "Upgrade"),
                 ("WebSocket-Location", reconstruct_url(environ)),
             ]
+
             if self.websocket.protocol is not None:
                 headers.append(("WebSocket-Protocol", self.websocket.protocol))
             if self.websocket.origin:
     def respond(self, status, headers=[]):
         self.close_connection = True
         self._send_reply(status, headers)
+
         if self.socket is not None:
             try:
                 self.socket._sock.close()
 
     url += quote(environ.get('SCRIPT_NAME', ''))
     url += quote(environ.get('PATH_INFO', ''))
+
     if environ.get('QUERY_STRING'):
         url += '?' + environ['QUERY_STRING']
+
     return url

geventwebsocket/python_fixes.py

+import sys
+
+
 if sys.version_info[:2] == (2, 7):
     # Python 2.7 has a working BufferedReader but socket.makefile() does not
     # use it.

geventwebsocket/websocket.py

-import sys
 import struct
 
 from errno import EINTR
 
 
 class WebSocket(object):
-    def _encode_text(self, s):
-        if isinstance(s, unicode):
-            return s.encode('utf-8')
+    def _encode_text(self, text):
+        if isinstance(text, unicode):
+            return text.encode('utf-8')
         else:
-            return s
+            return text
 
 
 class WebSocketHixie(WebSocket):
-
     def __init__(self, socket, environ):
         self.origin = environ.get('HTTP_ORIGIN')
         self.protocol = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
         self.path = environ.get('PATH_INFO')
+        self.fobj = socket.makefile()
         self._writelock = Semaphore(1)
-        self.fobj = socket.makefile()
         self._write = socket.sendall
 
     def send(self, message):
 
         while True:
             if self.fobj is None:
-                raise WebSocketError('Connenction closed unexpectedly while reading message length')
+                raise WebSocketError('Connection closed unexpectedly while reading message length')
             byte_str = self.fobj.read(1)
 
             if not byte_str:
             if self.fobj is None:
                 msg = ''.join(bytes)
                 raise WebSocketError('Connection closed unexpectedly while reading message: %r' % msg)
+
             byte = read(1)
             if ord(byte) != 0xff:
                 bytes.append(byte)
 
     def receive(self):
         read = self.fobj.read
+
         while self.fobj is not None:
             frame_str = read(1)
+
             if not frame_str:
                 self.close()
                 return
 
         assert not self._reading, 'Reading is not possible from multiple greenlets'
         self._reading = True
+
         try:
             data0 = read(2)
+
             if not data0:
                 self._close()
                 return
                 data1 = ''
             elif length == 126:
                 data1 = read(2)
+
                 if len(data1) != 2:
                     self.close()
                     raise WebSocketError('Incomplete read while reading 2-byte length: %r' % (data0 + data1))
+
                 length = struct.unpack('!H', data1)[0]
             else:
                 assert length == 127, length
                 data1 = read(8)
+
                 if len(data1) != 8:
                     self.close()
                     raise WebSocketError('Incomplete read while reading 8-byte length: %r' % (data0 + data1))
+
                 length = struct.unpack('!Q', data1)[0]
 
             mask = read(4)
 
             if payload:
                 payload = bytearray(payload)
+
                 for i in xrange(len(payload)):
                     payload[i] = payload[i] ^ mask[i % 4]
 
 
     def _receive(self):
         """Return the next text or binary message from the socket."""
+
         opcode = None
         result = bytearray()
+
         while True:
             frame = self.receive_frame()
             if frame is None:
         result = self._receive()
         if not result:
             return result
+
         message, is_binary = result
         if is_binary:
             return message
             self._write = None
             fobj = self.fobj
             self.fobj = None
+
             if not self._reading:
                 fobj.close()

run_autobahn_tests.py

-#!/usr/bin/env python
-"""Test gevent-websocket with the test suite of Autobahn
-
-    http://www.tavendo.de/autobahn/testsuite.html
-"""
-import sys
-import os
-import subprocess
-import time
-import urllib2
-from twisted.python import log
-from twisted.internet import reactor
-from autobahn.fuzzing import FuzzingClientFactory
-
-
-spec = {
-   "options": {"failByDrop": False},
-   "enable-ssl": False,
-   "servers": []}
-
-
-default_args = ["*",
-         "x7.5.1",
-         "x7.9.3",
-         "x7.9.4",
-         "x7.9.5",
-         "x7.9.6",
-         "x7.9.7",
-         "x7.9.8",
-         "x7.9.9",
-         "x7.9.10",
-         "x7.9.11",
-         "x7.9.12",
-         "x7.9.13"]
-# We ignore 7.5.1 because it checks that close frame has valid utf-8 message
-# we do not validate utf-8.
-
-# We ignore 7.9.3-13 because it checks that when a close frame with code 1004
-# and others sent, 1002 is sent back; we only send back 1002 for obvious
-# violations like < 1000 and >= 5000; for all codes in the 1000-5000 range
-# we send code 1000 back
-
-
-class ProcessPool(object):
-
-    def __init__(self):
-        self.popens = []
-
-    def spawn(self, *args, **kwargs):
-        popen = subprocess.Popen(*args, **kwargs)
-        self.popens.append(popen)
-        time.sleep(0.2)
-        self.check()
-        return popen
-
-    def check(self):
-        for popen in self.popens:
-            if popen.poll() is not None:
-                sys.exit(1)
-
-    def wait(self, timeout):
-        end = time.time() + timeout
-        while True:
-            time.sleep(0.1)
-            pool.check()
-            if time.time() > end:
-                break
-
-    def kill(self):
-        while self.popens:
-            popen = self.popens.pop()
-            try:
-                popen.kill()
-            except Exception, ex:
-                print ex
-
-
-if __name__ == '__main__':
-    import optparse
-    parser = optparse.OptionParser()
-    parser.add_option('--geventwebsocket', default='examples/echoserver.py')
-    parser.add_option('--autobahn', default='../../src/Autobahn/testsuite/websockets/servers/test_autobahn.py')
-    options, args = parser.parse_args()
-
-    cases = []
-    exclude_cases = []
-
-    for arg in (args or default_args):
-        if arg.startswith('x'):
-            arg = arg[1:]
-            exclude_cases.append(arg)
-        else:
-            cases.append(arg)
-
-    spec['cases'] = cases
-    spec['exclude-cases'] = exclude_cases
-
-    if options.autobahn and not os.path.exists(options.autobahn):
-        print 'Ignoring %s (not found)' % options.autobahn
-        options.autobahn = None
-    pool = ProcessPool()
-    try:
-        if options.geventwebsocket:
-            pool.spawn([sys.executable, options.geventwebsocket])
-        if options.autobahn:
-            pool.spawn([sys.executable, options.autobahn])
-        pool.wait(1)
-        if options.geventwebsocket:
-            agent = urllib2.urlopen('http://127.0.0.1:7000/version').read().strip()
-            assert agent and '\n' not in agent and 'gevent-websocket' in agent, agent
-            spec['servers'].append({"url": "ws://localhost:7000",
-                                    "agent": agent,
-                                    "options": {"version": 17}})
-        if options.autobahn:
-            spec['servers'].append({'url': 'ws://localhost:9000/',
-                                    'agent': 'AutobahnServer',
-                                    'options': {'version': 17}})
-        log.startLogging(sys.stdout)
-        FuzzingClientFactory(spec)
-        reactor.run()
-    finally:
-        pool.kill()

tests/greentest.py

-# Copyright (c) 2008-2009 AG Projects
-# Author: Denis Bilenko
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-# package is named greentest, not test, so it won't be confused with test in stdlib
-import sys
-import unittest
-import time
-import traceback
-import re
-
-import gevent
-
-disabled_marker = '-*-*-*-*-*- disabled -*-*-*-*-*-'
-def exit_disabled():
-    sys.exit(disabled_marker)
-
-def exit_unless_25():
-    if sys.version_info[:2] < (2, 5):
-        exit_disabled()
-
-VERBOSE = sys.argv.count('-v') > 1
-
-if '--debug-greentest' in sys.argv:
-    sys.argv.remove('--debug-greentest')
-    DEBUG = True
-else:
-    DEBUG = False
-
-
-class TestCase(unittest.TestCase):
-
-    __timeout__ = 1
-    switch_expected = True
-    _switch_count = None
-
-    def setUp(self):
-        gevent.sleep(0) # switch at least once to setup signal handlers
-        if hasattr(gevent.core, '_event_count'):
-            self._event_count = (gevent.core._event_count(), gevent.core._event_count_active())
-        hub = gevent.hub.get_hub()
-        if hasattr(hub, 'switch_count'):
-            self._switch_count = hub.switch_count
-        self._timer = gevent.Timeout.start_new(self.__timeout__, RuntimeError('test is taking too long'))
-
-    def tearDown(self):
-        try:
-            if not hasattr(self, 'stderr'):
-                self.unhook_stderr()
-            if hasattr(self, 'stderr'):
-                sys.__stderr__.write(self.stderr)
-        except:
-            traceback.print_exc()
-        if hasattr(self, '_timer'):
-            self._timer.cancel()
-            hub = gevent.hub.get_hub()
-            if self._switch_count is not None and hasattr(hub, 'switch_count'):
-                msg = ''
-                if hub.switch_count < self._switch_count:
-                    msg = 'hub.switch_count decreased?\n'
-                elif hub.switch_count == self._switch_count:
-                    if self.switch_expected:
-                        msg = '%s.%s did not switch\n' % (type(self).__name__, self.testname)
-                elif hub.switch_count > self._switch_count:
-                    if not self.switch_expected:
-                        msg = '%s.%s switched but expected not to\n' % (type(self).__name__, self.testname)
-                if msg:
-                    print >> sys.stderr, 'WARNING: ' + msg
-
-            if hasattr(gevent.core, '_event_count'):
-                event_count = (gevent.core._event_count(), gevent.core._event_count_active())
-                if event_count > self._event_count:
-                    args = (type(self).__name__, self.testname, self._event_count, event_count)
-                    sys.stderr.write('WARNING: %s.%s event count was %s, now %s\n' % args)
-                    gevent.sleep(0.1)
-        else:
-            sys.stderr.write('WARNING: %s.setUp does not call base class setUp\n' % (type(self).__name__, ))
-
-    @property
-    def testname(self):
-        return getattr(self, '_testMethodName', '') or getattr(self, '_TestCase__testMethodName')
-
-    @property
-    def testcasename(self):
-        return self.__class__.__name__ + '.' + self.testname
-
-    def hook_stderr(self):
-        if VERBOSE:
-            return
-        from cStringIO import StringIO
-        self.new_stderr = StringIO()
-        self.old_stderr = sys.stderr
-        sys.stderr = self.new_stderr
-
-    def unhook_stderr(self):
-        if VERBOSE:
-            return
-        try:
-            value = self.new_stderr.getvalue()
-        except AttributeError:
-            return None
-        sys.stderr = self.old_stderr
-        self.stderr = value
-        return value
-
-    def assert_no_stderr(self):
-        stderr = self.unhook_stderr()
-        assert not stderr, 'Expected no stderr, got:\n__________\n%s\n^^^^^^^^^^\n\n' % (stderr, )
-
-    def assert_stderr_traceback(self, typ, value=None):
-        if VERBOSE:
-            return
-        if isinstance(typ, Exception):
-            if value is None:
-                value = str(typ)
-            typ = typ.__class__.__name__
-        else:
-            typ = getattr(typ, '__name__', typ)
-        stderr = self.unhook_stderr()
-        assert stderr is not None, repr(stderr)
-        traceback_re = '^Traceback \\(most recent call last\\):\n( +.*?\n)+^(?P<type>\w+): (?P<value>.*?)$'
-        self.extract_re(traceback_re, type=typ, value=value)
-
-    def assert_stderr(self, message):
-        if VERBOSE:
-            return
-        exact_re = '^' + message + '.*?\n$.*'
-        if re.match(exact_re, self.stderr):
-            self.extract_re(exact_re)
-        else:
-            words_re = '^' + '.*?'.join(message.split()) + '.*?\n$'
-            if re.match(words_re, self.stderr):
-                self.extract_re(words_re)
-            else:
-                if message.endswith('...'):
-                    another_re = '^' + '.*?'.join(message.split()) + '.*?(\n +.*?$){2,5}\n\n'
-                    self.extract_re(another_re)
-                else:
-                    raise AssertionError('%r did not match:\n%r' % (message, self.stderr))
-
-    def assert_mainloop_assertion(self, message=None):
-        self.assert_stderr_traceback('AssertionError', 'Cannot switch to MAINLOOP from MAINLOOP')
-        if message is not None:
-            self.assert_stderr(message)
-
-    def extract_re(self, regex, **kwargs):
-        assert self.stderr is not None
-        m = re.search(regex, self.stderr, re.DOTALL|re.M)
-        if m is None:
-            raise AssertionError('%r did not match:\n%r' % (regex, self.stderr))
-        for key, expected_value in kwargs.items():
-            real_value = m.group(key)
-            if expected_value is not None:
-                try:
-                    self.assertEqual(real_value, expected_value)
-                except AssertionError:
-                    print 'failed to process: %s' % self.stderr
-                    raise
-        if DEBUG:
-            ate = '\n#ATE#: ' + self.stderr[m.start(0):m.end(0)].replace('\n', '\n#ATE#: ') + '\n'
-            sys.__stderr__.write(ate)
-        self.stderr = self.stderr[:m.start(0)] + self.stderr[m.end(0)+1:]
-
-
-main = unittest.main
-
-_original_Hub = gevent.hub.Hub
-
-class CountingHub(_original_Hub):
-
-    switch_count = 0
-
-    def switch(self):
-        self.switch_count += 1
-        return _original_Hub.switch(self)
-
-gevent.hub.Hub = CountingHub
-
-
-def test_outer_timeout_is_not_lost(self):
-    timeout = gevent.Timeout.start_new(0.01)
-    try:
-        self.wait(timeout=0.02)
-    except gevent.Timeout, ex:
-        assert ex is timeout, (ex, timeout)
-    else:
-        raise AssertionError('must raise Timeout')
-    gevent.sleep(0.02)
-
-
-class GenericWaitTestCase(TestCase):
-
-    def wait(self, timeout):
-        raise NotImplementedError('override me in subclass')
-
-    test_outer_timeout_is_not_lost = test_outer_timeout_is_not_lost
-
-    def test_returns_none_after_timeout(self):
-        start = time.time()
-        result = self.wait(timeout=0.01)
-        # join and wait simply returns after timeout expires
-        delay = time.time() - start
-        assert 0.01 - 0.001 <= delay < 0.01 + 0.01 + 0.1, delay
-        assert result is None, repr(result)
-
-
-class GenericGetTestCase(TestCase):
-
-    def wait(self, timeout):
-        raise NotImplementedError('override me in subclass')
-
-    test_outer_timeout_is_not_lost = test_outer_timeout_is_not_lost
-
-    def test_raises_timeout_number(self):
-        start = time.time()
-        self.assertRaises(gevent.Timeout, self.wait, timeout=0.01)
-        # get raises Timeout after timeout expired
-        delay = time.time() - start
-        assert 0.01 - 0.001 <= delay < 0.01 + 0.01 + 0.1, delay
-
-    def test_raises_timeout_Timeout(self):
-        start = time.time()
-        timeout = gevent.Timeout(0.01)
-        try:
-            self.wait(timeout=timeout)
-        except gevent.Timeout, ex:
-            assert ex is timeout, (ex, timeout)
-        delay = time.time() - start
-        assert 0.01 - 0.001 <= delay < 0.01 + 0.01 + 0.1, delay
-
-    def test_raises_timeout_Timeout_exc_customized(self):
-        start = time.time()
-        error = RuntimeError('expected error')
-        timeout = gevent.Timeout(0.01, exception=error)
-        try:
-            self.wait(timeout=timeout)
-        except RuntimeError, ex:
-            assert ex is error, (ex, error)
-        delay = time.time() - start
-        assert 0.01 - 0.001 <= delay < 0.01 + 0.01 + 0.1, delay
-
-
-class ExpectedException(Exception):
-    """An exception whose traceback should be ignored"""

tests/run_autobahn_tests.py

+#!/usr/bin/env python
+"""Test gevent-websocket with the test suite of Autobahn
+
+    http://www.tavendo.de/autobahn/testsuite.html
+"""
+import sys
+import os
+import subprocess
+import time
+import urllib2
+from twisted.python import log
+from twisted.internet import reactor
+from autobahn.fuzzing import FuzzingClientFactory
+
+
+spec = {
+   "options": {"failByDrop": False},
+   "enable-ssl": False,
+   "servers": []}
+
+
+default_args = ["*",
+         "x7.5.1",
+         "x7.9.3",
+         "x7.9.4",
+         "x7.9.5",
+         "x7.9.6",
+         "x7.9.7",
+         "x7.9.8",
+         "x7.9.9",
+         "x7.9.10",
+         "x7.9.11",
+         "x7.9.12",
+         "x7.9.13"]
+# We ignore 7.5.1 because it checks that close frame has valid utf-8 message
+# we do not validate utf-8.
+
+# We ignore 7.9.3-13 because it checks that when a close frame with code 1004
+# and others sent, 1002 is sent back; we only send back 1002 for obvious
+# violations like < 1000 and >= 5000; for all codes in the 1000-5000 range
+# we send code 1000 back
+
+
+class ProcessPool(object):
+
+    def __init__(self):
+        self.popens = []
+
+    def spawn(self, *args, **kwargs):
+        popen = subprocess.Popen(*args, **kwargs)
+        self.popens.append(popen)
+        time.sleep(0.2)
+        self.check()
+        return popen
+
+    def check(self):
+        for popen in self.popens:
+            if popen.poll() is not None:
+                sys.exit(1)
+
+    def wait(self, timeout):
+        end = time.time() + timeout
+        while True:
+            time.sleep(0.1)
+            pool.check()
+            if time.time() > end:
+                break
+
+    def kill(self):
+        while self.popens:
+            popen = self.popens.pop()
+            try:
+                popen.kill()
+            except Exception, ex:
+                print ex
+
+
+if __name__ == '__main__':
+    import optparse
+    parser = optparse.OptionParser()
+    parser.add_option('--geventwebsocket', default='../examples/echoserver.py')
+    parser.add_option('--autobahn', default='../../src/Autobahn/testsuite/websockets/servers/test_autobahn.py')
+    options, args = parser.parse_args()
+
+    cases = []
+    exclude_cases = []
+
+    for arg in (args or default_args):
+        if arg.startswith('x'):
+            arg = arg[1:]
+            exclude_cases.append(arg)
+        else:
+            cases.append(arg)
+
+    spec['cases'] = cases
+    spec['exclude-cases'] = exclude_cases
+
+    if options.autobahn and not os.path.exists(options.autobahn):
+        print 'Ignoring %s (not found)' % options.autobahn
+        options.autobahn = None
+    pool = ProcessPool()
+    try:
+        if options.geventwebsocket:
+            pool.spawn([sys.executable, options.geventwebsocket])
+        if options.autobahn:
+            pool.spawn([sys.executable, options.autobahn])
+        pool.wait(1)
+        if options.geventwebsocket:
+            agent = urllib2.urlopen('http://127.0.0.1:7000/version').read().strip()
+            assert agent and '\n' not in agent and 'gevent-websocket' in agent, agent
+            spec['servers'].append({"url": "ws://localhost:7000",
+                                    "agent": agent,
+                                    "options": {"version": 17}})
+        if options.autobahn:
+            spec['servers'].append({'url': 'ws://localhost:9000/',
+                                    'agent': 'AutobahnServer',
+                                    'options': {'version': 17}})
+        log.startLogging(sys.stdout)
+        FuzzingClientFactory(spec)
+        reactor.run()
+    finally:
+        pool.kill()

tests/test__websocket.py

-# Websocket tests by Jeffrey Gelens, Copyright 2010, Noppo.pro
-# Socket related functions by:
-#
-# @author Donovan Preston
-#
-# Copyright (c) 2007, Linden Research, Inc.
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-from gevent import monkey
-monkey.patch_all(thread=False)
-
-import base64
-import struct
-import sys
-import greentest
-import gevent
-import gevent.local
-from gevent import socket
-from geventwebsocket.handler import WebSocketHandler
-from geventwebsocket.websocket import WebSocketVersion7
-
-CONTENT_LENGTH = 'Content-Length'
-CONN_ABORTED_ERRORS = []
-DEBUG = '-v' in sys.argv
-
-try:
-    from errno import WSAECONNABORTED
-    CONN_ABORTED_ERRORS.append(WSAECONNABORTED)
-except ImportError:
-    pass
-
-
-class ConnectionClosed(Exception):
-    pass
-
-
-def read_headers(fd):
-    response_line = fd.readline()
-    if not response_line:
-        raise ConnectionClosed
-    headers = {}
-    while True:
-        line = fd.readline().strip()
-        if not line:
-            break
-        try:
-            key, value = line.split(': ', 1)
-        except:
-            print 'Failed to split: %r' % (line, )
-            raise
-        assert key.lower() not in [x.lower() for x in headers.keys()], 'Header %r:%r sent more than once: %r' % (key, value, headers)
-        headers[key] = value
-    return response_line, headers
-
-
-def iread_chunks(fd):
-    while True:
-        line = fd.readline()
-        chunk_size = line.strip()
-        try:
-            chunk_size = int(chunk_size, 16)
-        except:
-            print 'Failed to parse chunk size: %r' % line
-            raise
-        if chunk_size == 0:
-            crlf = fd.read(2)
-            assert crlf == '\r\n', repr(crlf)
-            break
-        data = fd.read(chunk_size)
-        yield data
-        crlf = fd.read(2)
-        assert crlf == '\r\n', repr(crlf)
-
-
-class Response(object):
-
-    def __init__(self, status_line, headers, body=None, chunks=None):
-        self.status_line = status_line
-        self.headers = headers
-        self.body = body
-        self.chunks = chunks
-        try:
-            version, code, self.reason = status_line[:-2].split(' ', 2)
-        except Exception:
-            print 'Error: %r' % status_line
-            raise
-        self.code = int(code)
-        HTTP, self.version = version.split('/')
-        assert HTTP == 'HTTP', repr(HTTP)
-        assert self.version in ('1.0', '1.1'), repr(self.version)
-
-    def __iter__(self):
-        yield self.status_line
-        yield self.headers
-        yield self.body
-
-    def __str__(self):
-        args = (self.__class__.__name__, self.status_line, self.headers, self.body, self.chunks)
-        return '<%s status_line=%r headers=%r body=%r chunks=%r>' % args
-
-    def assertCode(self, code):
-        if hasattr(code, '__contains__'):
-            assert self.code in code, 'Unexpected code: %r (expected %r)\n%s' % (self.code, code, self)
-        else:
-            assert self.code == code, 'Unexpected code: %r (expected %r)\n%s' % (self.code, code, self)
-
-    def assertReason(self, reason):
-        assert self.reason == reason, 'Unexpected reason: %r (expected %r)\n%s' % (self.reason, reason, self)
-
-    def assertVersion(self, version):
-        assert self.version == version, 'Unexpected version: %r (expected %r)\n%s' % (self.version, version, self)
-
-    def assertHeader(self, header, value):
-        real_value = self.headers.get(header)
-        assert real_value == value, \
-               'Unexpected header %r: %r (expected %r)\n%s' % (header, real_value, value, self)
-
-    def assertBody(self, body):
-        assert self.body == body, \
-               'Unexpected body: %r (expected %r)\n%s' % (self.body, body, self)
-
-    @classmethod
-    def read(cls, fd, code=200, reason='default', version='1.1', body=None):
-        _status_line, headers = read_headers(fd)
-        self = cls(_status_line, headers)
-        if code is not None:
-            self.assertCode(code)
-        if reason == 'default':
-            reason = {200: 'OK'}.get(code)
-        if reason is not None:
-            self.assertReason(reason)
-        if version is not None:
-            self.assertVersion(version)
-        if self.code == 100:
-            return self
-        try:
-            if 'chunked' in headers.get('Transfer-Encoding', ''):
-                if CONTENT_LENGTH in headers:
-                    print "WARNING: server used chunked transfer-encoding despite having Content-Length header (libevent 1.x's bug)"
-                self.chunks = list(iread_chunks(fd))
-                self.body = ''.join(self.chunks)
-            elif CONTENT_LENGTH in headers:
-                num = int(headers[CONTENT_LENGTH])
-                self.body = fd.read(num)
-            #else:
-            #    self.body = fd.read(16)
-        except:
-            print 'Response.read failed to read the body:\n%s' % self
-            raise
-        if body is not None:
-            self.assertBody(body)
-        return self
-
-read_http = Response.read
-
-
-class DebugFileObject(object):
-
-    def __init__(self, obj):
-        self.obj = obj
-
-    def read(self, *args):
-        result = self.obj.read(*args)
-        if DEBUG:
-            print repr(result)
-        return result
-
-    def readline(self, *args):
-        result = self.obj.readline(*args)
-        if DEBUG:
-            print repr(result)
-        return result
-
-    def __getattr__(self, item):
-        assert item != 'obj'
-        return getattr(self.obj, item)
-
-
-def makefile(self, mode='r', bufsize=-1):
-    return DebugFileObject(socket._fileobject(self.dup(), mode, bufsize))
-
-socket.socket.makefile = makefile
-
-import gevent.coros
-class TestCase(greentest.TestCase):
-    __timeout__ = 5
-    _testlock = gevent.coros.Semaphore(1)
-
-    def get_wsgi_module(self):
-        from gevent import pywsgi
-        return pywsgi
-
-    def init_server(self, application):
-        self.local = gevent.local.local()
-        self.local.server = self.get_wsgi_module().WSGIServer(('127.0.0.1', 0),
-            application, handler_class=WebSocketHandler)
-
-    def setUp(self):
-        application = self.application
-        self.init_server(application)
-        self.local.server.start()
-        self.port = self.local.server.server_port
-        greentest.TestCase.setUp(self)
-
-    def tearDown(self):
-        greentest.TestCase.tearDown(self)
-        timeout = gevent.Timeout.start_new(0.5)
-        try:
-            self.local.server.stop()
-        finally:
-            timeout.cancel()
-
-    def connect(self):
-        return socket.create_connection(('127.0.0.1', self.port))
-
-class TestWebSocket(TestCase):
-    message = "\x00Hello world\xff"
-
-    def application(self, environ, start_response):
-        if environ['PATH_INFO'] == "/echo":
-            try:
-                ws = environ['wsgi.websocket']
-            except KeyError:
-                start_response("400 Bad Request", [])
-                return []
-
-            while True:
-                message = ws.wait()
-                if message is None:
-                    break
-                ws.send(message)
-
-            return []
-
-    def test_basic(self):
-        fd = self.connect().makefile(bufsize=1)
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00\r\n" \
-        "Sec-WebSocket-Protocol: test\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5\r\n" \
-        "Origin: http://localhost\r\n\r\n" \
-        "^n:ds[4U"
-
-        fd.write(headers)
-
-        response = read_http(fd, code=101, reason="Web Socket Protocol Handshake")
-        response.assertHeader("Upgrade", "WebSocket")
-        response.assertHeader("Connection", "Upgrade")
-        response.assertHeader("Sec-WebSocket-Origin", "http://localhost")
-        response.assertHeader("Sec-WebSocket-Location", "ws://localhost/echo")
-        response.assertHeader("Sec-WebSocket-Protocol", "test")
-        assert fd.read(16) == "8jKS'y:G*Co,Wxa-"
-
-        fd.write(self.message)
-        message = fd.read(len(self.message))
-        assert message == self.message, \
-               'Unexpected message: %r (expected %r)\n%s' % (message, self.message, self)
-
-        fd.close()
-
-    def test_10000_messages(self):
-        fd = self.connect().makefile(bufsize=1)
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00\r\n" \
-        "Sec-WebSocket-Protocol: test\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5\r\n" \
-        "Origin: http://localhost\r\n\r\n" \
-        "^n:ds[4U"
-
-        fd.write(headers)
-
-        response = read_http(fd, code=101, reason="Web Socket Protocol Handshake")
-        response.assertHeader("Upgrade", "WebSocket")
-        response.assertHeader("Connection", "Upgrade")
-        response.assertHeader("Sec-WebSocket-Origin", "http://localhost")
-        response.assertHeader("Sec-WebSocket-Location", "ws://localhost/echo")
-        response.assertHeader("Sec-WebSocket-Protocol", "test")
-        assert fd.read(16) == "8jKS'y:G*Co,Wxa-"
-
-        for i in xrange(10000):
-            fd.write(self.message)
-            message = fd.read(len(self.message))
-
-            assert message == self.message, \
-                   'Unexpected message: %r (expected %r)\n%s' % (message, self.message, self)
-
-
-        fd.close()
-
-    def test_badrequest(self):
-        fd = self.connect().makefile(bufsize=1)
-        fd.write('GET /echo HTTP/1.1\r\nHost: localhost\r\n\r\n')
-        read_http(fd, code=400, reason='Bad Request')
-        fd.close()
-
-    def test_oldprotocol_version(self):
-        fd = self.connect().makefile(bufsize=1)
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Protocol: test\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5\r\n" \
-        "Origin: http://localhost\r\n\r\n" \
-        "^n:ds[4U"
-
-        fd.write(headers)
-        read_http(fd, code=400, reason='Bad Request',
-            body='Client using old/invalid protocol implementation')
-
-        fd.close()
-
-    def test_protocol_version75(self):
-        fd = self.connect().makefile(bufsize=1)
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Connection: Upgrade\r\n" \
-        "WebSocket-Protocol: sample\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Origin: http://example.com\r\n\r\n"
-
-        fd.write(headers)
-        response = read_http(fd, code=101, reason="Web Socket Protocol Handshake")
-
-        fd.write(self.message)
-        message = fd.read(len(self.message))
-        assert message == self.message, \
-               'Unexpected message: %r (expected %r)\n%s' % (message, self.message, self)
-
-        fd.close()
-
-class TestWebSocketVersion7(TestCase):
-
-    GOOD_HEADERS = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-    def application(self, environ, start_response):
-        if environ['PATH_INFO'] == "/echo":
-            try:
-                ws = environ['wsgi.websocket']
-                self.ws = ws
-                ws.compatibility_mode = False
-            except KeyError:
-                start_response("400 Bad Request", [])
-                return []
-
-            while not ws.websocket_closed:
-                gevent.sleep()
-            self.close_connection = True
-            return None
-
-    def test_bad_handshake_method(self):
-        fd = self.connect().makefile(bufsize=1)
-        closed = False
-        headers = "" \
-        "POST /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-        fd.write(headers)
-        try:
-            response = read_http(fd, code=101, reason="Web Socket Protocol Handshake")
-        except ConnectionClosed:
-            closed = True
-
-        assert closed, "Failed to abort connection with bad method"
-        fd.close()
-
-    def test_bad_handshake_version(self):
-        fd = self.connect().makefile(bufsize=1)
-        closed = False
-        headers = "" \
-        "GET /echo HTTP/1.0\r\n" \
-        "Host: localhost\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-        fd.write(headers)
-        try:
-            response = read_http(fd)
-        except ConnectionClosed:
-            closed = True
-
-        assert closed, "Failed to abort connection with bad version"
-        fd.close()
-
-    #XXX: this tests the check for SERVER_NAME != HTTP_HOST, which is commented
-    #     out until I figure out why SERVER_NAME is not set in pyramid.
-    """
-    def test_bad_handshake_host(self):
-        fd = self.connect().makefile(bufsize=1)
-        closed = False
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: example.com\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-        fd.write(headers)
-        try:
-            response = read_http(fd)
-        except ConnectionClosed:
-            closed = True
-
-        assert closed, "Failed to abort connection with bad Host"
-        fd.close()
-    """
-
-    def test_bad_handshake_no_key(self):
-        fd = self.connect().makefile(bufsize=1)
-        closed = False
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-        fd.write(headers)
-        try:
-            response = read_http(fd)
-        except ConnectionClosed:
-            closed = True
-
-        assert closed, "Failed to abort connection with no Sec-WebSocket-Key"
-        fd.close()
-
-    def test_bad_handshake_short_key(self):
-        fd = self.connect().makefile(bufsize=1)
-        closed = False
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key: " + base64.b64encode('too short') + "\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-        fd.write(headers)
-        try:
-            response = read_http(fd)
-        except ConnectionClosed:
-            closed = True
-
-        assert closed, "Failed to abort connection with key that is too short"
-        fd.close()
-
-    def test_bad_handshake_long_key(self):
-        fd = self.connect().makefile(bufsize=1)
-        closed = False
-        headers = "" \
-        "GET /echo HTTP/1.1\r\n" \
-        "Host: localhost\r\n" \
-        "Upgrade: WebSocket\r\n" \
-        "Connection: Upgrade\r\n" \
-        "Sec-WebSocket-Key: " + base64.b64encode('too long. too long. too long') + "\r\n" \
-        "Sec-WebSocket-Origin: http://localhost\r\n" \
-        "Sec-WebSocket-Protocol: chat, superchat\r\n" \
-        "Sec-WebSocket-Version: 7\r\n" \
-        "\r\n"
-
-        fd.write(headers)
-        try:
-            response = read_http(fd)
-        except ConnectionClosed:
-            closed = True
-
-        assert closed, "Failed to abort connection with key that is too long"
-        fd.close()
-
-    def test_good_handshake(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        response = read_http(fd, code=101, reason="Switching Protocols")
-        response.assertHeader("Upgrade", "websocket")
-        response.assertHeader("Connection", "Upgrade")
-        response.assertHeader("Sec-WebSocket-Accept", "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=")
-
-        fd.close();
-
-    def test_send_short_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason="Switching Protocols")
-
-        msg = 'Hello, websocket'
-        self.ws.send(msg)
-
-        preamble = fd.read(2)
-        opcode, length = struct.unpack('!BB', preamble)
-
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 1, 'Opcode must be 0x1'
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length == len(msg), 'Wrong length %d, expected %d' % (length, len(msg))
-
-        rxd_msg = fd.read(length).decode('utf-8', 'replace')
-        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close()
-
-    def test_send_medium_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason="Switching Protocols")
-
-        msg = 'Hello, websocket' * 8
-        self.ws.send(msg)
-
-        preamble = fd.read(4)
-        opcode, length_code, length = struct.unpack('!BBH', preamble)
-
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 1, 'Opcode must be 0x1'
-        assert (length_code & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length_code == 126, 'The length code must be 126'
-        assert length == len(msg), 'Wrong length %d, expected %d' % (length, len(msg))
-
-        rxd_msg = fd.read(length).decode('utf-8', 'replace')
-        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close()
-
-    def test_send_long_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason="Switching Protocols")
-
-        msg = 'Hello, websocket' * 4098
-        self.ws.send(msg)
-
-        preamble = fd.read(10)
-        opcode, length_code, length = struct.unpack('!BBQ', preamble)
-
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 1, 'Opcode must be 0x1'
-        assert (length_code & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length_code == 127, 'The length code must be 127'
-        assert length == len(msg), 'Wrong length %d, expected %d' % (length, len(msg))
-
-        rxd_msg = fd.read(length).decode('utf-8', 'replace')
-        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close()
-
-    def test_binary_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason="Switching Protocols")
-
-        msg = struct.pack('!BHB', 129, 23, 42)
-        self.ws.send(msg, opcode=WebSocketVersion7.OPCODE_BINARY)
-
-        frame = fd.read(6)
-        opcode, length, first, second, third = struct.unpack('!BBBHB', frame)
-
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 2, 'Opcode must be 0x2'
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length == 4, 'Wrong length %d, expected 4' % length
-        assert first == 129, 'Expected first value to be 129, but got %d' % first
-        assert second == 23, 'Expected second value to be 23, but got %d' % second
-        assert third == 42, 'Expected third value to be 42, but got %d' % third
-
-        fd.close()
-
-    def test_wait_bad_framing_reserved_bits(self):
-        for reserved_bits in xrange(1, 8):
-            fd = self.connect().makefile(bufsize=1)
-
-            fd.write(self.GOOD_HEADERS)
-            read_http(fd, code=101, reason='Switching Protocols')
-
-            expected_msg = 'Reserved bits cannot be set'
-
-            bad_opcode = \
-                    WebSocketVersion7.FIN | (reserved_bits << 4) | WebSocketVersion7.OPCODE_TEXT
-            fd.write(struct.pack('!BB', bad_opcode, int('10000000', 2)))
-
-            frame = self.ws.wait()
-            assert self.ws.websocket_closed, \
-                    'Failed to close connection when sent a frame with reserved bits set'
-
-            preamble = fd.read(2)
-
-            opcode, length = struct.unpack('!BB', preamble)
-            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-            reason = fd.read(2)
-            reason = struct.unpack('!H', reason)[0]
-            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-            fd.close();
-
-    def test_wait_bad_opcode(self):
-        bad_opcodes = range(WebSocketVersion7.OPCODE_BINARY + 1, WebSocketVersion7.OPCODE_CLOSE)
-        bad_opcodes += range(WebSocketVersion7.OPCODE_PONG + 1, 2**4)
-        for bad_opcode in bad_opcodes:
-            fd = self.connect().makefile(bufsize=1)
-
-            fd.write(self.GOOD_HEADERS)
-            read_http(fd, code=101, reason='Switching Protocols')
-
-            expected_msg = 'Invalid opcode %x' % bad_opcode
-
-            bad_opcode = WebSocketVersion7.FIN | bad_opcode
-            fd.write(struct.pack('!BB', bad_opcode, int('10000000', 2)))
-
-            frame = self.ws.wait()
-            assert self.ws.websocket_closed, \
-                    'Failed to close connection when sent a frame with unsupported opcode'
-
-            preamble = fd.read(2)
-
-            opcode, length = struct.unpack('!BB', preamble)
-            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-            reason = fd.read(2)
-            reason = struct.unpack('!H', reason)[0]
-            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-            fd.close();
-
-    def test_wait_no_mask(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        expected_msg = 'MASK must be set'
-
-        fd.write(struct.pack('!BB',
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, int('00000000', 2)))
-
-        frame = self.ws.wait()
-        assert self.ws.websocket_closed, \
-                'Failed to close connection when sent a frame with MASK not set'
-
-        preamble = fd.read(2)
-
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-        reason = fd.read(2)
-        reason = struct.unpack('!H', reason)[0]
-        assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-        rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def _get_payload(self, mask, msg):
-        mask_octets = struct.unpack('!BBBB', struct.pack('!L', mask))
-        msg = unicode(msg).encode('utf-8')
-        result = ''
-        j = 0
-        for c in msg:
-            result += chr(ord(c) ^ mask_octets[j])
-            j = (j + 1) % 4
-        return result
-
-    def test_wait_short_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = 'Hello, websocket'
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        opcode, rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
-        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
-        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_med_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = 'Hello, websocket' * 8
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBHL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | 126, length, mask, encoded_msg))
-
-        opcode, rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
-        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
-        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_long_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = 'Hello, websocket' * 8 #4098
-        mask = 42
-        encoded_msg = self._get_payload(42, msg)
-        length = len(encoded_msg)
-        payload = struct.pack('!BBQL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | 127, length, mask, encoded_msg)
-        fd.write(payload)
-
-        self.ws._waiter = 'test'
-        opcode, rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
-        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
-        assert rxd_msg == msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_control_frames_of_unusual_size(self):
-        opcodes = [WebSocketVersion7.OPCODE_CLOSE,
-                WebSocketVersion7.OPCODE_PING,
-                WebSocketVersion7.OPCODE_PONG]
-        expected_msg = 'Control frame payload cannot be larger than 125 bytes'
-        for test_opcode in opcodes:
-            msg = ''
-            mask = 0x42424242
-            encoded_msg = self._get_payload(mask, msg)
-
-            fd = self.connect().makefile(bufsize=1)
-
-            fd.write(self.GOOD_HEADERS)
-            read_http(fd, code=101, reason='Switching Protocols')
-
-            fd.write(struct.pack('!BBHL',
-                WebSocketVersion7.FIN | test_opcode, 
-                WebSocketVersion7.MASK | 126, 0, mask))
-
-            frame = self.ws.wait()
-            assert self.ws.websocket_closed, \
-                    'Failed to close connection when sent a frame of unusual size'
-
-            preamble = fd.read(2)
-
-            opcode, length = struct.unpack('!BB', preamble)
-            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-            reason = fd.read(2)
-            reason = struct.unpack('!H', reason)[0]
-            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-            fd.close();
-
-            fd = self.connect().makefile(bufsize=1)
-
-            fd.write(self.GOOD_HEADERS)
-            read_http(fd, code=101, reason='Switching Protocols')
-
-            fd.write(struct.pack('!BBQL',
-                WebSocketVersion7.FIN | test_opcode, 
-                WebSocketVersion7.MASK | 127, length, mask))
-
-            frame = self.ws.wait()
-            assert self.ws.websocket_closed, \
-                    'Failed to close connection when sent a frame of unusual size'
-
-            preamble = fd.read(2)
-
-            opcode, length = struct.unpack('!BB', preamble)
-            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-            reason = fd.read(2)
-            reason = struct.unpack('!H', reason)[0]
-            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-            fd.close();
-
-    def test_wait_close_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = 'Hello, websocket'
-        mask = 0x42424242
-        reason = 1000
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg) + 2
-        fd.write(struct.pack('!BBLH%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_CLOSE,
-            WebSocketVersion7.MASK | length, mask, reason ^ (mask >> 16), encoded_msg))
-
-        rxd_msg = self.ws.wait()
-        assert self.ws.websocket_closed, 'Did not close connection when sent a close frame'
-        assert rxd_msg == (WebSocketVersion7.OPCODE_CLOSE, (reason, msg)), \
-                'Wrong result "%s"' % (rxd_msg,)
-
-        preamble = fd.read(2)
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_CLOSE, \
-                'Opcode must be %x' % WebSocketVersion7.OPCODE_CLOSE
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length == 2, 'The payload length must be 2, but got %d' % length
-        
-        reason = struct.unpack('!H', fd.read(2))[0]
-        assert reason == 1000, 'The reason must be 1000, but got %d' % reason
-
-        fd.close();
-
-    def test_wait_close_frame_no_payload(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = ''
-        mask = 0x42424242
-        encoded_msg = self._get_payload(mask, msg)
-        length = 0
-        fd.write(struct.pack('!BBL',
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_CLOSE,
-            WebSocketVersion7.MASK | length, mask))
-
-        rxd_msg = self.ws.wait()
-        assert self.ws.websocket_closed, \
-                'Did not close connection when sent a close frame with no payload'
-        assert rxd_msg == (WebSocketVersion7.OPCODE_CLOSE, (None, None)), \
-                'Wrong result "%s"' % (rxd_msg,)
-
-        preamble = fd.read(2)
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_CLOSE, \
-                'Opcode must be %x' % WebSocketVersion7.OPCODE_CLOSE
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length == 2, 'The payload length must be 2, but got %d' % length
-        
-        reason = struct.unpack('!H', fd.read(2))[0]
-        assert reason == WebSocketVersion7.REASON_NORMAL, \
-                'The reason must be 1000, but got %d' % reason
-
-        fd.close();
-
-    def test_wait_ping_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = 'Hello, websocket'
-        mask = 0x42424242
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_PING,
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a ping frame'
-        assert rxd_msg == (WebSocketVersion7.OPCODE_PING, msg.encode('utf-8')), \
-                'Wrong result "%s"' % (rxd_msg,)
-
-        preamble = fd.read(2)
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_PONG, \
-                'Opcode must be %x' % WebSocketVersion7.OPCODE_PONG
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length == len(encoded_msg), 'Wrong payload length. Got %d' % length
-
-        txd_msg = fd.read(length)
-        assert txd_msg == msg.encode('utf-8'), 'Wrong message "%s", expected "%s"' % (txd_msg, msg)
-
-        fd.close();
-
-    def test_wait_pong_frame(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        msg = 'Hello, websocket'
-        mask = 0x42424242
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_PONG,
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a pong frame'
-        assert rxd_msg == (WebSocketVersion7.OPCODE_PONG, msg.encode('utf-8')), \
-                'Wrong result "%s"' % (rxd_msg,)
-
-        fd.close();
-
-    def test_wait_fragmented_control_frame(self):
-        opcodes = [WebSocketVersion7.OPCODE_CLOSE,
-                WebSocketVersion7.OPCODE_PING,
-                WebSocketVersion7.OPCODE_PONG]
-        expected_msg = 'Control frames cannot be fragmented'
-        for test_opcode in opcodes:
-            msg = ''
-            mask = 0x42424242
-            encoded_msg = self._get_payload(mask, msg)
-
-            fd = self.connect().makefile(bufsize=1)
-
-            fd.write(self.GOOD_HEADERS)
-            read_http(fd, code=101, reason='Switching Protocols')
-
-            fd.write(struct.pack('!BBL', test_opcode, WebSocketVersion7.MASK | 0, mask))
-
-            frame = self.ws.wait()
-            assert self.ws.websocket_closed, \
-                    'Failed to close connection when sent a fragmented control frame'
-
-            preamble = fd.read(2)
-
-            opcode, length = struct.unpack('!BB', preamble)
-            assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-            assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-            assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-            reason = fd.read(2)
-            reason = struct.unpack('!H', reason)[0]
-            assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-            rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-            assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-            fd.close();
-
-    def test_wait_unfinished_fragmented_message(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        expected_msg = 'Received new unfragmented data frame during fragmented message'
-
-        msg = 'Hello, '
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        frame = self.ws.wait()
-        assert self.ws.websocket_closed, \
-                'Failed to close connection when sent new unfragmented ' + \
-                'data frame during fragmented message'
-
-        preamble = fd.read(2)
-
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-        reason = fd.read(2)
-        reason = struct.unpack('!H', reason)[0]
-        assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-        rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_bad_fragment_opcode(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        expected_msg = 'Received new fragment frame with non-zero opcode'
-
-        msg = 'Hello, '
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        frame = self.ws.wait()
-        assert self.ws.websocket_closed, \
-                'Failed to close connection when sent fragment with non-zero opcode'
-
-        preamble = fd.read(2)
-
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == 8, 'Opcode must be 0x8'
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-
-        reason = fd.read(2)
-        reason = struct.unpack('!H', reason)[0]
-        assert reason == 1002, 'Expected reason to be 1002, but got %d' % reason
-
-        rxd_msg = fd.read(length - 2).decode('utf-8', 'replace')
-        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_two_fragments(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        expected_msg = ''
-
-        msg = 'Hello, '
-        expected_msg += msg
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        msg = 'websocket'
-        expected_msg += msg
-        mask = 23
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        opcode, rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
-        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
-        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_three_fragments(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        expected_msg = ''
-
-        msg = 'Hello, '
-        expected_msg += msg
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        msg = 'websocket'
-        expected_msg += msg
-        mask = 1337
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_FRAG, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        msg = '!'
-        expected_msg += msg
-        mask = 23
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        opcode, rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
-        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
-        assert rxd_msg == expected_msg, 'Wrong message "%s"' % rxd_msg
-
-        fd.close();
-
-    def test_wait_control_frame_during_fragmented_message(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        expected_msg = ''
-
-        msg = 'Hello, '
-        expected_msg += msg
-        mask = 42
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.OPCODE_TEXT, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        ping_msg = 'Marco!'
-        mask = 0x42424242
-        encoded_ping_msg = self._get_payload(mask, ping_msg)
-        length = len(encoded_ping_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_PING,
-            WebSocketVersion7.MASK | length, mask, encoded_ping_msg))
-
-        msg = 'websocket'
-        expected_msg += msg
-        mask = 23
-        encoded_msg = self._get_payload(mask, msg)
-        length = len(encoded_msg)
-        fd.write(struct.pack('!BBL%ds' % length,
-            WebSocketVersion7.FIN | WebSocketVersion7.OPCODE_FRAG, 
-            WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-        rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a ping frame'
-        assert rxd_msg == (WebSocketVersion7.OPCODE_PING, ping_msg.encode('utf-8')), \
-                'Wrong result "%s"' % (rxd_msg,)
-
-        preamble = fd.read(2)
-        opcode, length = struct.unpack('!BB', preamble)
-        assert opcode & WebSocketVersion7.FIN, 'FIN must be set'
-        assert (opcode & WebSocketVersion7.OPCODE) == WebSocketVersion7.OPCODE_PONG, \
-                'Opcode must be %x' % WebSocketVersion7.OPCODE_PONG
-        assert (length & WebSocketVersion7.MASK) == 0, 'MASK must not be set'
-        assert length == len(encoded_ping_msg), 'Wrong payload length. Got %d' % length
-
-        txd_msg = fd.read(length)
-        assert txd_msg == ping_msg.encode('utf-8'), \
-                'Wrong message "%s", expected "%s"' % (txd_msg, ping_msg)
-
-        opcode, rxd_msg = self.ws.wait()
-        assert not self.ws.websocket_closed, 'Closed connection when sent a good frame'
-        assert opcode == WebSocketVersion7.OPCODE_TEXT, 'Wrong opcode "%x"' % opcode
-        assert rxd_msg == expected_msg, \
-                'Wrong message "%s", expected "%s"' % (rxd_msg, expected_msg)
-
-        fd.close();
-
-    def test_wait_two_fragmented_messages(self):
-        fd = self.connect().makefile(bufsize=1)
-
-        fd.write(self.GOOD_HEADERS)
-        read_http(fd, code=101, reason='Switching Protocols')
-
-        for x in xrange(0, 2):
-            expected_msg = ''
-
-            msg = 'Hello, '
-            expected_msg += msg
-            mask = 42
-            encoded_msg = self._get_payload(mask, msg)
-            length = len(encoded_msg)
-            fd.write(struct.pack('!BBL%ds' % length,
-                WebSocketVersion7.OPCODE_TEXT, 
-                WebSocketVersion7.MASK | length, mask, encoded_msg))
-
-            msg = 'websocket %d' % x
-            expected_msg += msg
-            mask = 23
-            encoded_msg = self._get_payload(mask, msg)
<