Commits

Ginés Martínez Sánchez committed 7492b80 Draft

sockjs tests passed in python2/python3

Comments (0)

Files changed (18)

 
 from ginsfsm.gobj import GObj
 from ginsfsm.gconfig import GConfig
-from ginsfsm.compat import binary_type
+from ginsfsm.compat import text_type
 from ginsfsm.utils import (
     hexdump,
 )
             Equivalent to waitress' write_soon().
             The data are not really sent until _send_some() is called.
         """
-        assert isinstance(data, binary_type)
+        if isinstance(data, text_type):
+            data = data.encode(encoding='utf-8', errors='strict')
         if data:
             if data.__class__ is ReadOnlyFileBasedBuffer:
                 # they used wsgi.file_wrapper

ginsfsm/examples/sockjs/test_sockjs_server.py

 
 """
 
+import logging
+logging.basicConfig(level=logging.WARN)
+
 from ginsfsm.gaplic import GAplic
 from ginsfsm.globals import (
     set_global_app,
     """ You can run directly this file, without gserve.
     """
     import logging
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=logging.WARN)
     # simulate running from ini file
     local_conf = {
         'GObj.trace_mach': False,
         'GObj.logger': logging,
-        'GSock.trace_dump': True,
+        'GSock.trace_dump': False,
         'GWsgiServer.host': '0.0.0.0',
         'GWsgiServer.port': 8080,
         'application': 'wsgi-application',

ginsfsm/protocols/http/common/response.py

         for i, (headername, headerval) in enumerate(response_headers):
             headername = '-'.join(
                 [x.capitalize() for x in headername.split('-')]
-                )
+            )
             if headername == 'Content-Length':
                 content_length_header = headerval
             if headername == 'Date':
             content_length_header = str(self.content_length)
             self.response_headers.append(
                 ('Content-Length', content_length_header)
-                )
+            )
 
         def close_on_finish():
             if connection_close_header is None:
             rh = self.build_response_header()
             self.channel.send_event(gsock, 'EV_WRITE_OUTPUT_DATA', data=rh)
             self.wrote_header = True
+
         if data:
             towrite = data
             cl = self.content_length

ginsfsm/protocols/http/server/c_http_clisrv.py

 def ac_http_response(self, event):
     response = event.response
     if not isinstance(response, HttpResponse):
-        logging.error("ERROR response doesn't mach HttpResponse %s" %
+        logging.error(
+            "ERROR response doesn't mach HttpResponse %s" %
             response.request.path)
 
     if response.request != self.responding_request:
-        logging.error("ERROR response doesn't mach responding request %s" %
+        logging.error(
+            "ERROR response doesn't mach responding request %s" %
             response.request.path)
 
     if self.responding_response:
-        logging.error("ERROR responding_response is BUSY, of %s" %
+        logging.error(
+            "ERROR responding_response is BUSY, of %s" %
             response.request.path)
     self.responding_response = response
 
 
     def enqueue_request(self, new_request):
         self.dl_requests.append(new_request)
-        if self.config.maximum_simultaneous_requests > 0 and \
-                len(self.dl_requests) > self.config.maximum_simultaneous_requests:
+        max_req = self.config.maximum_simultaneous_requests
+        if max_req > 0 and len(self.dl_requests) > max_req:
             # Close the channel by maximum simultaneous requests reached.
             body = 'Please change your behavior.' \
                 ' You have reached the maximum simultaneous requests (%d).' % (
-                    self.config.maximum_simultaneous_requests)
+                    max_req)
             request = HTTPRequestParser(self)
             request.error = InternalServerError(body)
             response = HttpErrorResponse(request)

ginsfsm/protocols/sockjs/server/basehandler.py

 from ginsfsm.protocols.sockjs.server.session import ConnectionInfo
 from ginsfsm.protocols.wsgi.webob.async_response import AsyncResponse
 from ginsfsm.deferred import Deferred
+from ginsfsm.compat import bytes_
 
 #----------------------------------------------------------------#
 #                   Base Views
     def set_body(self, body):
         if self.async_response:
             pass  # be care writing to body in async mode
-        self.response.body = body
+        self.response.body = bytes_(body, "utf-8")
 
     def enable_cache(self):
         """Enable client-side caching for the current request"""
         """ Write data.
             It will use the configurated response: async or sync.
         """
-        self.response.write(data)
+        self.response.write(bytes_(data))
 
     def flush(self, callback=None):
         """Flushes the current output buffer to the network.

ginsfsm/protocols/sockjs/server/c_static.py

 
 import random
 import hashlib
-from sys import maxsize
 from pyramid.view import view_config
 
 from ginsfsm.protocols.sockjs.server.proto import json
 )
 from ginsfsm.gobj import GObj
 from ginsfsm.gconfig import GConfig
+from ginsfsm.protocols.sockjs.server.util import (
+    str_to_bytes,
+    MAXSIZE,
+)
 
 #----------------------------------------------------------------#
 #                   Info GClass
         self.preflight()
         self.disable_cache()
 
+        disabled_transports = \
+            self.context.sockjs_server.config.disabled_transports
+
         info = {
-            'websocket':
-                'websocket' not in self.context.sockjs_server.config.disabled_transports,
+            'websocket': 'websocket' not in disabled_transports,
             'cookie_needed': self.context.sockjs_server.cookie_needed,
             'origins': ['*:*'],
-            'entropy': random.randint(0, maxsize),
+            'entropy': random.randint(0, MAXSIZE),
         }
         response.body = json.dumps(info).encode()
         return response
     """SockJS IFrame page handler"""
     def get(self):
         response = self.response
-        data = IFRAME_TEXT % self.context.sockjs_server.config.sockjs_url
-        data = data.encode()
+        data = str_to_bytes(
+            IFRAME_TEXT % self.context.sockjs_server.config.sockjs_url)
         hsh = hashlib.md5(data).hexdigest()
 
         #value = self.request.headers.get('If-None-Match')

ginsfsm/protocols/sockjs/server/c_transport_eventsource.py

 """
     EventSource transport implementation.
 """
+import logging
 from pyramid.view import view_config
 
 from ginsfsm.gobj import GObj
             self.notify_sent(len(msg))
             self.write(msg)
             self.flush(callback=self.send_complete)
-        except:
+        except Exception:
+            logging.exception(
+                "ERROR EventsourceStreamingTransport send_pack")
             # If connection dropped, make sure we close offending session
             # instead of propagating error all way up.
             if self.session:  # detach session

ginsfsm/protocols/sockjs/server/c_transport_htmlfile.py

 """
     HtmlFile transport implementation.
 """
+import logging
 from pyramid.view import view_config
 
 from ginsfsm.protocols.sockjs.server.proto import json_encode
             self.notify_sent(len(msg))
             self.write(msg)
             self.flush(callback=self.send_complete)
-        except:
+        except Exception:
+            logging.exception(
+                "ERROR HtmlfileStreamingTransport send_pack")
             # If connection dropped, make sure we close offending session
             # instead of propagating error all way up.
             if self.session:  # detach session

ginsfsm/protocols/sockjs/server/c_transport_jsonp.py

 from ginsfsm.compat import url_unquote_plus
 from ginsfsm.protocols.sockjs.server import proto
 from ginsfsm.protocols.sockjs.server.basehandler import SessionHandler
+from ginsfsm.protocols.sockjs.server.util import bytes_to_str
 
 #----------------------------------------------------------------#
 #                   GJsonpSend GClass
         if session is None:
             return HTTPNotFound('Session not found')
 
-        data = self.request.body_file.read()
+        data = self.request.body_file.read()  # decode? v1.0.0
+        data = bytes_to_str(data)
         ctype = self.request.headers.get('Content-Type', '').lower()
         if ctype == 'application/x-www-form-urlencoded':
             if not data.startswith('d='):
             data = url_unquote_plus(data[2:])
 
         if not data:
-            logging.debug('ERROR jsonp_send: Payload expected.')
+            logging.error('ERROR jsonp_send: Payload expected.')
             return HTTPServerError("Payload expected.")
 
         try:
             messages = proto.json_decode(data)
-        except:
-            logging.debug('ERROR jsonp_send: Invalid json encoding')
+        except Exception:
+            logging.exception('ERROR jsonp_send: Invalid json encoding')
             return HTTPServerError("Broken JSON encoding.")
 
         try:
             self.write(msg)
             self.flush()
             self.send_complete()
-        except:
+        except Exception:
             # If connection dropped, make sure we close offending session
             # instead of propagating error all way up.
+            logging.exception(
+                "ERROR JsonpPollingTransport send_pack")
             if self.session:
                 self.session.delayed_close()
             raise

ginsfsm/protocols/sockjs/server/c_transport_websocket.py

 from ginsfsm.protocols.wsgi.webob.websocket_response import WebsocketResponse
 from ginsfsm.deferred import Deferred
 from ginsfsm.utils import hexdump
+from ginsfsm.protocols.sockjs.server.util import bytes_to_str
 
 
 #----------------------------------------------------------------#
             return
 
         try:
-            msg = json_decode(message)
+            msg = json_decode(bytes_to_str(message))  # ? v1.0.0
 
         except Exception:
             logging.exception(
         try:
             self.write_message(message, binary)
         except IOError:
+            logging.exception(
+                "ERROR WebsocketTransport send_pack")
             self.context.gaplic.add_callback(self.on_close)
 
     def session_closed(self):
         try:
             self.session.on_message(message)
         except Exception:
-            logging.exception('RawWebSocket')
+            logging.exception('ERROR RawWebSocket')
 
             # Close running connection
-            self._detach()
+            # self._detach() ? v1.0.0
             self.gsock.mt_drop()
 
     def on_close(self, event):
         try:
             self.write_message(message, binary)
         except IOError:
+            logging.exception(
+                "ERROR RawWebsocketTransport send_pack")
             self.context.gaplic.add_callback(self.on_close)
 
     def session_closed(self):

ginsfsm/protocols/sockjs/server/c_transport_xhr.py

 from ginsfsm.gconfig import GConfig
 from ginsfsm.protocols.sockjs.server import proto
 from ginsfsm.protocols.sockjs.server.basehandler import SessionHandler
+from ginsfsm.protocols.sockjs.server.util import bytes_to_str
 
 #----------------------------------------------------------------#
 #                   GXhrSend GClass
 
         data = self.request.body_file.read()
         if not data:
-            logging.debug('ERROR xhr_send: Payload expected.')
+            logging.error('ERROR xhr_send: Payload expected.')
             return HTTPServerError("Payload expected.")
 
         try:
-            messages = proto.json_decode(data)
-        except:
-            logging.debug('ERROR xhr_send: Invalid json encoding')
+            messages = proto.json_decode(bytes_to_str(data))  # ? v1.0.0
+        except Exception:
+            logging.exception(
+                'ERROR xhr_send: Invalid json encoding',
+            )
             return HTTPServerError("Broken JSON encoding.")
 
         try:
             session.on_messages(messages)
         except Exception:
-            logging.exception('ERROR xhr_send: on_message() failed')
+            logging.exception(
+                'ERROR xhr_send: on_message() failed',
+            )
             return HTTPServerError('ERROR xhr_send: on_message() failed')
 
         self.set_status(204)
             self.write(message + '\n')
             self.flush()
             self.send_complete()
-        except:
+        except Exception:
             # If connection dropped, make sure we close offending session
             # instead of propagating error all way up.
+            logging.exception("ERROR XhrPollingTransport send_pack")
             if self.session:
                 self.session.delayed_close()
 
             self.notify_sent(len(message))
             self.write(message + '\n')
             self.flush(callback=self.send_complete)
-        except:
+        except Exception:
             # If connection dropped, make sure we close offending session
             # instead of propagating error all way up.
             if self.session:  # detach session
                 self.session.delayed_close()
                 self.session.remove_handler(self)
                 self.session = None
+            logging.exception("ERROR XhrStreamingTransport send_pack")
 
     def send_complete(self):
         """

ginsfsm/protocols/sockjs/server/c_websocket.py

 from ginsfsm.gobj import GObj
 from ginsfsm.c_timer import GTimer
 from ginsfsm.compat import (
-    binary_type,
+    bytes_,
 )
 from ginsfsm.circular_fifo import CircularFIFO
 from ginsfsm.buffers import OverflowableBuffer
         headers.append("")
 
         header_str = "\r\n".join(headers)
-        data = ginsfsm.escape.utf8(header_str)
+        data = bytes_(header_str)
 
         self.send_event(self.gsock, 'EV_SEND_DATA', data=data)
 
         """
         # Websocket only supports GET method
         if self.request.method != "GET":
-            data = ginsfsm.escape.utf8(
+            data = bytes_(
                 "HTTP/1.1 405 Method Not Allowed\r\n"
                 "Allow: GET\r\n"
                 "Connection: Close\r\n"
 
         # Upgrade header should be present and should be equal to WebSocket
         if environ.get("HTTP_UPGRADE", "").lower() != "websocket":
-            data = ginsfsm.escape.utf8(
+            data = bytes_(
                 "HTTP/1.1 400 Bad Request\r\n"
                 "Connection: Close\r\n"
                 "\r\n"
         # Connection header should be upgrade.
         # Some proxy servers/load balancers
         # might mess with it.
-        connection = list(map(lambda s: s.strip().lower(),
-                         environ.get("HTTP_CONNECTION", "").split(",")))
+        connection = list(
+            map(
+                lambda s: s.strip().lower(),
+                environ.get("HTTP_CONNECTION", "").split(",")
+            )
+        )
         if "upgrade" not in connection:
-            data = ginsfsm.escape.utf8(
+            data = bytes_(
                 "HTTP/1.1 400 Bad Request\r\n"
                 "Connection: Close\r\n"
                 "\r\n"
         supported_version = ("7", "8", "13")
         if not environ.get(
                 "HTTP_SEC_WEBSOCKET_VERSION") in supported_version:
-            data = ginsfsm.escape.utf8(
+            data = bytes_(
                 "HTTP/1.1 426 Upgrade Required\r\n"
                 "Sec-WebSocket-Version: 13\r\n\r\n"
             )
             "HTTP_SEC_WEBSOCKET_VERSION"
         )
         if not all(list(map(lambda f: environ.get(f), fields))):
-            data = ginsfsm.escape.utf8(
+            data = bytes_(
                 "HTTP/1.1 400 Bad Request\r\n"
                 "Connection: Close\r\n"
                 "\r\n"
             return False
 
         key = environ.get("HTTP_SEC_WEBSOCKET_KEY", None)
-        if not key or len(base64.b64decode(ginsfsm.escape.utf8(key))) != 16:
-            data = ginsfsm.escape.utf8(
+        if not key or len(base64.b64decode(bytes_(key))) != 16:
+            data = bytes_(
                 "HTTP/1.1 400 Bad Request\r\n"
                 "Connection: Close\r\n"
                 "\r\n"
                 subprotocol_header = "Sec-WebSocket-Protocol: %s\r\n" % (
                     selected)
 
-        data = ginsfsm.escape.utf8(
+        data = bytes_(
             "HTTP/1.1 101 Switching Protocols\r\n"
             "Upgrade: websocket\r\n"
             "Connection: Upgrade\r\n"
         given the value for Sec-WebSocket-Key.
         """
         sha1 = hashlib.sha1()
-        sha1.update(ginsfsm.escape.utf8(key))
+        sha1.update(bytes_(key))
         sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")  # Magic value
         return ginsfsm.escape.native_str(base64.b64encode(sha1.digest()))
 
         self._write_frame(
             True,
             OPCODE_CONTROL_CLOSE,
-            STRUCT_2BYTES.pack(status) + reason
+            STRUCT_2BYTES.pack(status) + bytes_(reason)
         )
 
     def send(self, message, binary=False):
             h_opcode = 0x2
         else:
             h_opcode = 0x1
-        message = ginsfsm.escape.utf8(message)
-        assert isinstance(message, binary_type)
+        message = bytes_(message, "utf-8")
         self._write_frame(True, h_opcode, message)
 
     def _write_frame(self, h_fin, h_opcode, data):

ginsfsm/protocols/sockjs/server/proto.py

-from datetime import datetime
+# -*- coding: utf-8 -*-
+"""
+    sockjs.proto
+    ~~~~~~~~~~~~~~~~~~~~
 
-_days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
-_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
-           'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
+    SockJS protocol related functions
+"""
+import logging
 
-# json
-# -----------
+# TODO: Add support for ujson module once they can accept unicode strings
 
-# Fastest
+# Try to find best json encoder available
 try:
-    import ujson as json
-    kwargs = {}  # pragma: no cover
-except ImportError:  # pragma: no cover
-    def dthandler(obj):
-        if isinstance(obj, datetime):
-            now = obj.timetuple()
-            return '%s, %02d %s %04d %02d:%02d:%02d -0000' % (
-                _days[now[6]], now[2],
-                _months[now[1] - 1], now[0], now[3], now[4], now[5])
+    # Check for simplejson
+    import simplejson
 
-    kwargs = {'default': dthandler, 'separators': (',', ':')}
+    json_encode = lambda data: simplejson.dumps(data, separators=(',', ':'))
+    json_decode = lambda data: simplejson.loads(data)
+    JSONDecodeError = ValueError
 
-    # Faster
-    try:
-        import simplejson as json
-    except ImportError:
-        # Slowest
-        import json
+    logging.debug('sockjs will use simplejson module')
+except ImportError:
+    # Use slow json
+    import json
 
+    logging.debug('sockjs will use json module')
 
+    json_encode = lambda data: json.dumps(data, separators=(',', ':'))
+    json_decode = lambda data: json.loads(data)
+    JSONDecodeError = ValueError
+
+# Protocol handlers
 CONNECT = 'o'
 DISCONNECT = 'c'
 MESSAGE = 'm'
         Closing reason
     """
     return 'c[%d,"%s"]' % (code, reason)
-
-
-json_decode = json.loads
-
-
-def json_encode(data):
-    return json.dumps(data, **kwargs)
-
-

ginsfsm/protocols/sockjs/server/session.py

 import logging
 
 from ginsfsm.protocols.sockjs.server import sessioncontainer, periodic, proto
-from ginsfsm.escape import utf8
+from ginsfsm.protocols.sockjs.server.util import bytes_to_str
 
 
 class ConnectionInfo(object):
         """
         self.server = server
         self.stats = server.stats
+        self.gaplic = server.gaplic
 
         self.send_expects_json = False
 
             try:
                 self.conn.on_close()
             except:
-                logging.debug("Failed to call on_close().", exc_info=True)
+                logging.exception("Failed to call on_close().")
             finally:
                 self.state = CLOSED
                 self.close_reason = (code, message)
     def delayed_close(self):
         """Delayed close - won't close immediately, but on next ioloop tick."""
         self.state = CLOSING
-        self.server.io_loop.add_callback(self.close)
+        self.server.gaplic.add_callback(self.close)
 
     def get_close_reason(self):
         """Return last close reason tuple.
         `conn`
             Default connection class
         `server`
-            `SockJSRouter` instance
+            `GSockJSRouter` instance
         `session_id`
             Session id
         `expiry`
 
         # Heartbeat related stuff
         self._heartbeat_timer = None
-        self._heartbeat_interval = self.server.config.heartbeat_delay * 1000
+        #self._heartbeat_interval = self.server.config.heartbeat_delay * 1000
+        self._heartbeat_interval = self.server.config.heartbeat_delay  # TODO?
 
         self._immediate_flush = self.server.config.immediate_flush
         self._pending_flush = False
         `stats`
             If set to True, will update statistics after operation completes
         """
-        self.send_jsonified(proto.json_encode(utf8(msg)), stats)
+        self.send_jsonified(proto.json_encode(bytes_to_str(msg)), stats)
 
     def send_jsonified(self, msg, stats=True):
         """Send JSON-encoded message
         `stats`
             If set to True, will update statistics after operation completes
         """
-        msg = utf8(msg)
+        msg = bytes_to_str(msg)
 
         if self._immediate_flush:
             if self.handler and self.handler.active and not self.send_queue:
             self.send_queue += msg
 
             if not self._pending_flush:
-                self.server.io_loop.add_callback(self.flush)
+                self.server.gaplic.add_callback(self.flush)
                 self._pending_flush = True
 
         if stats:
         """Reset hearbeat timer"""
         self.stop_heartbeat()
 
-        self._heartbeat_timer = periodic.Callback(self._heartbeat,
-                                                  self._heartbeat_interval,
-                                                  self.server.io_loop)
+        self._heartbeat_timer = periodic.Callback(
+            self._heartbeat,
+            self._heartbeat_interval,
+            self.server.gaplic
+        )
         self._heartbeat_timer.start()
 
     def stop_heartbeat(self):

ginsfsm/protocols/sockjs/server/sessioncontainer.py

 from ginsfsm.utils import random_key
 
 
-def _random_key():
-    """Return random session key"""
-    i = md5()
-    i.update('%s%s' % (random(), time()))
-    return i.hexdigest()
-
-
 class SessionMixin(object):
     """Represents one session object stored in the session container.
     Derive from this object to store additional data.
         ``expiry``
             Expiration time. If not provided, will never expire.
         """
-        self.session_id = session_id or _random_key()
+        self.session_id = session_id or random_key()
         self.promoted = None
         self.expiry = expiry
 
     def __lt__(self, other):
         return self.expiry_date < other.expiry_date
 
-    __cmp__ =  __lt__
+    __cmp__ = __lt__
 
     def __repr__(self):
         return '%f %s %d' % (getattr(self, 'expiry_date', -1),

ginsfsm/protocols/sockjs/server/stats.py

         self.pack_sent_ps = MovingAverage()
         self.pack_recv_ps = MovingAverage()
 
+        # TODO: implement the periodic callbacks
         #self._callback = ioloop.PeriodicCallback(self._update,
         #                                         1000,
         #                                         io_loop)

ginsfsm/protocols/sockjs/server/util.py

+import sys
+
+PY3 = sys.version_info[0] == 3
+
+if PY3:
+    MAXSIZE = sys.maxsize
+
+    def bytes_to_str(b):
+        if isinstance(b, bytes):
+            return str(b, 'utf8')
+        return b
+
+    def str_to_bytes(s):
+        if isinstance(s, bytes):
+            return s
+        return s.encode('utf8')
+
+    import urllib.parse
+    unquote_plus = urllib.parse.unquote_plus
+else:
+    if sys.platform == "java":
+        # Jython always uses 32 bits.
+        MAXSIZE = int((1 << 31) - 1)
+    else:
+        # It's possible to have sizeof(long) != sizeof(Py_ssize_t).
+        class X(object):
+            def __len__(self):
+                return 1 << 31
+        try:
+            len(X())
+        except OverflowError:
+            # 32-bit
+            MAXSIZE = int((1 << 31) - 1)
+        else:
+            # 64-bit
+            MAXSIZE = int((1 << 63) - 1)
+            del X
+
+    def bytes_to_str(s):
+        if isinstance(s, unicode):
+            return s.encode('utf-8')
+        return s
+
+    def str_to_bytes(s):
+        if isinstance(s, unicode):
+            return s.encode('utf8')
+        return s
+
+    import urllib
+    unquote_plus = urllib.unquote_plus
 # -*- coding: utf-8 -*-
 import logging
-from ginsfsm.compat import urlparse
-from ginsfsm.compat import text_type
+from ginsfsm.compat import (
+    urlparse,
+    text_type,
+    bytes_,
+)
 from time import time
 from hashlib import md5
 from random import random
 def random_key():
     """Return random session key"""
     i = md5()
-    s = '%s%s' % (random(), time())
-    i.update(s.encode(encoding='utf-8', errors='strict'))
+    i.update(bytes_('%s%s' % (random(), time())))
     return i.hexdigest()