Commits

Jeffrey Gelens  committed deff731

Backported Nick Joyce's changes @ https://github.com/njoyce which improves error
handling, code structure and more.

  • Participants
  • Parent commits c1f924a

Comments (0)

Files changed (3)

File geventwebsocket/exceptions.py

 
 
 class WebSocketError(socket_error):
-    pass
+    """
+    Base class for all websocket errors.
+    """
 
 
-class FrameTooLargeException(WebSocketError):
-    pass
+class ProtocolError(WebSocketError):
+    """
+    Raised if an error occurs when de/encoding the websocket protocol.
+    """
+
+
+class FrameTooLargeException(ProtocolError):
+    """
+    Raised if a frame is received that is too large.
+    """

File geventwebsocket/handler.py

 import base64
-import re
-import struct
-from hashlib import md5, sha1
-from socket import error as socket_error
-from urllib import quote
+import hashlib
 
 from gevent.pywsgi import WSGIHandler
-from geventwebsocket.websocket import WebSocketHybi, WebSocketHixie
+from .websocket import WebSocket, Stream
 
 
 class WebSocketHandler(WSGIHandler):
-    """Automatically upgrades the connection to websockets.
+    """
+    Automatically upgrades the connection to a websocket.
 
     To prevent the WebSocketHandler to call the underlying WSGI application,
     but only setup the WebSocket negotiations, do:
 
       mywebsockethandler.prevent_wsgi_call = True
 
-    before calling handle_one_response().  This is useful if you want to do
-    more things before calling the app, and want to off-load the WebSocket
-    negotiations to this library.  Socket.IO needs this for example, to
-    send the 'ack' before yielding the control to your WSGI app.
+    before calling run_application().  This is useful if you want to do more
+    things before calling the app, and want to off-load the WebSocket
+    negotiations to this library.  Socket.IO needs this for example, to send
+    the 'ack' before yielding the control to your WSGI app.
     """
 
+    SUPPORTED_VERSIONS = ('13', '8', '7')
     GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
-    SUPPORTED_VERSIONS = ('13', '8', '7')
 
-    def handle_one_response(self):
-        self.pre_start()
-        environ = self.environ
-        upgrade = environ.get('HTTP_UPGRADE', '').lower()
+    def run_websocket(self):
+        """
+        Called when a websocket has been created successfully.
+        """
+        if hasattr(self, 'prevent_wsgi_call') and self.prevent_wsgi_call:
+            return
+
+        # Since we're now a websocket connection, we don't care what the
+        # application actually responds with for the http response
+        try:
+            self.application(self.environ, lambda s, h: [])
+        finally:
+            self.websocket.close()
+
+    def run_application(self):
+        self.logger.debug("Application started")
+        self.result = self.upgrade_websocket()
+
+        if hasattr(self, 'websocket'):
+            if self.status and not self.headers_sent:
+                self.write('')
+
+            self.run_websocket()
+        else:
+            if self.status:
+                # A status was set, likely an error so just send the response
+                if not self.result:
+                    self.result = []
+
+                self.process_result()
+                return
+
+            # This handler did not handle the request, so defer it to the
+            # underlying application object
+            return super(WebSocketHandler, self).run_application()
+
+    def upgrade_websocket(self):
+        """
+        Attempt to upgrade the current environ into a websocket enabled
+        connection. If successful, the environ dict with be updated with two
+        new entries, `wsgi.websocket` and `wsgi.websocket_version`.
+
+        :returns: Whether the upgrade was successful.
+        """
+
+        # Some basic sanity checks first
+
+        self.logger.debug("Validating WebSocket request")
+
+        if self.environ.get('REQUEST_METHOD', '') != 'GET':
+            self.start_response('400 Bad Request', [])
+            self.logger.warning("No request method in headers")
+
+            return ['Unknown request method']
+
+        if self.request_version != 'HTTP/1.1':
+            self.start_response('402 Bad Request', [])
+            self.logger.warning("Bad server protocol in headers")
+
+            return ['Bad protocol version']
+
+        upgrade = self.environ.get('HTTP_UPGRADE', '').lower()
 
         if upgrade == 'websocket':
-            connection = environ.get('HTTP_CONNECTION', '').lower()
-            if 'upgrade' in connection:
-                return self._handle_websocket()
-        return super(WebSocketHandler, self).handle_one_response()
+            connection = self.environ.get('HTTP_CONNECTION', '').lower()
 
-    def pre_start(self):
-        pass
+            if 'upgrade' not in connection:
+                # this is not a websocket request, so we must not handle it
+                self.logger.warning("Client didn't ask for a connection "
+                                    "upgrade")
+                return
 
-    def _fake_start_response(self, *args, **kwargs):
-        return None
+        if self.environ.get('HTTP_SEC_WEBSOCKET_VERSION'):
+            return self.upgrade_connection()
+        else:
+            self.start_response('426 Upgrade Required', [
+                ('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS))])
+            self.logger.warning("No protocol defined")
 
-    def _handle_websocket(self):
-        environ = self.environ
+            return ['No Websocket protocol version defined']
+
+    def upgrade_connection(self):
+        """
+        Validate and 'upgrade' the HTTP request to a WebSocket request.
+
+        If an upgrade succeeded then then handler will have `start_response`
+        with a status of `101`, the environ will also be updated with
+        `wsgi.websocket` and `wsgi.websocket_version` keys.
+
+        :param environ: The WSGI environ dict.
+        :param start_response: The callable used to start the response.
+        :param stream: File like object that will be read from/written to by
+            the underlying WebSocket object, if created.
+        :return: The WSGI response iterator is something went awry.
+        """
+
+        self.logger.debug("Attempting to upgrade connection")
+
+        version = self.environ.get("HTTP_SEC_WEBSOCKET_VERSION")
+
+        if version not in self.SUPPORTED_VERSIONS:
+            msg = "Unsupported WebSocket Version: {0}".format(version)
+
+            self.logger.warning(msg)
+            self.start_response('400 Bad Request', [
+                ('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS))
+            ])
+
+            return [msg]
+
+        key = self.environ.get("HTTP_SEC_WEBSOCKET_KEY", '').strip()
+
+        if not key:
+            # 5.2.1 (3)
+            msg = "Sec-WebSocket-Key header is missing/empty"
+
+            self.logger.warning(msg)
+            self.start_response('400 Bad Request', [])
+
+            return [msg]
 
         try:
-            if environ.get("HTTP_SEC_WEBSOCKET_VERSION"):
-                self.close_connection = True
-                result = self._handle_hybi()
-            elif environ.get("HTTP_ORIGIN"):
-                self.close_connection = True
-                result = self._handle_hixie()
+            key_len = len(base64.b64decode(key))
+        except TypeError:
+            msg = "Invalid key: {0}".format(key)
 
-            self.result = []
-            if not result:
-                return
+            self.logger.warning(msg)
+            self.start_response('400 Bad Request', [])
 
-            if not hasattr(self, 'prevent_wsgi_call'):
-                self.application(environ, self._fake_start_response)
+            return [msg]
 
-            return []
-        finally:
-            self.log_request()
+        if key_len != 16:
+            # 5.2.1 (3)
+            msg = "Invalid key: {0}".format(key)
 
-    def _handle_hybi(self):
-        environ = self.environ
-        version = environ.get("HTTP_SEC_WEBSOCKET_VERSION")
+            self.logger.warning(msg)
+            self.start_response('400 Bad Request', [])
 
-        environ['wsgi.websocket_version'] = 'hybi-%s' % version
+            return [msg]
 
-        if version not in self.SUPPORTED_VERSIONS:
-            self.log_error('400: Unsupported Version: %r', version)
-            self.respond(
-                '400 Unsupported Version',
-                [('Sec-WebSocket-Version', '13, 8, 7')]
-            )
-            return
+        self.websocket = WebSocket(self.environ, Stream(self))
 
-        protocol, version = self.request_version.split("/")
-        key = environ.get("HTTP_SEC_WEBSOCKET_KEY")
-
-        # check client handshake for validity
-        if not environ.get("REQUEST_METHOD") == "GET":
-            # 5.2.1 (1)
-            self.respond('400 Bad Request')
-            return
-        elif not protocol == "HTTP":
-            # 5.2.1 (1)
-            self.respond('400 Bad Request')
-            return
-        elif float(version) < 1.1:
-            # 5.2.1 (1)
-            self.respond('400 Bad Request')
-            return
-        # XXX: nobody seems to set SERVER_NAME correctly. check the spec
-        #elif not environ.get("HTTP_HOST") == environ.get("SERVER_NAME"):
-            # 5.2.1 (2)
-            #self.respond('400 Bad Request')
-            #return
-        elif not key:
-            # 5.2.1 (3)
-            self.log_error('400: HTTP_SEC_WEBSOCKET_KEY is missing from request')
-            self.respond('400 Bad Request')
-            return
-        elif len(base64.b64decode(key)) != 16:
-            # 5.2.1 (3)
-            self.log_error('400: Invalid key: %r', key)
-            self.respond('400 Bad Request')
-            return
-
-        self.websocket = WebSocketHybi(self.socket, environ)
-        environ['wsgi.websocket'] = self.websocket
-
-        # TODO: just echo the protocols for now
-        ws_protocols = environ.get("HTTP_SEC_WEBSOCKET_PROTOCOL", "")
+        self.environ.update({
+            'wsgi.websocket_version': version,
+            'wsgi.websocket': self.websocket
+        })
 
         headers = [
             ("Upgrade", "websocket"),
             ("Connection", "Upgrade"),
-            ("Sec-WebSocket-Protocol", ws_protocols),
-            ("Sec-WebSocket-Accept", base64.b64encode(sha1(key + self.GUID).digest())),
+            ("Sec-WebSocket-Accept", base64.b64encode(
+                hashlib.sha1(key + self.GUID).digest())),
         ]
-        self._send_reply("101 Switching Protocols", headers)
-        return True
 
-    def _handle_hixie(self):
-        environ = self.environ
-        assert "upgrade" in self.environ.get("HTTP_CONNECTION", "").lower()
+        self.logger.debug("WebSocket request accepted, switching protocols")
+        self.start_response("101 Switching Protocols", headers)
 
-        self.websocket = WebSocketHixie(self.socket, environ)
-        environ['wsgi.websocket'] = self.websocket
+    @property
+    def logger(self):
+        return self.server.logger
 
-        key1 = self.environ.get('HTTP_SEC_WEBSOCKET_KEY1')
-        key2 = self.environ.get('HTTP_SEC_WEBSOCKET_KEY2')
 
-        if key1 is not None:
-            environ['wsgi.websocket_version'] = 'hixie-76'
-            if not key1:
-                self.log_error("400: SEC-WEBSOCKET-KEY1 header is empty")
-                self.respond('400 Bad Request')
-                return
-            if not key2:
-                self.log_error("400: SEC-WEBSOCKET-KEY2 header is missing or empty")
-                self.respond('400 Bad Request')
-                return
-
-            part1 = self._get_key_value(key1)
-            part2 = self._get_key_value(key2)
-            if part1 is None or part2 is None:
-                self.respond('400 Bad Request')
-                return
-
-            headers = [
-                ("Upgrade", "WebSocket"),
-                ("Connection", "Upgrade"),
-                ("Sec-WebSocket-Location", reconstruct_url(environ)),
-            ]
-            if self.websocket.protocol is not None:
-                headers.append(("Sec-WebSocket-Protocol", self.websocket.protocol))
-            if self.websocket.origin:
-                headers.append(("Sec-WebSocket-Origin", self.websocket.origin))
-
-            self._send_reply("101 WebSocket Protocol Handshake", headers)
-
-            # This request should have 8 bytes of data in the body
-            key3 = self.rfile.read(8)
-
-            challenge = md5(struct.pack("!II", part1, part2) + key3).digest()
-
-            self.socket.sendall(challenge)
-            return True
-        else:
-            environ['wsgi.websocket_version'] = 'hixie-75'
-            headers = [
-                ("Upgrade", "WebSocket"),
-                ("Connection", "Upgrade"),
-                ("WebSocket-Location", reconstruct_url(environ)),
-            ]
-
-            if self.websocket.protocol is not None:
-                headers.append(("WebSocket-Protocol", self.websocket.protocol))
-            if self.websocket.origin:
-                headers.append(("WebSocket-Origin", self.websocket.origin))
-
-            self._send_reply("101 Web Socket Protocol Handshake", headers)
-
-    def _send_reply(self, status, headers):
-        self.status = status
-
-        towrite = []
-        towrite.append('%s %s\r\n' % (self.request_version, self.status))
-
-        for header in headers:
-            towrite.append("%s: %s\r\n" % header)
-
-        towrite.append("\r\n")
-        msg = ''.join(towrite)
-        self.socket.sendall(msg)
-        self.headers_sent = True
-
-    def respond(self, status, headers=[]):
-        self.close_connection = True
-        self._send_reply(status, headers)
-
-        if self.socket is not None:
-            try:
-                self.socket._sock.close()
-                self.socket.close()
-            except socket_error:
-                pass
-
-    def _get_key_value(self, key_value):
-        key_number = int(re.sub("\\D", "", key_value))
-        spaces = re.subn(" ", "", key_value)[1]
-
-        if key_number % spaces != 0:
-            self.log_error("key_number %d is not an intergral multiple of spaces %d", key_number, spaces)
-        else:
-            return key_number / spaces
-
-
-def reconstruct_url(environ):
-    secure = environ['wsgi.url_scheme'] == 'https'
-    if secure:
-        url = 'wss://'
-    else:
-        url = 'ws://'
-
-    if environ.get('HTTP_HOST'):
-        url += environ['HTTP_HOST']
-    else:
-        url += environ['SERVER_NAME']
-
-        if secure:
-            if environ['SERVER_PORT'] != '443':
-                url += ':' + environ['SERVER_PORT']
-        else:
-            if environ['SERVER_PORT'] != '80':
-                url += ':' + environ['SERVER_PORT']
-
-    url += quote(environ.get('SCRIPT_NAME', ''))
-    url += quote(environ.get('PATH_INFO', ''))
-
-    if environ.get('QUERY_STRING'):
-        url += '?' + environ['QUERY_STRING']
-
-    return url
-
-
-class MessageHandler(object):
-    def __init__(self, environ, interfaces):
-        self.ws = environ['wsgi.websocket']
-        self.interfaces = interfaces
-
-        if self.ws.path in interfaces.keys():
-            self.active_interface = interfaces[self.ws.path]
-        else:
-            raise Exception("no interface found")
-
-        self.on_open()
-        self._handle()
-
-    def _handle(self):
-        while True:
-            message = self.ws.receive()
-
-            if message is None:
-                self.active_interface.on_close()
-                break
-            else:
-                self.active_interface.on_message(message)
+#class MessageHandler(object):
+#    def __init__(self, environ, interfaces):
+#        self.ws = environ['wsgi.websocket']
+#        self.interfaces = interfaces
+#
+#        if self.ws.path in interfaces.keys():
+#            self.active_interface = interfaces[self.ws.path]
+#        else:
+#            raise Exception("no interface found")
+#
+#        self.on_open()
+#        self._handle()
+#
+#    def _handle(self):
+#        while True:
+#            message = self.ws.receive()
+#
+#            if message is None:
+#                self.active_interface.on_close()
+#                break
+#            else:
+#                self.active_interface.on_message(message)

File geventwebsocket/websocket.py

 import struct
 
-from errno import EINTR
-from gevent.coros import Semaphore
+from socket import error
 
-from python_fixes import makefile, is_closed
-from exceptions import FrameTooLargeException, WebSocketError
+from .exceptions import ProtocolError
+from .exceptions import WebSocketError
+from .exceptions import FrameTooLargeException
 
 
 class WebSocket(object):
-    def _encode_text(self, text):
-        if isinstance(text, unicode):
-            return text.encode('utf-8')
-        else:
+    """
+    Base class for supporting websocket operations.
+
+    :ivar environ: The http environment referenced by this connection.
+    :ivar closed: Whether this connection is closed/closing.
+    :ivar stream: The underlying file like object that will be read from /
+        written to by this WebSocket object.
+    """
+
+    __slots__ = ('environ', 'closed', 'stream', 'raw_write', 'raw_read')
+
+    OPCODE_CONTINUATION = 0x00
+    OPCODE_TEXT = 0x01
+    OPCODE_BINARY = 0x02
+    OPCODE_CLOSE = 0x08
+    OPCODE_PING = 0x09
+    OPCODE_PONG = 0x0a
+
+    def __init__(self, environ, stream):
+        self.environ = environ
+        self.closed = False
+
+        self.stream = stream
+
+        self.raw_write = stream.write
+        self.raw_read = stream.read
+
+    def __del__(self):
+        try:
+            self.close()
+        except:
+            # close() may fail if __init__ didn't complete
+            pass
+
+    def _decode_bytes(self, bytestring):
+        """
+        Internal method used to convert the utf-8 encoded bytestring into
+        unicode.
+
+        If the conversion fails, the socket will be closed.
+        """
+
+        if not bytestring:
+            return u''
+
+        try:
+            return bytestring.decode('utf-8')
+        except UnicodeDecodeError:
+            self.close(1007)
+
+            raise
+
+    def _encode_bytes(self, text):
+        """
+        :returns: The utf-8 byte string equivalent of `text`.
+        """
+
+        if isinstance(text, str):
             return text
 
+        if not isinstance(text, unicode):
+            text = unicode(text or '')
 
-class WebSocketHixie(WebSocket):
-    def __init__(self, socket, environ):
-        self.origin = environ.get('HTTP_ORIGIN')
-        self.protocol = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
-        self.path = environ.get('PATH_INFO')
-        self.fobj = socket.makefile()
-        self.socket = socket
-        self._writelock = Semaphore(1)
-        self._write = socket.sendall
+        return text.encode('utf-8')
 
-    def send(self, message):
-        message = self._encode_text(message)
+    def is_valid_close_code(self, code):
+        """
+        :returns: Whether the returned close code is a valid hybi return code.
+        """
+        if code < 1000:
+            return False
 
-        with self._writelock:
-            self._write("\x00" + message + "\xFF")
+        if 1004 <= code <= 1006:
+            return False
 
-    def close(self):
-        if self.fobj is not None:
-            self.fobj.close()
-            self.fobj = None
-            self._write = None
+        if 1012 <= code <= 1016:
+            return False
 
-    def _message_length(self):
-        length = 0
+        if code == 1100:
+            # not sure about this one but the autobahn fuzzer requires it.
+            return False
+
+        if 2000 <= code <= 2999:
+            return False
+
+        return True
+
+    @property
+    def origin(self):
+        if not self.environ:
+            return
+
+        return self.environ.get('HTTP_ORIGIN')
+
+    @property
+    def protocol(self):
+        if not self.environ:
+            return
+
+        return self.environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
+
+    @property
+    def version(self):
+        if not self.environ:
+            return
+
+        return self.environ.get('HTTP_SEC_WEBSOCKET_VERSION')
+
+    @property
+    def path(self):
+        if not self.environ:
+            return
+
+        return self.environ.get('PATH_INFO')
+
+    def handle_close(self, header, payload):
+        """
+        Called when a close frame has been decoded from the stream.
+
+        :param header: The decoded `Header`.
+        :param payload: The bytestring payload associated with the close frame.
+        """
+        if not payload:
+            self.close(1000, None)
+
+            return
+
+        if len(payload) < 2:
+            raise ProtocolError('Invalid close frame: {0} {1}'.format(
+                header, payload))
+
+        code = struct.unpack('!H', str(payload[:2]))[0]
+        payload = payload[2:]
+
+        if payload:
+            payload = self._decode_bytes(payload)
+
+        if not self._is_valid_close_code(code):
+            raise ProtocolError('Invalid close code {0}'.format(code))
+
+        self.close(code, payload)
+
+    def handle_ping(self, header, payload):
+        self.send_frame(payload, self.OPCODE_PONG)
+
+    def handle_pong(self, header, payload):
+        pass
+
+    def read_frame(self):
+        """
+        Block until a full frame has been read from the socket.
+
+        This is an internal method as calling this will not cleanup correctly
+        if an exception is called. Use `receive` instead.
+
+        :return: The header and payload as a tuple.
+        """
+        header = Header.decode_header(self.stream)
+
+        if header.flags:
+            raise ProtocolError
+
+        if not header.length:
+            return header, ''
+
+        try:
+            payload = self.raw_read(header.length)
+        except error:
+            payload = ''
+        except Exception:
+            # TODO log out this exception
+            payload = ''
+
+        if len(payload) != header.length:
+            raise WebSocketError('Unexpected EOF reading frame payload')
+
+        if header.mask:
+            payload = header.unmask_payload(payload)
+
+        return header, payload
+
+    def read_message(self):
+        """
+        Return the next text or binary message from the socket.
+
+        This is an internal method as calling this will not cleanup correctly
+        if an exception is called. Use `receive` instead.
+        """
+        opcode = None
+        message = ""
 
         while True:
-            if self.fobj is None:
-                raise WebSocketError('Connection closed unexpectedly while reading message length')
-            byte_str = self.fobj.read(1)
+            header, payload = self.read_frame()
+            f_opcode = header.opcode
 
-            if not byte_str:
-                return 0
+            if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY):
+                # a new frame
+                if opcode:
+                    raise ProtocolError("The opcode in non-fin frame is "
+                                        "expected to be zero, got "
+                                        "{0!r}".format(f_opcode))
+
+                opcode = f_opcode
+
+            elif f_opcode == self.OPCODE_CONTINUATION:
+                if not opcode:
+                    raise ProtocolError("Unexpected frame with opcode=0")
+
+            elif f_opcode == self.OPCODE_PING:
+                self.handle_ping(header, payload)
+                continue
+
+            elif f_opcode == self.OPCODE_PONG:
+                self.handle_pong(header, payload)
+                continue
+
+            elif f_opcode == self.OPCODE_CLOSE:
+                self.handle_close(header, payload)
+                return
+
             else:
-                byte = ord(byte_str)
+                raise ProtocolError("Unexpected opcode={0!r}".format(f_opcode))
 
-            if byte != 0x00:
-                length = length * 128 + (byte & 0x7f)
-                if (byte & 0x80) != 0x80:
-                    break
+            message += payload
 
-        return length
-
-    def _read_until(self):
-        bytes = []
-
-        read = self.fobj.read
-
-        while True:
-            if self.fobj is None:
-                msg = ''.join(bytes)
-                raise WebSocketError('Connection closed unexpectedly while reading message: %r' % msg)
-
-            byte = read(1)
-            if ord(byte) != 0xff:
-                bytes.append(byte)
-            else:
+            if header.fin:
                 break
 
-        return ''.join(bytes)
+        if opcode == self.OPCODE_TEXT:
+            return self._decode_bytes(message)
+
+        return bytearray(message)
 
     def receive(self):
-        read = self.fobj.read
+        """
+        Read and return a message from the stream. If `None` is returned, then
+        the socket is considered closed/errored.
+        """
+        if self.closed:
+            return
 
-        while self.fobj is not None:
-            frame_str = read(1)
+        try:
+            return self.read_message()
 
-            if not frame_str:
-                self.close()
-                return
-            else:
-                frame_type = ord(frame_str)
+        except ProtocolError:
+            self.close(1002)
+            raise
 
-            if frame_type == 0x00:
-                bytes = self._read_until()
-                return bytes.decode("utf-8", "replace")
-            else:
-                raise WebSocketError("Received an invalid frame_type=%r" % frame_type)
+        except error:
+            raise WebSocketError("Socket is dead")
 
+    def send_frame(self, message, opcode):
+        """
+        Send a frame over the websocket with message as its payload
+        """
+        if self.closed:
+            raise WebSocketError("The connection was closed")
 
-class WebSocketHybi(WebSocket):
-    OPCODE_TEXT = 0x1
-    OPCODE_BINARY = 0x2
-    OPCODE_CLOSE = 0x8
-    OPCODE_PING = 0x9
-    OPCODE_PONG = 0xA
+        if opcode == self.OPCODE_TEXT:
+            message = self._encode_bytes(message)
+        elif opcode == self.OPCODE_BINARY:
+            message = str(message)
 
-    def __init__(self, socket, environ):
-        self.origin = environ.get('HTTP_SEC_WEBSOCKET_ORIGIN')
-        self.protocol = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'unknown')
-        self.path = environ.get('PATH_INFO')
-        self._chunks = bytearray()
-        self._writelock = Semaphore(1)
-        self.socket = socket
-        self._write = socket.sendall
-        self.fobj = makefile(socket)
-        self.close_code = None
-        self.close_message = None
-        self._reading = False
+        header = Header.encode_header(True, opcode, '', len(message), 0)
 
-    def _parse_header(self, data):
+        try:
+            self.raw_write(header + message)
+        except error:
+            raise WebSocketError("Socket is dead")
+
+    def send(self, message, binary=None):
+        """
+        Send a frame over the websocket with message as its payload
+        """
+        if binary is None:
+            binary = not isinstance(message, (str, unicode))
+
+        opcode = self.OPCODE_BINARY if binary else self.OPCODE_TEXT
+
+        self.send_frame(message, opcode)
+
+    def close(self, code=1000, message=''):
+        """
+        Close the websocket and connection, sending the specified code and
+        message.  The underlying socket object is _not_ closed, that is the
+        responsibility of the initiator.
+        """
+
+        if self.closed:
+            return
+
+        try:
+            message = self._encode_bytes(message)
+
+            self.send_frame(
+                struct.pack('!H%ds' % len(message), code, message),
+                opcode=self.OPCODE_CLOSE)
+        except WebSocketError:
+            # failed to write the closing frame but it's ok because we're
+            # closing the socket anyway.
+            pass
+        finally:
+            self.closed = True
+
+            self.stream = None
+            self.raw_write = None
+            self.raw_read = None
+
+            self.environ = None
+
+
+class Stream(object):
+    """
+    Wraps the handler's socket/rfile attributes and makes it in to a file like
+    object that can be read from/written to by the lower level websocket api.
+    """
+
+    __slots__ = ('handler', 'read', 'write')
+
+    def __init__(self, handler):
+        self.handler = handler
+        self.read = handler.rfile.read
+        self.write = handler.socket.sendall
+
+
+class Header(object):
+    __slots__ = ('fin', 'mask', 'opcode', 'flags', 'length')
+
+    FIN_MASK = 0x80
+    OPCODE_MASK = 0x0f
+    MASK_MASK = 0x80
+    LENGTH_MASK = 0x7f
+
+    RSV0_MASK = 0x40
+    RSV1_MASK = 0x20
+    RSV2_MASK = 0x10
+
+    # bitwise mask that will determine the reserved bits for a frame header
+    HEADER_FLAG_MASK = RSV0_MASK | RSV1_MASK | RSV2_MASK
+
+    def __init__(self, fin=0, opcode=0, flags=0, length=0):
+        self.mask = ''
+        self.fin = fin
+        self.opcode = opcode
+        self.flags = flags
+        self.length = length
+
+    def mask_payload(self, payload):
+        payload = bytearray(payload)
+        mask = bytearray(self.mask)
+
+        for i in xrange(self.length):
+            payload[i] ^= mask[i % 4]
+
+        return str(payload)
+
+    # it's the same operation
+    unmask_payload = mask_payload
+
+    def __repr__(self):
+        return ("<Header fin={0} opcode={1} length={2} flags={3} at "
+                "0x{4!x}>").format(self.fin, self.opcode, self.length,
+                                   self.flags, id(self))
+
+    @classmethod
+    def decode_header(cls, stream):
+        """
+        Decode a WebSocket header.
+
+        :param stream: A file like object that can be 'read' from.
+        :returns: A `Header` instance.
+        """
+        read = stream.read
+        data = read(2)
+
         if len(data) != 2:
-            self._close()
-            raise WebSocketError('Incomplete read while reading header: %r' % data)
+            raise WebSocketError("Unexpected EOF while decoding header")
 
         first_byte, second_byte = struct.unpack('!BB', data)
 
-        fin = (first_byte >> 7) & 1
-        rsv1 = (first_byte >> 6) & 1
-        rsv2 = (first_byte >> 5) & 1
-        rsv3 = (first_byte >> 4) & 1
-        opcode = first_byte & 0xf
+        header = cls(
+            fin=first_byte & cls.FIN_MASK == cls.FIN_MASK,
+            opcode=first_byte & cls.OPCODE_MASK,
+            flags=first_byte & cls.HEADER_FLAG_MASK,
+            length=second_byte & cls.LENGTH_MASK)
 
-        # frame-fin = %x0 ; more frames of this message follow
-        #           / %x1 ; final frame of this message
+        has_mask = second_byte & cls.MASK_MASK == cls.MASK_MASK
 
-        # frame-rsv1 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
-        # frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
-        # frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
-        if rsv1 or rsv2 or rsv3:
-            self.close(1002)
-            raise WebSocketError('Received frame with non-zero reserved bits: %r' % str(data))
+        if header.opcode > 0x07:
+            if not header.fin:
+                raise ProtocolError(
+                    "Received fragmented control frame: {0!r}".format(data))
 
-        if opcode > 0x7 and fin == 0:
-            self.close(1002)
-            raise WebSocketError('Received fragmented control frame: %r' % str(data))
+            # Control frames MUST have a payload length of 125 bytes or less
+            if header.length > 125:
+                raise FrameTooLargeException(
+                    "Control frame cannot be larger than 125 bytes: "
+                    "{0!r}".format(data))
 
-        if len(self._chunks) > 0 and fin == 0 and not opcode:
-            self.close(1002)
-            raise WebSocketError('Received new fragment frame with non-zero opcode: %r' % str(data))
+        if header.length == 126:
+            # 16 bit length
+            data = read(2)
 
-        if len(self._chunks) > 0 and fin == 1 and (self.OPCODE_TEXT <= opcode <= self.OPCODE_BINARY):
-            self.close(1002)
-            raise WebSocketError('Received new unfragmented data frame during fragmented message: %r' % str(data))
+            if len(data) != 2:
+                raise WebSocketError('Unexpected EOF while decoding header')
 
-        has_mask = (second_byte >> 7) & 1
-        length = (second_byte) & 0x7f
+            header.length = struct.unpack('!H', data)[0]
+        elif header.length == 127:
+            # 64 bit length
+            data = read(8)
 
-        # Control frames MUST have a payload length of 125 bytes or less
-        if opcode > 0x7 and length > 125:
-            self.close(1002)
-            raise FrameTooLargeException("Control frame payload cannot be larger than 125 bytes: %r" % str(data))
+            if len(data) != 8:
+                raise WebSocketError('Unexpected EOF while decoding header')
 
-        return fin, opcode, has_mask, length
+            header.length = struct.unpack('!Q', data)[0]
 
-    def receive_frame(self):
-        """Return the next frame from the socket."""
-        fobj = self.fobj
+        if has_mask:
+            mask = read(4)
 
-        if fobj is None:
-            return
+            if len(mask) != 4:
+                raise WebSocketError('Unexpected EOF while decoding header')
 
-        if is_closed(fobj):
-            return
+            header.mask = mask
 
-        read = self.fobj.read
+        return header
 
-        assert not self._reading, 'Reading is not possible from multiple greenlets'
-        self._reading = True
+    @classmethod
+    def encode_header(cls, fin, opcode, mask, length, flags):
+        """
+        Encodes a WebSocket header.
 
-        try:
-            data0 = read(2)
+        :param fin: Whether this is the final frame for this opcode.
+        :param opcode: The opcode of the payload, see `OPCODE_*`
+        :param mask: Whether the payload is masked.
+        :param length: The length of the frame.
+        :param flags: The RSV* flags.
+        :return: A bytestring encoded header.
+        """
+        first_byte = opcode
+        second_byte = 0
+        extra = ''
 
-            if not data0:
-                self._close()
-                return
+        if fin:
+            first_byte |= cls.FIN_MASK
 
-            fin, opcode, has_mask, length = self._parse_header(data0)
+        if flags & cls.RSV0_MASK:
+            first_byte |= cls.RSV0_MASK
 
-            if not has_mask and length:
-                self.close(1002)
-                raise WebSocketError('Message from client is not masked')
+        if flags & cls.RSV1_MASK:
+            first_byte |= cls.RSV1_MASK
 
-            if length < 126:
-                data1 = ''
-            elif length == 126:
-                data1 = read(2)
+        if flags & cls.RSV2_MASK:
+            first_byte |= cls.RSV2_MASK
 
-                if len(data1) != 2:
-                    self.close()
-                    raise WebSocketError('Incomplete read while reading 2-byte length: %r' % (data0 + data1))
+        # now deal with length complexities
+        if length < 126:
+            second_byte += length
+        elif length <= 0xffff:
+            second_byte += 126
+            extra = struct.pack('!H', length)
+        elif length <= 0xffffffffffffffff:
+            second_byte += 127
+            extra = struct.pack('!Q', length)
+        else:
+            raise FrameTooLargeException
 
-                length = struct.unpack('!H', data1)[0]
-            else:
-                assert length == 127, length
-                data1 = read(8)
+        if mask:
+            second_byte |= cls.MASK_MASK
 
-                if len(data1) != 8:
-                    self.close()
-                    raise WebSocketError('Incomplete read while reading 8-byte length: %r' % (data0 + data1))
+            extra += mask
 
-                length = struct.unpack('!Q', data1)[0]
-
-            mask = read(4)
-            if len(mask) != 4:
-                self._close()
-                raise WebSocketError('Incomplete read while reading mask: %r' % (data0 + data1 + mask))
-
-            mask = struct.unpack('!BBBB', mask)
-
-            if length:
-                payload = read(length)
-                if len(payload) != length:
-                    self._close()
-                    args = (length, len(payload))
-                    raise WebSocketError('Incomplete read: expected message of %s bytes, got %s bytes' % args)
-            else:
-                payload = ''
-
-            if payload:
-                payload = bytearray(payload)
-
-                for i in xrange(len(payload)):
-                    payload[i] = payload[i] ^ mask[i % 4]
-
-            return fin, opcode, payload
-        finally:
-            self._reading = False
-            if self.fobj is None:
-                fobj.close()
-
-    def _receive(self):
-        """Return the next text or binary message from the socket."""
-
-        opcode = None
-        result = bytearray()
-
-        while True:
-            frame = self.receive_frame()
-            if frame is None:
-                if result:
-                    raise WebSocketError('Peer closed connection unexpectedly')
-                return
-
-            f_fin, f_opcode, f_payload = frame
-
-            if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY):
-                if opcode is None:
-                    opcode = f_opcode
-                else:
-                    raise WebSocketError('The opcode in non-fin frame is expected to be zero, got %r' % (f_opcode, ))
-            elif not f_opcode:
-                if opcode is None:
-                    self.close(1002)
-                    raise WebSocketError('Unexpected frame with opcode=0')
-            elif f_opcode == self.OPCODE_CLOSE:
-                if len(f_payload) >= 2:
-                    self.close_code = struct.unpack('!H', str(f_payload[:2]))[0]
-                    self.close_message = f_payload[2:]
-                elif f_payload:
-                    self._close()
-                    raise WebSocketError('Invalid close frame: %s %s %s' % (f_fin, f_opcode, repr(f_payload)))
-                code = self.close_code
-                if code is None or (code >= 1000 and code < 5000):
-                    self.close()
-                else:
-                    self.close(1002)
-                    raise WebSocketError('Received invalid close frame: %r %r' % (code, self.close_message))
-                return
-            elif f_opcode == self.OPCODE_PING:
-                self.send_frame(f_payload, opcode=self.OPCODE_PONG)
-                continue
-            elif f_opcode == self.OPCODE_PONG:
-                continue
-            else:
-                self._close()  # XXX should send proper reason?
-                raise WebSocketError("Unexpected opcode=%r" % (f_opcode, ))
-
-            result.extend(f_payload)
-            if f_fin:
-                break
-
-        if opcode == self.OPCODE_TEXT:
-            return result, False
-        elif opcode == self.OPCODE_BINARY:
-            return result, True
-        else:
-            raise AssertionError('internal serror in gevent-websocket: opcode=%r' % (opcode, ))
-
-    def receive(self):
-        result = self._receive()
-        if not result:
-            return result
-
-        message, is_binary = result
-        if is_binary:
-            return message
-        else:
-            try:
-                return message.decode('utf-8')
-            except ValueError:
-                self.close(1007)
-                raise
-
-    def send_frame(self, message, opcode):
-        """Send a frame over the websocket with message as its payload"""
-
-        if self.socket is None:
-            raise WebSocketError('The connection was closed')
-
-        header = chr(0x80 | opcode)
-
-        if isinstance(message, unicode):
-            message = message.encode('utf-8')
-
-        msg_length = len(message)
-
-        if msg_length < 126:
-            header += chr(msg_length)
-        elif msg_length < (1 << 16):
-            header += chr(126) + struct.pack('!H', msg_length)
-        elif msg_length < (1 << 63):
-            header += chr(127) + struct.pack('!Q', msg_length)
-        else:
-            raise FrameTooLargeException()
-
-        try:
-            combined = header + message
-        except TypeError:
-            with self._writelock:
-                self._write(header)
-                self._write(message)
-        else:
-            with self._writelock:
-                self._write(combined)
-
-    def send(self, message, binary=None):
-        """Send a frame over the websocket with message as its payload"""
-        if binary is None:
-            binary = not isinstance(message, (str, unicode))
-
-        if binary:
-            return self.send_frame(message, self.OPCODE_BINARY)
-        else:
-            return self.send_frame(message, self.OPCODE_TEXT)
-
-    def close(self, code=1000, message=''):
-        """Close the websocket, sending the specified code and message"""
-        if self.socket is not None:
-            message = self._encode_text(message)
-            self.send_frame(struct.pack('!H%ds' % len(message), code, message), opcode=self.OPCODE_CLOSE)
-            self._close()
-
-    def _close(self):
-        if self.socket is not None:
-            self.socket = None
-            self._write = None
-
-            if not self._reading:
-                self.fobj.close()
-
-            self.fobj = None
+        return chr(first_byte) + chr(second_byte) + extra