gevent-websocket / geventwebsocket / websocket.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
import sys
from socket import error as socket_error
from errno import EINTR
import struct
from gevent.coros import Semaphore


if sys.version_info[:2] == (2, 7):
    # Python 2.7 has a working BufferedReader but socket.makefile() does not use it
    # Python 2.6's BufferedReader is broken (TypeError: recv_into() argument 1 must be pinned buffer, not bytearray)
    from io import BufferedReader, RawIOBase

    class SocketIO(RawIOBase):

        def __init__(self, sock):
            RawIOBase.__init__(self)
            self._sock = sock

        def readinto(self, b):
            self._checkClosed()
            while True:
                try:
                    return self._sock.recv_into(b)
                except socket_error as ex:
                    if ex.args[0] == EINTR:
                        continue
                    raise

        def readable(self):
            return self._sock is not None

        @property
        def closed(self):
            return self._sock is None

        def fileno(self):
            self._checkClosed()
            return self._sock.fileno()

        @property
        def name(self):
            if not self.closed:
                return self.fileno()
            else:
                return -1

        def close(self):
            if self.closed:
                return
            RawIOBase.close(self)
            self._sock = None

    def makefile(socket):
        return BufferedReader(SocketIO(socket))

else:

    def makefile(socket):
        # XXX on python3 enable buffering
        return socket.makefile()


if sys.version_info[:2] < (2, 7):

    def is_closed(fobj):
        return fobj._sock is None

else:

    def is_closed(fobj):
        return fobj.closed


class WebSocketError(socket_error):
    pass


class FrameTooLargeException(WebSocketError):
    pass


class WebSocket(object):

    def _encode_text(self, s):
        if isinstance(s, unicode):
            return s.encode('utf-8')
        else:
            return s


class WebSocketHixie(WebSocket):

    def __init__(self, socket, environ):
        self.origin = environ.get('HTTP_ORIGIN')
        self.protocol = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
        self.path = environ.get('PATH_INFO')
        self._writelock = Semaphore(1)
        self.fobj = socket.makefile()
        self._write = socket.sendall

    def send(self, message):
        message = self._encode_text(message)

        with self._writelock:
            self._write("\x00" + message + "\xFF")

    def close(self):
        if self.fobj is not None:
            self.fobj.close()
            self.fobj = None
            self._write = None

    def _message_length(self):
        length = 0

        while True:
            if self.fobj is None:
                raise WebSocketError('Connenction closed unexpectedly while reading message length')
            byte_str = self.fobj.read(1)

            if not byte_str:
                return 0
            else:
                byte = ord(byte_str)

            if byte != 0x00:
                length = length * 128 + (byte & 0x7f)
                if (byte & 0x80) != 0x80:
                    break

        return length

    def _read_until(self):
        bytes = []

        read = self.fobj.read

        while True:
            if self.fobj is None:
                msg = ''.join(bytes)
                raise WebSocketError('Connection closed unexpectedly while reading message: %r' % msg)
            byte = read(1)
            if ord(byte) != 0xff:
                bytes.append(byte)
            else:
                break

        return ''.join(bytes)

    def receive(self):
        read = self.fobj.read
        while self.fobj is not None:
            frame_str = read(1)
            if not frame_str:
                self.close()
                return
            else:
                frame_type = ord(frame_str)

            if frame_type == 0x00:
                bytes = self._read_until()
                return bytes.decode("utf-8", "replace")
            else:
                raise WebSocketError("Received an invalid frame_type=%r" % frame_type)


class WebSocketHybi(WebSocket):
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xA

    def __init__(self, socket, environ):
        self.origin = environ.get('HTTP_SEC_WEBSOCKET_ORIGIN')
        self.protocol = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'unknown')
        self.path = environ.get('PATH_INFO')
        self._chunks = bytearray()
        self._writelock = Semaphore(1)
        self.socket = socket
        self._write = socket.sendall
        self.fobj = makefile(socket)
        self.close_code = None
        self.close_message = None
        self._reading = False

    def _parse_header(self, data):
        if len(data) != 2:
            self._close()
            raise WebSocketError('Incomplete read while reading header: %r' % data)

        first_byte, second_byte = struct.unpack('!BB', data)

        fin = (first_byte >> 7) & 1
        rsv1 = (first_byte >> 6) & 1
        rsv2 = (first_byte >> 5) & 1
        rsv3 = (first_byte >> 4) & 1
        opcode = first_byte & 0xf

        # frame-fin = %x0 ; more frames of this message follow
        #           / %x1 ; final frame of this message

        # frame-rsv1 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
        # frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
        # frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise
        if rsv1 or rsv2 or rsv3:
            self.close(1002)
            raise WebSocketError('Received frame with non-zero reserved bits: %r' % str(data))

        if opcode > 0x7 and fin == 0:
            self.close(1002)
            raise WebSocketError('Received fragmented control frame: %r' % str(data))

        if len(self._chunks) > 0 and fin == 0 and not opcode:
            self.close(1002)
            raise WebSocketError('Received new fragment frame with non-zero opcode: %r' % str(data))

        if len(self._chunks) > 0 and fin == 1 and (self.OPCODE_TEXT <= opcode <= self.OPCODE_BINARY):
            self.close(1002)
            raise WebSocketError('Received new unfragmented data frame during fragmented message: %r' % str(data))

        has_mask = (second_byte >> 7) & 1
        length = (second_byte) & 0x7f

        # Control frames MUST have a payload length of 125 bytes or less
        if opcode > 0x7 and length > 125:
            self.close(1002)
            raise FrameTooLargeException("Control frame payload cannot be larger than 125 bytes: %r" % str(data))

        return fin, opcode, has_mask, length

    def receive_frame(self):
        """Return the next frame from the socket."""
        fobj = self.fobj

        if fobj is None:
            return

        if is_closed(fobj):
            return

        read = self.fobj.read

        assert not self._reading, 'Reading is not possible from multiple greenlets'
        self._reading = True
        try:
            data0 = read(2)
            if not data0:
                self._close()
                return

            fin, opcode, has_mask, length = self._parse_header(data0)

            if not has_mask and length:
                self.close(1002)
                raise WebSocketError('Message from client is not masked')

            if length < 126:
                data1 = ''
            elif length == 126:
                data1 = read(2)
                if len(data1) != 2:
                    self.close()
                    raise WebSocketError('Incomplete read while reading 2-byte length: %r' % (data0 + data1))
                length = struct.unpack('!H', data1)[0]
            else:
                assert length == 127, length
                data1 = read(8)
                if len(data1) != 8:
                    self.close()
                    raise WebSocketError('Incomplete read while reading 8-byte length: %r' % (data0 + data1))
                length = struct.unpack('!Q', data1)[0]

            mask = read(4)
            if len(mask) != 4:
                self._close()
                raise WebSocketError('Incomplete read while reading mask: %r' % (data0 + data1 + mask))

            mask = struct.unpack('!BBBB', mask)

            if length:
                payload = read(length)
                if len(payload) != length:
                    self._close()
                    args = (length, len(payload))
                    raise WebSocketError('Incomplete read: expected message of %s bytes, got %s bytes' % args)
            else:
                payload = ''

            if payload:
                payload = bytearray(payload)
                for i in xrange(len(payload)):
                    payload[i] = payload[i] ^ mask[i % 4]

            return fin, opcode, payload
        finally:
            self._reading = False
            if self.fobj is None:
                fobj.close()

    def _receive(self):
        """Return the next text or binary message from the socket."""
        opcode = None
        result = bytearray()
        while True:
            frame = self.receive_frame()
            if frame is None:
                if result:
                    raise WebSocketError('Peer closed connection unexpectedly')
                return

            f_fin, f_opcode, f_payload = frame

            if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY):
                if opcode is None:
                    opcode = f_opcode
                else:
                    raise WebSocketError('The opcode in non-fin frame is expected to be zero, got %r' % (f_opcode, ))
            elif not f_opcode:
                if opcode is None:
                    self.close(1002)
                    raise WebSocketError('Unexpected frame with opcode=0')
            elif f_opcode == self.OPCODE_CLOSE:
                if len(f_payload) >= 2:
                    self.close_code = struct.unpack('!H', str(f_payload[:2]))[0]
                    self.close_message = f_payload[2:]
                elif f_payload:
                    self._close()
                    raise WebSocketError('Invalid close frame: %s %s %s' % (f_fin, f_opcode, repr(f_payload)))
                code = self.close_code
                if code is None or (code >= 1000 and code < 5000):
                    self.close()
                else:
                    self.close(1002)
                    raise WebSocketError('Received invalid close frame: %r %r' % (code, self.close_message))
                return
            elif f_opcode == self.OPCODE_PING:
                self.send_frame(f_payload, opcode=self.OPCODE_PONG)
                continue
            elif f_opcode == self.OPCODE_PONG:
                continue
            else:
                self._close()  # XXX should send proper reason?
                raise WebSocketError("Unexpected opcode=%r" % (f_opcode, ))

            result.extend(f_payload)
            if f_fin:
                break

        if opcode == self.OPCODE_TEXT:
            return result, False
        elif opcode == self.OPCODE_BINARY:
            return result, True
        else:
            raise AssertionError('internal serror in gevent-websocket: opcode=%r' % (opcode, ))

    def receive(self):
        result = self._receive()
        if not result:
            return result
        message, is_binary = result
        if is_binary:
            return message
        else:
            try:
                return message.decode('utf-8')
            except ValueError:
                self.close(1007)
                raise

    def send_frame(self, message, opcode):
        """Send a frame over the websocket with message as its payload"""
        header = chr(0x80 | opcode)

        if isinstance(message, unicode):
            message = message.encode('utf-8')

        msg_length = len(message)

        if msg_length < 126:
            header += chr(msg_length)
        elif msg_length < (1 << 16):
            header += chr(126) + struct.pack('!H', msg_length)
        elif msg_length < (1 << 63):
            header += chr(127) + struct.pack('!Q', msg_length)
        else:
            raise FrameTooLargeException()

        try:
            combined = header + message
        except TypeError:
            with self._writelock:
                self._write(header)
                self._write(message)
        else:
            with self._writelock:
                self._write(combined)

    def send(self, message, binary=None):
        """Send a frame over the websocket with message as its payload"""
        if binary is None:
            binary = not isinstance(message, (str, unicode))

        if binary:
            return self.send_frame(message, self.OPCODE_BINARY)
        else:
            return self.send_frame(message, self.OPCODE_TEXT)

    def close(self, code=1000, message=''):
        """Close the websocket, sending the specified code and message"""
        if self.socket is not None:
            message = self._encode_text(message)
            self.send_frame(struct.pack('!H%ds' % len(message), code, message), opcode=self.OPCODE_CLOSE)
            self._close()

    def _close(self):
        if self.socket is not None:
            self.socket._sock.close()
            self.socket = None
            self._write = None
            fobj = self.fobj
            self.fobj = None
            if not self._reading:
                fobj.close()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.