Ernesto Menéndez avatar Ernesto Menéndez committed f5d15ba Merge

merge juanjose into default

Comments (0)

Files changed (6)

     },
     "apps" : {
         "cman" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23601,
             "pyramid_configurator_settings" : {
                 "mako.directories" : "nxweb.assets:templates",
             }
         },
         "cmanws" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23602
         },
         "auth" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23603,
             "pyramid_configurator_settings" : {
                 "mako.directories" : "nxweb.assets:templates",
             }
         },
         "admin" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23604,
             "pyramid_configurator_settings" : {
                 "auth_policy_secret" : "something secret",
             }
         },
         "wman" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23605,
             "pyramid_configurator_settings" : {
                 "auth_policy_secret" : "something secret"
             }
         },
         "wmanws" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23606
         },
         "daac" : {
-            "host" : "192.168.1.3",
+            "host" : "192.168.10.101",
             "port" : 23607,
             "pyramid_configurator_settings" : {
                 "auth_policy_secret" : "something secret"

nxweb/apps/wmanws/__init__.py

 import json
-
+from threading import Timer
 from exception import *
 from worker_manager import WorkerManager
 
 """List of all monitor websocket connections """
 _monitor_websockets = []
 
+_websocket_timer = {}
+
 def get_worker_manager():
     """Initialize and returns reference to the Worker Manager Instance"""
     global _worker_manager
 
     debug("reading websocket message")
 
-    message = websocket.receive()
-    message = json.loads(message)
+    message = None
+    try:
+        message = websocket.receive()
+        message = json.loads(message)
+    except:
+        websocket.close()
+        return
 
     if message == "monitor":
         #Si recibimos el string monitor a travez del websocket
         worker_token = message
         worker_manager.online(worker_token, websocket, environ.get('REMOTE_ADDR'))
         debug("initiating worker connection")
+
+        _websocket_timer[worker_token] = Timer(5.0, worker_manager.offline,args=[worker_token])
+        _websocket_timer[worker_token].start()
+
         while True:
             message = None
             try:
                 message = websocket.receive()
             except:
                 debug("##connection closed .. error on communication")
-                worker_manager.offline(worker_token)
+                if worker_manager.is_online(worker_token):
+                    worker_manager.offline(worker_token)
                 break
 
             if message is None:
                 debug("##connection closed by the worker")
-                worker_manager.offline(worker_token)
+                if worker_manager.is_online(worker_token):
+                    worker_manager.offline(worker_token)
                 break
             elif message == "error":
                 done_task_key = websocket.receive()
                 debug("#finished_tasks with error#")
                 worker_manager.task_finished_with_error(done_task_key, worker_token)
                 continue
+            elif message == "keepalive":
+#                print "keep alive ######"
+                _websocket_timer[worker_token].cancel()
+                _websocket_timer[worker_token] = Timer(5.0, worker_manager.offline,args=[worker_token])
+                _websocket_timer[worker_token].start()
             elif message in worker_manager.get_pending_tasks():
                 done_task_key = message
                 worker_manager.task_finished(done_task_key, worker_token)

nxweb/apps/wmanws/helpers.py

         self.assigned_tasks.remove(task_key)
 
     def disconnected(self):
+        self.websocket.close()
         try:
             self.websocket.close()
+            print "closing websocket"
         except:
             pass
             #raise WorkerCommunicationError()

nxweb/apps/wmanws/worker_manager.py

                      self.attribute_index[k][v].remove(worker_token)
         ##########
         self.notify_all_monitors_callback()
+
+    def is_online(self, worker_token):
+        self.online_workers.get(worker_token) is not None
 #-----------------------------Tasks--------------------------------------
     def new_task(self, program_key, task_key):
         """Called when a new task has been created"""

nxweb/data/dkvs/__init__.py

 import riak
 import time
-
+import StringIO
+import requests
+import json
 from nxweb.data.dkvs.exception import InvalidId
 
 
         host = "localhost"
     port = _server.get('port')
 
-    import StringIO
-    import pycurl
-    import json
-    c =  pycurl.Curl()
-    c.setopt(pycurl.URL, "http://%s:%s/riak/%s?keys=true&props=false" % (host,port,bucket_name))
 
-    b = StringIO.StringIO()
-    c.setopt(pycurl.WRITEFUNCTION, b.write)
-    c.setopt(pycurl.FOLLOWLOCATION, 1)
-    c.setopt(pycurl.MAXREDIRS, 5)
-    c.perform()
-    data = b.getvalue()
-    data = json.loads(data)
-    keys = data.get('keys')
+    r = requests.get("http://%s:%s/riak/%s?keys=true&props=false" % (host,port,bucket_name))
+    keys = r.json.get('keys')
+
 
     length = len(keys)
     i = 0
     wmanws_conf = config['apps']['wmanws']
     wmanws_wsgi_server = WSGIServer((wmanws_conf['host'], wmanws_conf['port']), wmanws.accept_ws_connection, handler_class=geventwebsocket.WebSocketHandler)
     # -------------------------------------------------------------------------
-
+    import gevent
+#    print dir(gevent)
+#    print gevent.__path__
     # daac  -------------------------------------------------------------------
     daac_conf = config['apps']['daac']
     daac_wsgi_app = daac.make_wsgi_app(**daac_conf['pyramid_configurator_settings'])
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.