1. Nick Jennings
  2. belvedere

Commits

Nick Jennings  committed c9d4def

updated listener channel from _sid_ to _listener_, implemented shared cloud updates, now each session listens to two channels. one is sid (just for that socket) the other is with user_token, which can make an announcement to all sessions of that user

  • Participants
  • Parent commits cb308dc
  • Branches default

Comments (0)

Files changed (3)

File notify/command_functions.py

View file
     :param app:        listener application instance
     :param json_data:  data returned from the call to core
     """
-    d = json.loads(json_data)
+    d = app.parse_json(json_data)
     print "CF: share_cloud_response - called"
     
     cloud_shares_channel = "%s_cloudshares" % app.prefix

File notify/listener.py

View file
 import tornadoredis
 import redis
 import time
+import pprint
 
 
 
             static_path="/var/www"
         )
         tornado.web.Application.__init__(self, handlers, **settings)
-
+        
+    def parse_json(self, json_data):
+        try:
+            d = json.loads(json_data)
+        except ValueError:
+            #self._send_error('', '', 'bad json')
+            d = {}
+        return d
 
 
 
         super(ListenHandler, self).__init__(*args, **kwargs)
         self.user_token = False
         self.sid = ''
+        self.user_token = ''
         
         
     def open(self):
 
         response = self.RESPONSE_TPL % ('register', rid, 'success', data)
         self.callback(response)
+        print "adding listener for USER_TOKEN"
+        self._listen_results(self.user_token)
 
 
     def _unregister(self):
  
     @tornado.web.asynchronous
     @tornado.gen.engine
-    def _listen_results(self):
+    def _listen_results(self, suffix = None):
         """ 
         Subscribes to a special channel for this sid, to get announcements of 
         completed jobs from workers, which it can then send back to the client.
+        
+        :param suffix:   if this is specified, it will use this instead of sid 
+        as the suffix of the channel name
         """
         self.sdb = tornadoredis.Client()
         self.sdb.connect()
-        self.subscribe_channel = '%s_sid_%s' % (self.application.prefix, self.sid)
+        if not suffix:
+            suffix = self.sid
+            
+        self.subscribe_channel = '%s_listener_%s' % (self.application.prefix, suffix)
         print "... subscribing to [%s]" % (self.subscribe_channel)
         yield tornado.gen.Task(self.sdb.subscribe, self.subscribe_channel)
         self.sdb.listen(self._on_result)
                 d = json.loads(json_data)
             except TypeError:
                 continue
-                
-            if 'clouds' in d:
-                for c in d['clouds']:
-                    if 'id' in c and str(c['id']) in cloud_shares:
-                        cloud_listeners_channel = '%s_cloudlisteners_%s' % (self.prefix, c['id'])
-                        listeners = list(self.rdb.smembers(cloud_listeners_channel))
-                        pprint.pprint(listeners)
-                        try:
-                            index = listeners.index(source_sid)
-                        except ValueError:
-                            index = None
-                        print "[%s - index: %s]" % (source_sid, index)
-                        if index:
-                            listeners.pop(index) # remove source sid, just update other listeners
-                        
-                        # ready to send updates
-                        for l in listeners:
-                            l_channel = "%s_sid_%s" % (self.prefix, l)
-                            result = "0:%s:%s" % (command, json_data) 
-                            print "SEND_UPDATE: [%s] [%s]" % (l_channel, command)
-                            self.rdb.publish(l_channel, result)
+            
+            print "SEND_UPDATES: %s" % command
+            if command == 'save_canvas':
+                if 'clouds' in d:
+                    for c in d['clouds']:
+                        if 'id' in c and str(c['id']) in cloud_shares:
+                            cloud_listeners_channel = '%s_cloudlisteners_%s' % (self.prefix, c['id'])
+                            listeners = list(self.rdb.smembers(cloud_listeners_channel))
+                            pprint.pprint(listeners)
+                            try:
+                                index = listeners.index(source_sid)
+                            except ValueError:
+                                index = None
+                            print "[%s - index: %s]" % (source_sid, index)
+                            if index:
+                                listeners.pop(index) # remove source sid, just update other listeners
                             
+                            # ready to send updates
+                            for l in listeners:
+                                l_channel = "%s_listener_%s" % (self.prefix, l)
+                                result = "0:%s:%s" % (command, json_data) 
+                                print "SEND_UPDATE: [%s] [%s]" % (l_channel, command)
+                                self.rdb.publish(l_channel, result)
+            elif command == 'share_cloud':
+                if 'share_cloud' in d:
+                    if 'user_list_add' in d['share_cloud']:
+                        for k in d['share_cloud']['user_list_add']:
+                            a = d['share_cloud']['user_list_add'][k]
+                            if 'cloud_id' in a and 'disamb_source_memberid' in a:
+                                print "SEND_UPDATE for [%s %s %s]" % (command, a['cloud_id'], a['user_token'])
+                                l_channel = '%s_listener_%s' % (self.prefix, a['user_token'])
+                                result = "0:%s:%s" % (command, json.dumps(a))
+                                self.rdb.publish(l_channel, result)
+                    if 'user_list_remove' in d['share_cloud']:
+                        for k in d['share_cloud']['user_list_add']:
+                            a = d['share_cloud']['user_list_add'][k]
+                            if 'cloud_id' in a and 'disamb_source_memberid' in a:
+                                print "SEND_UPDATE for [%s %s %s]" % (command, a['cloud_id'], a['user_token'])
+                                l_channel = '%s_listener_%s' % (self.prefix, a['user_token'])
+                                result = "0:%s:%s" % (command, json.dumps(a))
+                                self.rdb.publish(l_channel, result)
                             
                             
 
-
-
 if __name__ == '__main__':
     print ("initializing server")
     application = Application()

File notify/worker.py

View file
         cmd = [cmd_exe]
         proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
         output = proc.communicate(data)
-        publish_channel = '%s_sid_%s' % (self.prefix, sid)
+        publish_channel = '%s_listener_%s' % (self.prefix, sid)
         result = "%s:%s:%s" % (rid, command, output[0])
         print "%s Worker[%s]: publish_channel[%s] completed" % (self.prefix, self.type, publish_channel)
         self.rdb.publish(publish_channel, result)