Commits

Marcin Lulek committed 4b671cf

move to queues

  • Participants
  • Parent commits 9654d9a

Comments (0)

Files changed (2)

gevent_cometd/app_views.py

             # for chrome issues
             # yield ' ' * 1024
             # wait for this to wake up
-            connection.event.wait(config['wake_connections_after'])
+            messages = []
+            # block for first message - wake up after a while 
+            try:
+                messages.append(connection.queue.get(timeout=config['wake_connections_after']))
+            except gevent.queue.Empty as e:
+                pass
+            # get more messages if enqueued takes up total 0.25s
+            while True:
+                try:
+                    messages.append(connection.queue.get(timeout=0.25))
+                except gevent.queue.Empty as e:
+                    break
             if request.params.get('callback'):
-                yield request.params.get('callback') + '(' + json.dumps(connection.messages, cls=util.DateTimeEncoder) + ')'
+                yield request.params.get('callback') + '(' + json.dumps(messages, cls=util.DateTimeEncoder) + ')'
             else:
-                yield json.dumps(connection.messages, cls=util.DateTimeEncoder)
-            connection.messages = []
+                yield json.dumps(messages, cls=util.DateTimeEncoder)
         res = Response(app_iter=yield_response(), request=request,
                        content_type='application/json')
         util.add_cors_headers(res)
         connection.websocket = websocket
         try:
             while True:
-                connection.event.wait(config['wake_connections_after'])
+                messages = []
+                # get first message
+                try:
+                    messages.append(connection.queue.get(timeout=config['wake_connections_after']))
+                except gevent.queue.Empty as e:
+                    pass
+                # get more messages if enqueued takes up total 0.25s
+                while True:
+                    try:
+                        messages.append(connection.queue.get(timeout=0.25))
+                    except gevent.queue.Empty as e:
+                        break
                 # mark as last active
                 # ensures they dont get GC'd even if we are not sending anything
-                to_send = connection.messages
-                connection.messages = []
-                websocket.send(json.dumps(to_send, cls=util.DateTimeEncoder))
+                websocket.send(json.dumps(messages, cls=util.DateTimeEncoder))
                 user_inst = user.User.by_name(connection.user)
                 user_inst.last_active = datetime.datetime.utcnow()
                 connection.last_active = datetime.datetime.utcnow()

gevent_cometd/user.py

 import gevent_cometd
 import datetime
 import gevent
+import gevent.queue
 import logging
 import uuid
 import json
     def add_message(self, message):
         # mark active
         self.last_active = datetime.datetime.utcnow()
-        for conn_id in self.connections:
+        conns = self.connections[:]
+        for conn_id in conns:
             connection = Connection.by_id(conn_id)
             if connection:
                 connection.add_message(message)
+        del conns
 
 
     def __repr__(self):
         self.user = user  # hold user id/name of connection
         self.last_active = datetime.datetime.utcnow()
         self.id = conn_id or unicode(uuid.uuid4())
-        self.messages = []
-        self.event = gevent.event.Event()
+        self.queue = gevent.queue.Queue()
 
     def __repr__(self):
         return '<Connection: id:%s, owner:%s>' % (self.id, self.user)
         import gevent_cometd.util as util
         gevent_cometd.stats['total_messages'] += 1
         self.last_update = datetime.datetime.utcnow()
-        self.messages.append(message)
-        self.event.set()
-        self.event.clear()
+        self.queue.put(message)
 
     @classmethod
     def by_id(cls, conn_id):
             if delta.seconds >= gevent_cometd.config['gc_conns_after']:
                 conn = gevent_cometd.connections.pop(key)
                 # wake up connection
-                conn.event.set()
-                conn.event.clear()
+                
                 if hasattr(conn, 'websocket'):
                     try:
                         conn.websocket.close()