Commits

Jeffrey Gelens committed c8717f3 Merge

Merged changes from stable

Comments (0)

Files changed (6)

examples/echoserver.py

+import os
+from gevent.pywsgi import WSGIServer
+import geventwebsocket
+
+
+def echo(environ, start_response):
+    websocket = environ.get('wsgi.websocket')
+    if websocket is None:
+        return http_handler(environ, start_response)
+    try:
+        while True:
+            message = websocket.receive()
+            if message is None:
+                break
+            websocket.send(message)
+        websocket.close()
+    except geventwebsocket.WebSocketError, ex:
+        print '%s: %s' % (ex.__class__.__name__, ex)
+
+
+def http_handler(environ, start_response):
+    if environ['PATH_INFO'].strip('/') == 'version':
+        start_response('200 OK', [])
+        return [agent]
+    start_response('400 Bad Request', [])
+    return ['WebSocket connection is expected here.']
+
+
+path = os.path.dirname(geventwebsocket.__file__)
+agent = 'gevent-websocket/%s' % (geventwebsocket.__version__)
+print 'Running %s from %s' % (agent, path)
+WSGIServer(('', 7000), echo, handler_class=geventwebsocket.WebSocketHandler).serve_forever()

geventwebsocket/__init__.py

 
 version_info = (0, 3, 0, 'dev')
-__version__ =  ".".join(map(str, version_info))
+__version__ = ".".join(map(str, version_info))
 
-__all__ = ['WebSocketHandler']
+__all__ = ['WebSocketHandler', 'WebSocketError']
 
 from geventwebsocket.handler import WebSocketHandler
+from geventwebsocket.websocket import WebSocketError

geventwebsocket/gunicorn/workers.py

 from geventwebsocket.handler import WebSocketHandler
 from gunicorn.workers.ggevent import GeventPyWSGIWorker
 
+
 class GeventWebSocketWorker(GeventPyWSGIWorker):
     wsgi_handler = WebSocketHandler

geventwebsocket/handler.py

 
         if secure:
             if environ['SERVER_PORT'] != '443':
-               url += ':' + environ['SERVER_PORT']
+                url += ':' + environ['SERVER_PORT']
         else:
             if environ['SERVER_PORT'] != '80':
-               url += ':' + environ['SERVER_PORT']
+                url += ':' + environ['SERVER_PORT']
 
     url += quote(environ.get('SCRIPT_NAME', ''))
     url += quote(environ.get('PATH_INFO', ''))

geventwebsocket/websocket.py

 from gevent.coros import Semaphore
 
 
-class Closed(object):
-
-    def __init__(self, reason, message):
-        self.reason = reason
-        self.message = message
-
-    def __nonzero__(self):
-        return False
-
-    def __repr__(self):
-        return '%s(%r, %r)' % (self.__class__.__name__, self.reason, self.message)
-
-
 class WebSocketError(socket_error):
     pass
 
     pass
 
 
-class WebSocketHixie(object):
+class WebSocket(object):
+
+    def _encode_text(self, s):
+        if isinstance(s, unicode):
+            return s.encode('utf-8')
+        else:
+            return s
+
+
+class WebSocketHixie(WebSocket):
 
     def __init__(self, fobj, environ):
         self.origin = environ.get('HTTP_ORIGIN')
         self.fobj = fobj
         self._write = _get_write(fobj)
 
-    def _encode_text(self, s):
-        if isinstance(s, unicode):
-            return s.encode('utf-8')
-        elif isinstance(s, str):
-            return unicode(s).encode('utf-8')
-        else:
-            raise Exception('Invalid encoding')
-
     def send(self, message):
         message = self._encode_text(message)
 
                 raise WebSocketError("Received an invalid frame_type=%r" % frame_type)
 
 
-class WebSocketHybi(object):
-    FIN = int("10000000", 2)
-    RSV = int("01110000", 2)
-    OPCODE = int("00001111", 2)
-    MASK = int("10000000", 2)
-    PAYLOAD = int("01111111", 2)
-
-    OPCODE_CONTINUATION = 0x0
+class WebSocketHybi(WebSocket):
     OPCODE_TEXT = 0x1
     OPCODE_BINARY = 0x2
     OPCODE_CLOSE = 0x8
     OPCODE_PING = 0x9
     OPCODE_PONG = 0xA
 
-    REASON_NORMAL = 1000
-    REASON_GOING_AWAY = 1001
-    REASON_PROTOCOL_ERROR = 1002
-    REASON_UNSUPPORTED_DATA_TYPE = 1003
-    REASON_TOO_LARGE = 1004
-
-    LEN_16 = 126
-    LEN_64 = 127
-
     def __init__(self, fobj, environ):
         self.origin = environ.get('HTTP_SEC_WEBSOCKET_ORIGIN')
         self.protocol = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'unknown')
         self.path = environ.get('PATH_INFO')
         self._chunks = bytearray()
-        self._first_opcode = None
         self._writelock = Semaphore(1)
         self.fobj = fobj
         self._write = _get_write(fobj)
+        self.close_code = None
+        self.close_message = None
 
     def _parse_header(self, data):
         if len(data) != 2:
             self.close()
             raise WebSocketError('Incomplete read while reading header: %r' % data)
+
         first_byte, second_byte = struct.unpack('!BB', data)
 
         fin = (first_byte >> 7) & 1
         # frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
         # frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
         if rsv1 or rsv2 or rsv3:
-            self.close()
-            raise WebSocketError('Reserved bits cannot be set: %r' % data)
+            self.close(1002)
+            raise WebSocketError('Received frame with non-zero reserved bits: %r' % data)
 
-        #if self._is_invalid_opcode(opcode):
-        #    raise WebSocketError('Invalid opcode %x' % opcode)
+        if opcode > 0x7 and fin == 0:
+            self.close(1002)
+            raise WebSocketError('Received fragmented control frame: %r' % data)
 
-        # control frames cannot be fragmented
-        if opcode > 0x7 and fin == 0:
-            self.close()
-            raise WebSocketError('Control frames cannot be fragmented: %r' % data)
-
-        if len(self._chunks) > 0 and fin == 0 and opcode != self.OPCODE_CONTINUATION:
-            self.close(self.REASON_PROTOCOL_ERROR, 'Received new fragment frame with non-zero opcode')
+        if len(self._chunks) > 0 and fin == 0 and not opcode:
+            self.close(1002)
             raise WebSocketError('Received new fragment frame with non-zero opcode: %r' % data)
 
         if len(self._chunks) > 0 and fin == 1 and (self.OPCODE_TEXT <= opcode <= self.OPCODE_BINARY):
-            self.close(self.REASON_PROTOCOL_ERROR, 'Received new unfragmented data frame during fragmented message')
+            self.close(1002)
             raise WebSocketError('Received new unfragmented data frame during fragmented message: %r' % data)
 
-        mask = (second_byte >> 7) & 1
+        has_mask = (second_byte >> 7) & 1
         length = (second_byte) & 0x7f
 
-        #if not self.MASK & length_octet: # TODO: where is this in the docs?
-        #    self.close(self.REASON_PROTOCOL_ERROR, 'MASK must be set')
-
         # Control frames MUST have a payload length of 125 bytes or less
         if opcode > 0x7 and length > 125:
-            self.close()
+            self.close(1002)
             raise FrameTooLargeException("Control frame payload cannot be larger than 125 bytes: %r" % data)
 
-        return fin, opcode, mask, length
+        return fin, opcode, has_mask, length
 
-    def receive(self):
+    def receive_frame(self):
         """Return the next frame from the socket."""
         if self.fobj is None:
             return
 
         read = self.fobj.read
 
+        data0 = read(2)
+        if not data0:
+            self._close()
+            return
+
+        fin, opcode, has_mask, length = self._parse_header(data0)
+
+        if length < 126:
+            data1 = ''
+        elif length == 126:
+            data1 = read(2)
+            if len(data1) != 2:
+                self.close()
+                raise WebSocketError('Incomplete read while reading 2-byte length: %r' % (data0 + data1))
+            length = struct.unpack('!H', data1)[0]
+        elif length == 127:
+            data1 = read(8)
+            if len(data1) != 8:
+                self.close()
+                raise WebSocketError('Incomplete read while reading 8-byte length: %r' % (data0 + data1))
+            length = struct.unpack('!Q', data1)[0]
+        else:
+            self.close()
+            raise WebSocketError('Invalid length: %r' % data0)
+
+        if has_mask:
+            data2 = read(4)
+            if len(data2) != 4:
+                self.close()
+                raise WebSocketError('Incomplete read while reading mask: %r' % (data0 + data1 + data2))
+            masking_key = struct.unpack('!BBBB', data2)
+        else:
+            data2 = ''
+
+        if length:
+            payload = read(length)
+            if len(payload) != length:
+                self.close()
+                args = (length, len(payload))
+                raise WebSocketError('Incomplete read: expected message of %s bytes, got %s bytes' % args)
+        else:
+            payload = ''
+
+        if has_mask and payload:
+            # XXX message from client actually should always be masked
+            masked_payload = bytearray(payload)
+
+            for i in range(len(masked_payload)):
+                masked_payload[i] = masked_payload[i] ^ masking_key[i % 4]
+
+            payload = masked_payload
+
+        return fin, opcode, payload
+
+    def _receive(self):
+        """Return the next text or binary message from the socket."""
+        opcode = None
+        result = bytearray()
         while True:
-            data0 = read(2)
-            if not data0:
-                self._close()
+            frame = self.receive_frame()
+            if frame is None:
+                if result:
+                    raise WebSocketError('Peer closed connection unexpectedly')
                 return
 
-            fin, opcode, mask, length = self._parse_header(data0)
+            f_fin, f_opcode, f_payload = frame
 
-            if length < 126:
-                data1 = ''
-            elif length == 126:
-                data1 = read(2)
-                if len(data1) != 2:
+            if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY):
+                if opcode is None:
+                    opcode = f_opcode
+                else:
+                    raise WebSocketError('The opcode in non-fin frame is expected to be zero, got %r' % (f_opcode, ))
+            elif not f_opcode:
+                if opcode is None:
+                    self.close(1002)
+                    raise WebSocketError('Unexpected frame with opcode=0')
+            elif f_opcode == self.OPCODE_CLOSE:
+                if len(f_payload) >= 2:
+                    self.close_code = struct.unpack('!H', str(f_payload[:2]))[0]
+                    self.close_message = f_payload[2:]
+                elif f_payload:
+                    self._close()
+                    raise WebSocketError('Invalid close frame: %s %s %s' % (f_fin, f_opcode, repr(f_payload)))
+                code = self.close_code
+                if code is None or (code >= 1000 and code < 5000):
                     self.close()
-                    raise WebSocketError('Incomplete read while reading 2-byte length: %r' % (data0 + data1))
-                length = struct.unpack('!H', data1)[0]
-            elif length == 127:
-                data1 = read(8)
-                if len(data1) != 8:
-                    self.close()
-                    raise WebSocketError('Incomplete read while reading 8-byte length: %r' % (data0 + data1))
-                length = struct.unpack('!Q', data1)[0]
-            else:
-                self.close()
-                raise WebSocketError('Invalid length: %r' % data0)
-
-            # Unmask the payload if necessary
-            if mask and length:
-                data2 = read(4)
-                if len(data2) != 4:
-                    self.close()
-                    raise WebSocketError('Incomplete read while reading mask: %r' % (data0 + data1 + data2))
-                masking_key = struct.unpack('!BBBB', data2)
-            else:
-                data2 = ''
-
-            if length:
-                payload = read(length)
-                if len(payload) != length:
-                    self.close()
-                    args = (length, data0 + data1 + data2, payload)
-                    raise WebSocketError('Incomplete read (expected message of %s bytes): %r %r' % args)
-            else:
-                payload = ''
-
-            if mask:
-                # XXX message from client actually should always be masked
-                masked_payload = bytearray(payload)
-
-                for i in range(len(masked_payload)):
-                    masked_payload[i] = masked_payload[i] ^ masking_key[i%4]
-
-                payload = masked_payload
-
-            if opcode == self.OPCODE_TEXT:
-                self._first_opcode = opcode
-                if payload:
-                    # XXX given that we have OPCODE_CONTINUATION, shouldn't we just reset _chunks here?
-                    self._chunks.extend(payload)
-            elif opcode == self.OPCODE_BINARY:
-                self._first_opcode = opcode
-                if payload:
-                    self._chunks.extend(payload)
-            elif opcode == self.OPCODE_CONTINUATION:
-                self._chunks.extend(payload)
-            elif opcode == self.OPCODE_CLOSE:
-                if length >= 2:
-                    reason, message = struct.unpack('!H%ds' % (length - 2), buffer(payload))
                 else:
-                    reason = message = None
-                self.close(self.REASON_NORMAL, '')
-                return Closed(reason, message)
-            elif opcode == self.OPCODE_PING:
-                self.send(payload, opcode=self.OPCODE_PONG)
+                    self.close(1002)
+                    raise WebSocketError('Received invalid close frame: %r %r' % (code, self.close_message))
+                return
+            elif f_opcode == self.OPCODE_PING:
+                self.send_frame(f_payload, opcode=self.OPCODE_PONG)
                 continue
-            elif opcode == self.OPCODE_PONG:
+            elif f_opcode == self.OPCODE_PONG:
                 continue
             else:
-                self.close()
-                raise WebSocketError("Unexpected opcode=%r" % (opcode, ))
+                self._close()  # XXX should send proper reason?
+                raise WebSocketError("Unexpected opcode=%r" % (f_opcode, ))
 
-            if fin == 1:
-                if self._first_opcode == self.OPCODE_TEXT:
-                    msg = self._chunks.decode("utf-8")
-                else:
-                    msg = self._chunks
-
-                self._first_opcode = False
-                self._chunks = bytearray()
-
-                return msg
-
-    def _encode_text(self, s):
-        if isinstance(s, unicode):
-            return s.encode('utf-8')
-        elif isinstance(s, str):
-            return unicode(s).encode('utf-8')
-        else:
-            raise TypeError('Invalid encoding')
-
-    def _is_valid_opcode(self, opcode):
-        return opcode in (self.OPCODE_CONTINUATION, self.OPCODE_TEXT, self.OPCODE_BINARY,
-            self.OPCODE_CLOSE, self.OPCODE_PING, self.OPCODE_PONG)
-
-    def send(self, message, opcode=OPCODE_TEXT):
-        """Send a frame over the websocket with message as its payload
-
-        Keyword args:
-        opcode -- the opcode to use (default OPCODE_TEXT)
-        """
-
-        if not self._is_valid_opcode(opcode):
-            raise ValueError('Invalid opcode %d' % opcode)
+            result.extend(f_payload)
+            if f_fin:
+                break
 
         if opcode == self.OPCODE_TEXT:
-            message = self._encode_text(message)
+            return result, False
+        elif opcode == self.OPCODE_BINARY:
+            return result, True
+        else:
+            raise AssertionError('internal serror in gevent-websocket: opcode=%r' % (opcode, ))
 
-        # TODO: implement fragmented messages
-        mask_bit = 0
-        fin = 1
+    def receive(self):
+        result = self._receive()
+        if not result:
+            return result
+        message, is_binary = result
+        if is_binary:
+            return message
+        else:
+            try:
+                return message.decode('utf-8')
+            except ValueError:
+                self.close(1007)
+                raise
 
-        ## +-+-+-+-+-------+
-        ## |F|R|R|R| opcode|
-        ## |I|S|S|S|  (4)  |
-        ## |N|V|V|V|       |
-        ## | |1|2|3|       |
-        ## +-+-+-+-+-------+
-        header = chr(
-            (fin << 7) |
-            (0 << 6) | # RSV1
-            (0 << 5) | # RSV2
-            (0 << 4) | # RSV3
-            opcode
-        )
+    def send_frame(self, message, opcode):
+        """Send a frame over the websocket with message as its payload"""
+        header = chr(0x80 | opcode)
 
-        ##                 +-+-------------+-------------------------------+
-        ##                 |M| Payload len |    Extended payload length    |
-        ##                 |A|     (7)     |             (16/63)           |
-        ##                 |S|             |   (if payload len==126/127)   |
-        ##                 |K|             |                               |
-        ## +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
-        ## |     Extended payload length continued, if payload len == 127  |
-        ## + - - - - - - - - - - - - - - - +-------------------------------+
+        if isinstance(message, unicode):
+            message = message.encode('utf-8')
 
         msg_length = len(message)
 
-        if opcode == self.OPCODE_TEXT:
-            message = struct.pack('!%ds' % msg_length, message)
-
         if msg_length < 126:
-            header += chr(mask_bit | msg_length)
+            header += chr(msg_length)
         elif msg_length < (1 << 16):
-            header += chr(mask_bit | 126) + struct.pack('!H', msg_length)
+            header += chr(126) + struct.pack('!H', msg_length)
         elif msg_length < (1 << 63):
-            header += chr(mask_bit | 127) + struct.pack('!Q', msg_length)
+            header += chr(127) + struct.pack('!Q', msg_length)
         else:
             raise FrameTooLargeException()
 
         with self._writelock:
             self._write(header + message)
 
-    def close(self, reason=1000, message=''):
-        """Close the websocket, sending the specified reason and message"""
+    def send(self, message, binary=None):
+        """Send a frame over the websocket with message as its payload"""
+        if binary is None:
+            binary = not isinstance(message, (str, unicode))
+
+        if binary:
+            return self.send_frame(message, self.OPCODE_BINARY)
+        else:
+            return self.send_frame(message, self.OPCODE_TEXT)
+
+    def close(self, code=1000, message=''):
+        """Close the websocket, sending the specified code and message"""
         if self.fobj is not None:
             message = self._encode_text(message)
-            self.send(struct.pack('!H%ds' % len(message), reason, message), opcode=self.OPCODE_CLOSE)
+            self.send_frame(struct.pack('!H%ds' % len(message), code, message), opcode=self.OPCODE_CLOSE)
             self.fobj.close()
             self.fobj = None
 

run_autobahn_tests.py

+#!/usr/bin/python
+"""Test gevent-websocket with the test suite of Autobahn
+
+    http://www.tavendo.de/autobahn/testsuite.html
+"""
+import sys
+import os
+import subprocess
+import time
+import urllib2
+from twisted.python import log
+from twisted.internet import reactor
+from autobahn.fuzzing import FuzzingClientFactory
+
+
+spec = {
+   "options": {"failByDrop": False},
+   "enable-ssl": False,
+   "servers": [],
+   "cases": ["*"],
+   "exclude-cases": ["7.5.1",
+                     "7.9.3",
+                     "7.9.4",
+                     "7.9.5",
+                     "7.9.6",
+                     "7.9.7",
+                     "7.9.8",
+                     "7.9.9",
+                     "7.9.10",
+                     "7.9.11",
+                     "7.9.12",
+                     "7.9.13"]
+}
+# We ignore 7.5.1 because it checks that close frame has valid utf-8 message
+# we do not validate utf-8.
+
+# We ignore 7.9.3-13 because it checks that when a close frame with code 1004
+# and others sent, 1002 is sent back; we only send back 1002 for obvious
+# violations like < 1000 and >= 5000; for all codes in the 1000-5000 range
+# we send code 1000 back
+
+
+class ProcessPool(object):
+
+    def __init__(self):
+        self.popens = []
+
+    def spawn(self, *args, **kwargs):
+        popen = subprocess.Popen(*args, **kwargs)
+        self.popens.append(popen)
+        time.sleep(0.2)
+        self.check()
+        return popen
+
+    def check(self):
+        for popen in self.popens:
+            if popen.poll() is not None:
+                sys.exit(1)
+
+    def wait(self, timeout):
+        end = time.time() + timeout
+        while True:
+            time.sleep(0.1)
+            pool.check()
+            if time.time() > end:
+                break
+
+    def kill(self):
+        while self.popens:
+            popen = self.popens.pop()
+            try:
+                popen.kill()
+            except Exception, ex:
+                print ex
+
+
+if __name__ == '__main__':
+    import optparse
+    parser = optparse.OptionParser()
+    parser.add_option('--geventwebsocket', default='examples/echoserver.py')
+    parser.add_option('--autobahn', default='../../src/Autobahn/testsuite/websockets/servers/test_autobahn.py')
+    options, args = parser.parse_args()
+    assert not args, args
+    if options.autobahn and not os.path.exists(options.autobahn):
+        print 'Ignoring %s (not found)' % options.autobahn
+        options.autobahn = None
+    pool = ProcessPool()
+    try:
+        if options.geventwebsocket:
+            pool.spawn([sys.executable, options.geventwebsocket])
+        if options.autobahn:
+            pool.spawn([sys.executable, options.autobahn])
+        pool.wait(1)
+        if options.geventwebsocket:
+            agent = urllib2.urlopen('http://127.0.0.1:7000/version').read().strip()
+            assert agent and '\n' not in agent and 'gevent-websocket' in agent, agent
+            spec['servers'].append({"url": "ws://localhost:7000",
+                                    "agent": agent,
+                                    "options": {"version": 17}})
+        if options.autobahn:
+            spec['servers'].append({'url': 'ws://localhost:9000/',
+                                    'agent': 'AutobahnServer',
+                                    'options': {'version': 17}})
+        log.startLogging(sys.stdout)
+        FuzzingClientFactory(spec)
+        reactor.run()
+    finally:
+        pool.kill()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.