Ernesto Menéndez committed c41e681 Merge

merge juanjose into default

Comments (0)

Files changed (10)

             },
             "sessions" : {
                 "db" : 1,
-                "ttl" : 300
+                "ttl" : 300000
             }
         }
     },
     "apps" : {
         "cman" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23601,
             "pyramid_configurator_settings" : {
                 "mako.directories" : "nxweb.assets:templates",
             }
         },
         "cmanws" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23602
         },
         "auth" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23603,
             "pyramid_configurator_settings" : {
                 "mako.directories" : "nxweb.assets:templates",
             }
         },
         "admin" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23604,
             "pyramid_configurator_settings" : {
                 "auth_policy_secret" : "something secret",
             }
         },
         "wman" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23605,
             "pyramid_configurator_settings" : {
                 "auth_policy_secret" : "something secret"
             }
         },
         "wmanws" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23606
         },
         "daac" : {
-            "host" : "0.0.0.0",
+            "host" : "192.168.1.3",
             "port" : 23607,
             "pyramid_configurator_settings" : {
                 "auth_policy_secret" : "something secret"

nxweb/apps/admin/html/public/blender_test.py.zip

Binary file modified.

nxweb/apps/daac/views/__init__.py

     worker_token = req.headers.get('worker_token')
     data = req.body
     task.write_output_data(task_key, worker_token, data)
+    task.set_error_status(task_key, worker_token)
 
     # TODO: remove temporary code: trigger the task-completed event
     pkid, task_id = split_key(task_key)

nxweb/apps/wmanws/__init__.py

-import sets
 import json
-import copy
-import nxweb.data.dkvs.attribute as attribute_dkvs
-import nxweb.data.dkvs.task as task_dkvs
-import nxweb.data.dkvs.pubkey as pubkey_dkvs
-import uuid
-from Queue import Queue
+
 from exception import *
+from worker_manager import WorkerManager
 
+"""Worker Manager Isntance Reference """
 _worker_manager = None
+
+"""List of all monitor websocket connections """
 _monitor_websockets = []
+
 def get_worker_manager():
+    """Initialize and returns reference to the Worker Manager Instance"""
     global _worker_manager
     if _worker_manager is None:
-        _worker_manager = WorkerManager()
+        _worker_manager = WorkerManager(notify_all_monitors)
     return _worker_manager
 
-
-def http_handler(environ, start_response):
-    if environ["PATH_INFO"].strip("/") == "version":
-        start_response("200 OK", [])
-        return ['wmanws/1.0']
-    else:
-        start_response("400 Bad Request", [])
-        return ["WebSocket connection is expected here."]
-
+def notify_all_monitors():
+    """This function sends all the "monitor data" to the monitor interface,
+    through the websocket """
+    global _monitor_websockets
+    worker_manager = get_worker_manager()
+    data = {"worker_monitor": worker_manager.get_worker_monitor(),
+            "task_queue":     worker_manager.get_task_list_monitor(),
+            "tasks_done":     worker_manager.get_tasks_done_list_monitor()}
+    data = json.dumps(data)
+    for ws in _monitor_websockets:
+        ws.send(data)
 
 def accept_ws_connection(environ, start_response):
+    """Handles the websocket connection request"""
     global _monitor_websockets
     worker_manager = get_worker_manager()
 
         return http_handler(environ, start_response)
 
     debug("reading websocket message")
-    worker_token = None
-    #try:
+
     message = websocket.receive()
-    print message
     message = json.loads(message)
-    #except:
-        #raise WorkerCommunicationError()
 
     if message == "monitor":
+        #If we receive the string "monitor" over the websocket,
+        #we treat this connection as a monitor connection
         debug("Initiating Monitor Connection "+str(len(_monitor_websockets)))
         _monitor_websockets.append(websocket)
-        get_worker_manager().notify_all_monitors()
+        notify_all_monitors()
         try:
             websocket.receive()
         except:
         _monitor_websockets.remove(websocket)
         debug("Closing Monitor Connection")
     else:
+        #Assume the message we received is the worker token
         worker_token = message
         worker_manager.online(worker_token, websocket, environ.get('REMOTE_ADDR'))
         debug("initiating worker connection")
             try:
                 message = websocket.receive()
             except:
-                raise WorkerCommunicationError()
+                debug("##connection closed .. error on communication")
+                worker_manager.offline(worker_token)
+                break
 
             if message is None:
                 debug("##connection closed by the worker")
                 worker_manager.offline(worker_token)
                 break
+            elif message == "error":
+                done_task_key = websocket.receive()
+                debug("#finished_tasks with error#")
+                worker_manager.task_finished_with_error(done_task_key, worker_token)
+                continue
             elif message in worker_manager.get_pending_tasks():
                 done_task_key = message
                 worker_manager.task_finished(done_task_key, worker_token)
                 debug("##Handle recieved message")
                 raise UnhandledWorkerMessage()
 
+def http_handler(environ, start_response):
+    if environ["PATH_INFO"].strip("/") == "version":
+        start_response("200 OK", [])
+        return ['wmanws/1.0']
+    else:
+        start_response("400 Bad Request", [])
+        return ["WebSocket connection is expected here."]
+
 
 def debug(msg):
     print "WMANWS: " + msg
-
-class WorkerManager():
-
-    def notify_all_monitors(self):
-        global _monitor_websockets
-        data = {"worker_monitor": self.get_worker_monitor(),
-                "task_queue": Monitor.get_task_list(),
-                "tasks_done": Monitor.get_tasks_done_list()}
-        data = json.dumps(data)
-        for ws in _monitor_websockets:
-            ws.send(data)
-
-    def task_short_print(self, task_comp_key):
-        pubkey_key, task_key = task_comp_key.split("+")
-        task_metadata = task_dkvs.get_metadata(task_comp_key)
-
-
-        task = task_metadata['program_key'].split("+")
-        task.append(repr(task_metadata['counter']))
-        #[user] [program_name] [task_number]
-        return "-".join(task)
-
-
-    def get_worker_monitor(self):
-        monitor_data = []
-        for (worker_token, worker) in self.online_workers.iteritems():
-            task_monitor = []
-
-            for task_comp_key in worker.assigned_tasks:
-
-                task = self.task_short_print(task_comp_key)
-                task_monitor.append(task)
-            if len(task_monitor)==0:
-                task_monitor = ""
-            else:
-                task_monitor = task_monitor[0]
-            monitor_data.append({"worker_id":repr(worker.worker_number),"worker_ip":worker.ip,"assigned_tasks":task_monitor})
-        return monitor_data
-
-    def get_task_queue_monitor(self):
-        return Monitor.get_task_list()
-
-    def get_finished_tasks_monitor(self):
-        return Monitor.get_tasks_done_list()
-
-    def __init__(self):
-        self.online_workers = {}
-        self.queued_workers = {}
-        self.attribute_index = {}
-        self.round_robin_index = 0
-        self.task_queue = []
-        self.finished_tasks = []
-        self.new_unassigned_task_index = {}
-        self.total_num_task = 0
-
-    def logging_in(self, attributes):
-
-        worker = Worker(attributes)
-        worker_token = worker.token
-        if self.online_workers.get(worker_token) is not None or self.queued_workers.get(worker_token) is not None:
-            raise DuplicatedWorker()
-        self.queued_workers[worker_token] = worker
-        return worker.token
-
-    def online(self, worker_token, websocket, ip_addr):
-
-        if self.queued_workers.get(worker_token) is None:
-            raise NonexistentWorker()
-        worker = copy.copy(self.queued_workers[worker_token])
-        del self.queued_workers[worker_token]
-        worker.websocket = websocket
-        worker.ip = ip_addr
-        self.online_workers[worker_token] = worker
-
-
-        #for indexing pourposes
-        if worker.attributes:
-            for (k, values) in  worker.attributes.items():
-                if not hasattr(values, "__iter__"):
-                    values = [values]
-
-                for v in values:
-                    if self.attribute_index.get(k) is None:
-                        self.attribute_index[k]={v:[worker_token]}
-                    elif self.attribute_index.get(k).get(v) is None:
-                        self.attribute_index[k][v]=[worker_token]
-                    else:
-                        self.attribute_index[k][v].append(worker_token)
-        self.worker_ready_for_new_task(worker_token)
-        self.notify_all_monitors()
-
-
-    def offline(self, worker_token):
-        worker = self.online_workers.get(worker_token)
-        worker.disconnected()
-
-        if self.online_workers.get(worker_token) is None:
-            raise NonexistentWorker()
-        del self.online_workers[worker_token]
-
-        for task_key in worker.get_unfinished_tasks():
-            program_key = task_dkvs.get_metadata(task_key).get('program_key')
-            Monitor.requeue_task(task_key)
-            self.new_task(program_key, task_key)
-            task_dkvs.set_status_requeued(task_key)
-            worker.remove_task(task_key)
-
-        #for indexing pourposes
-        if worker.attributes:
-            for (k, values) in  worker.attributes.iteritems():
-                 if not hasattr(values, "__iter__"):
-                    values = [values]
-
-                 for v in values:
-                     self.attribute_index[k][v].remove(worker_token)
-        ##########
-        self.notify_all_monitors()
-
-    def new_task(self, program_key, task_key):
-        #Logica:
-        #1ro encolo la tarea como sin asignar
-        #2ndo Busco si hay algun worker sin tareas para asignarsela
-        #si no hay ninguno sin tareas, no se la asigno a nadie
-        #3cada vez que un worker termina una tarea pide otra asi que eventualmente
-        #se ejecutara
-        if task_dkvs.get_metadata(task_key) is None:
-            raise Exception
-
-        self.task_queue.append(task_key)
-        criteria = task_dkvs.get_metadata(task_key).get('worker_criteria')
-
-        Monitor.new_task(task_key, program_key)
-
-        self.new_unassigned_task(program_key, task_key, criteria)
-
-        for worker_token, worker_obj in self.online_workers.iteritems():
-            if len(worker_obj.assigned_tasks) <= 0:
-                self.worker_ready_for_new_task(worker_token)
-                break
-
-
-    def task_finished(self, task_key, worker_token):
-        #self.finished_tasks.append(WorkerManager.task_pretty_print(task_key))
-        self.task_queue.remove(task_key)
-
-        self.online_workers.get(worker_token).remove_task(task_key)
-
-        self.worker_ready_for_new_task(worker_token)
-        Monitor.task_done(task_key)
-
-
-    def worker_ready_for_new_task(self, worker_token):
-        for task_criteria_list in self.new_unassigned_task_index.keys():
-            criteria = dict(task_criteria_list)
-            if worker_token in self.workers_with_all_criteria(criteria):
-
-                try:
-                    program_key, task_key = self.new_unassigned_task_index.get(task_criteria_list).get_nowait()
-                    self.assing_task(program_key, task_key, worker_token)
-                    return
-                except:
-                    pass
-
-
-#########################
-    def workers_with_criteria(self, name, value):
-        """This function returns a list of worker token_sessions
-        that have the criteria with the value specified"""
-        if not attribute_dkvs.are_valid({name:value}):
-            return None
-
-        if self.attribute_index.get(name) is None:
-            return None
-        else:
-            return self.attribute_index.get(name).get(value)
-
-    def workers_with_all_criteria(self,criteria):
-        """This function returns a list of worker token_sessions
-        that have all the criteria with the values specified
-
-        criteria={"O.S.":"Windows7","architecture":"x86"}"""
-
-        if criteria is None:
-            return self.online_workers.keys()
-
-        if not attribute_dkvs.are_valid(criteria):
-            return []
-        results = []
-        for (k, v) in criteria.items():
-            r = self.workers_with_criteria(k, v)
-            if r is None:
-                return [];
-            results.append(r)
-
-        result = set(results[0])
-        for i in results:
-            result = set(result) & set(i)
-        return list(result)
-
-
-    def choose_round_robin_worker(self, criteria):
-
-        keys = None
-        keys = self.workers_with_all_criteria(criteria)
-
-        if len(keys) == 0:
-            raise NoWorkerConnected()
-        self.round_robin_index = (self.round_robin_index + 1) % len(keys)
-        if  keys[self.round_robin_index] is None:
-            raise NoWorkerConnected()
-        return keys[self.round_robin_index]
-
-    def assing_task(self, program_key, task_key, worker_token):
-        worker = self.online_workers.get(worker_token)
-        if worker is None:
-            raise NonexistentWorker()
-        task_dkvs.assign_worker(task_key, worker_token)
-        worker.assign_task(program_key, task_key)
-
-
-
-    def new_unassigned_task(self, program_key, task_key, criteria):
-
-        key = sets.ImmutableSet(None)
-        if criteria is not None:
-            key = sets.ImmutableSet(criteria.items())
-
-        if self.new_unassigned_task_index.get(key) is None:
-            self.new_unassigned_task_index[key] = Queue()
-
-        self.new_unassigned_task_index[key].put_nowait((program_key, task_key))
-
-    def get_pending_tasks(self):
-        return self.task_queue
-
-class Task():
-
-    counter = 0
-
-    def __init__(self, task_key, owner_user, program_name, program_version):
-            #[user] [program_name] [program_version] [task_number] [status] [assigned_worker]
-        self.task_key = task_key
-        self.owner_user = owner_user
-        self.program_name = program_name
-        self.program_version = program_version
-        self.task_number = Task.counter
-        Task.counter += 1
-        self.status = "Queued"
-        self.assigned_worker = ""
-
-    def assign(self, worker):
-        self.assigned_worker = worker
-        if self.status == "Re-Queued":
-            self.status = "Re-Processing"
-        else:
-            self.status = "Processing"
-
-    def requeue(self):
-        self.status = "Re-Queued"
-        self.assigned_worker = ""
-
-    def done(self):
-        if self.status == "Re-Processing":
-            self.status = "Done (re-tried)"
-        else:
-            self.status = "Done"
-
-    def get_long_data(self):
-        return {"Owner":self.owner_user,
-                "Program":self.program_name,
-                "Version":self.program_version,
-                "TaskNumber":self.task_number,
-                "Status":self.status,
-                "AssignedWorker":self.assigned_worker}
-
-    #def get_short_data(self):
-        #return [self.owner_user,
-                #self.program_name,
-                #self.task_number]
-
-class Monitor():
-
-    tasks = {}
-    task_queue_list = []
-    tasks_done_list = []
-
-    @staticmethod
-    def new_task(task_key, program_key):
-        if Monitor.tasks.get(task_key) is None:
-            owner_user, program_name, program_version = program_key.split("+")
-            task = Task(task_key, owner_user, program_name, program_version)
-            Monitor.tasks[task_key] = task
-            Monitor.task_queue_list.append(task)
-        else:
-            Monitor.tasks.get(task_key).requeue()
-
-        get_worker_manager().notify_all_monitors()
-
-    @staticmethod
-    def assign_task(task_key, worker):
-        task = Monitor.tasks[task_key]
-        task.assign(worker)
-
-        get_worker_manager().notify_all_monitors()
-
-    @staticmethod
-    def requeue_task(task_key):
-        Monitor.tasks[task_key].requeue()
-        get_worker_manager().notify_all_monitors()
-
-    @staticmethod
-    def task_done(task_key):
-        task = Monitor.tasks[task_key]
-        task.done()
-        Monitor.tasks_done_list.append(task)
-        Monitor.task_queue_list.remove(task)
-        get_worker_manager().notify_all_monitors()
-
-    @staticmethod
-    def get_task_list():
-        r = []
-        for task in Monitor.task_queue_list:
-            r.append(task.get_long_data())
-        return r
-
-    @staticmethod
-    def get_tasks_done_list():
-        r = []
-        for task in Monitor.tasks_done_list:
-            r.append(task.get_long_data())
-        r.reverse()
-        return r
-
-class Worker():
-    counter = 0
-    def __init__(self, attributes):
-
-        if not attribute_dkvs.are_valid(attributes):
-            raise InvalidWorkerAttrbute()
-
-        self.token          = str(uuid.uuid1())
-        self.websocket      = None
-        self.assigned_tasks = []
-        self.attributes     = attributes
-        self.worker_number = Worker.counter
-        self.ip = ""
-        Worker.counter += 1
-
-    def assign_task(self,program_key, task_key):
-        try:
-            self.assigned_tasks.append(task_key)
-            self.websocket.send(program_key+":"+task_key)
-            Monitor.assign_task(task_key, self.worker_number)
-        except:
-            raise WorkerCommunicationError()
-
-    def remove_task(self, task_key):
-        self.assigned_tasks.remove(task_key)
-
-    def disconnected(self):
-        try:
-            self.websocket.close()
-        except:
-            raise WorkerCommunicationError()
-
-    def get_unfinished_tasks(self):
-        return self.assigned_tasks
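
The rewritten handler above implies a small wire protocol between wmanws and its clients: a worker sends its JSON-encoded token as the first frame, then receives "program_key:task_key" assignments and reports each result by sending the task key back (or the literal string "error" followed by the task key), while a monitor sends the string "monitor" and afterwards receives JSON snapshots of the worker and task state. The worker-side sketch below is illustrative only: it assumes the third-party websocket-client package, a URL built from the host/port in the config above, and a hypothetical run_task() helper, none of which are part of this changeset.

    import json
    import websocket  # third-party "websocket-client" package (assumed available)

    WMANWS_URL = "ws://192.168.1.3:23606/"  # illustrative; host/port from the config above

    def worker_loop(worker_token, run_task):
        ws = websocket.create_connection(WMANWS_URL)
        # accept_ws_connection() json.loads() the first frame, so encode the token
        ws.send(json.dumps(worker_token))
        try:
            while True:
                message = ws.recv()                  # "program_key:task_key" assignment
                program_key, task_key = message.split(":", 1)
                try:
                    run_task(program_key, task_key)  # hypothetical task runner
                    ws.send(task_key)                # success: send the task key back
                except Exception:
                    ws.send("error")                 # failure: "error", then the key
                    ws.send(task_key)
        finally:
            ws.close()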

nxweb/apps/wmanws/helpers.py

+import nxweb.data.dkvs as dkvs
+from exception import *
+import uuid
+
+class Task():
+
+    counter = 0
+
+    def __init__(self, task_key, owner_user, program_name, program_version):
+            #[user] [program_name] [program_version] [task_number] [status] [assigned_worker]
+        self.task_key = task_key
+        self.owner_user = owner_user
+        self.program_name = program_name
+        self.program_version = program_version
+        self.task_number = Task.counter
+        Task.counter += 1
+        self.status = "Queued"
+        self.assigned_worker = ""
+
+    def assign(self, worker):
+        self.assigned_worker = worker
+        if self.status == "Re-Queued":
+            self.status = "Re-Processing"
+        else:
+            self.status = "Processing"
+
+    def requeue(self):
+        self.status = "Re-Queued"
+        self.assigned_worker = ""
+
+    def done(self):
+        if self.status == "Re-Processing":
+            self.status = "Done (re-tried)"
+        else:
+            self.status = "Done"
+
+    def error(self):
+        self.status = "Error"
+
+    def get_long_data(self):
+        return {"Owner":self.owner_user,
+                "Program":self.program_name,
+                "Version":self.program_version,
+                "TaskNumber":self.task_number,
+                "Status":self.status,
+                "AssignedWorker":self.assigned_worker}
+
+class Monitor():
+
+    tasks = {}
+    task_queue_list = []
+    tasks_done_list = []
+    notify_all_monitors_callback = None
+
+    @staticmethod
+    def initialize(notify_all_monitors_callback_a):
+        Monitor.notify_all_monitors_callback = notify_all_monitors_callback_a
+
+    @staticmethod
+    def new_task(task_key, program_key):
+        if Monitor.tasks.get(task_key) is None:
+            owner_user, program_name, program_version = program_key.split("+")
+            task = Task(task_key, owner_user, program_name, program_version)
+            Monitor.tasks[task_key] = task
+            Monitor.task_queue_list.append(task)
+        else:
+            Monitor.tasks.get(task_key).requeue()
+
+        Monitor.notify_all_monitors_callback()
+
+    @staticmethod
+    def assign_task(task_key, worker):
+        task = Monitor.tasks[task_key]
+        task.assign(worker)
+
+        Monitor.notify_all_monitors_callback()
+
+    @staticmethod
+    def requeue_task(task_key):
+        Monitor.tasks[task_key].requeue()
+        Monitor.notify_all_monitors_callback()
+
+    @staticmethod
+    def task_done(task_key):
+        task = Monitor.tasks[task_key]
+        task.done()
+        Monitor.tasks_done_list.append(task)
+        Monitor.task_queue_list.remove(task)
+        Monitor.notify_all_monitors_callback()
+
+    @staticmethod
+    def task_failed(task_key):
+        task = Monitor.tasks[task_key]
+        task.error()
+        Monitor.tasks_done_list.append(task)
+        Monitor.task_queue_list.remove(task)
+        Monitor.notify_all_monitors_callback()
+
+    @staticmethod
+    def get_task_list():
+        r = []
+        for task in Monitor.task_queue_list:
+            r.append(task.get_long_data())
+        return r
+
+    @staticmethod
+    def get_tasks_done_list():
+        r = []
+        for task in Monitor.tasks_done_list:
+            r.append(task.get_long_data())
+        r.reverse()
+        return r
+
+class Worker():
+    counter = 0
+    def __init__(self, attributes):
+
+        if not dkvs.attribute.are_valid(attributes):
+            raise InvalidWorkerAttrbute()
+
+        self.token          = str(uuid.uuid1())
+        self.websocket      = None
+        self.assigned_tasks = []
+        self.attributes     = attributes
+        self.worker_number = Worker.counter
+        self.ip = ""
+        Worker.counter += 1
+
+    def assign_task(self,program_key, task_key):
+        try:
+            self.assigned_tasks.append(task_key)
+            self.websocket.send(program_key+":"+task_key)
+            Monitor.assign_task(task_key, self.worker_number)
+        except:
+            raise WorkerCommunicationError()
+
+    def remove_task(self, task_key):
+        self.assigned_tasks.remove(task_key)
+
+    def disconnected(self):
+        try:
+            self.websocket.close()
+        except:
+            pass
+            #raise WorkerCommunicationError()
+
+    def get_unfinished_tasks(self):
+        return self.assigned_tasks
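
For reference, the Task class above encodes a small status machine: "Queued" -> "Processing" -> "Done", with "Re-Queued"/"Re-Processing"/"Done (re-tried)" when a task is requeued, plus the "Error" terminal state added in this change. A minimal walk-through, assuming the repository is importable (Python 2) and using made-up keys and worker numbers:

    from nxweb.apps.wmanws.helpers import Task

    t = Task("pubkey+task-1", "juanjose", "blender_render", "1.0")
    assert t.status == "Queued"
    t.assign(3)                  # first assignment, to worker number 3
    assert t.status == "Processing"
    t.requeue()                  # e.g. the worker disconnected
    assert t.status == "Re-Queued"
    t.assign(5)
    assert t.status == "Re-Processing"
    t.done()
    assert t.status == "Done (re-tried)"

    t2 = Task("pubkey+task-2", "juanjose", "blender_render", "1.0")
    t2.error()                   # new in this change
    assert t2.status == "Error"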

nxweb/apps/wmanws/worker_manager.py

+import sets
+from Queue import Queue
+import nxweb.data.dkvs.attribute as attribute_dkvs
+import nxweb.data.dkvs.task as task_dkvs
+import nxweb.data.dkvs.pubkey as pubkey_dkvs
+import json
+from helpers import *
+import copy
+from exception import *
+
+class WorkerManager():
+
+    def __init__(self, notify_all_monitors_callback):
+        self.online_workers = {}
+        self.queued_workers = {}
+        self.attribute_index = {}
+        self.round_robin_index = 0
+        self.task_queue = []
+        self.finished_tasks = []
+        self.new_unassigned_task_index = {}
+        self.total_num_task = 0
+        self.notify_all_monitors_callback = notify_all_monitors_callback
+        Monitor.initialize(notify_all_monitors_callback)
+
+#-------------------------Workers---------------------------------------------#
+    def logging_in(self, attributes):
+        """Called when a worker makes a login requet,
+        returns the newly generated worker token"""
+        worker = Worker(attributes)
+        worker_token = worker.token
+        if self.online_workers.get(worker_token) is not None or self.queued_workers.get(worker_token) is not None:
+            raise DuplicatedWorker()
+        self.queued_workers[worker_token] = worker
+        return worker.token
+
+    def online(self, worker_token, websocket, ip_addr):
+        """Called when a worker makes a websocket connection,
+        verifies the validity of the worker token """
+
+        if self.queued_workers.get(worker_token) is None:
+            raise NonexistentWorker()
+        worker = copy.copy(self.queued_workers[worker_token])
+        del self.queued_workers[worker_token]
+        worker.websocket = websocket
+        worker.ip = ip_addr
+        self.online_workers[worker_token] = worker
+
+
+        #for indexing purposes
+        if worker.attributes:
+            for (k, values) in  worker.attributes.items():
+                if not hasattr(values, "__iter__"):
+                    values = [values]
+
+                for v in values:
+                    if self.attribute_index.get(k) is None:
+                        self.attribute_index[k]={v:[worker_token]}
+                    elif self.attribute_index.get(k).get(v) is None:
+                        self.attribute_index[k][v]=[worker_token]
+                    else:
+                        self.attribute_index[k][v].append(worker_token)
+        self.worker_ready_for_new_task(worker_token)
+        self.notify_all_monitors_callback()
+
+    def worker_ready_for_new_task(self, worker_token):
+        """This function is called when a worker is available for task processing"""
+
+        for task_criteria_list in self.new_unassigned_task_index.keys():
+            criteria = dict(task_criteria_list)
+            if worker_token in self.workers_with_all_criteria(criteria):
+
+                try:
+                    program_key, task_key = self.new_unassigned_task_index.get(task_criteria_list).get_nowait()
+                    worker = self.online_workers.get(worker_token)
+                    if worker is None:
+                        raise NonexistentWorker()
+                    task_dkvs.assign_worker(task_key, worker_token)
+                    worker.assign_task(program_key, task_key)
+                    return
+                except:
+                    pass
+
+    def offline(self, worker_token):
+        """Called when a worker websocket connections is terminated"""
+        worker = self.online_workers.get(worker_token)
+        worker.disconnected()
+
+        if self.online_workers.get(worker_token) is None:
+            raise NonexistentWorker()
+        del self.online_workers[worker_token]
+
+        for task_key in worker.get_unfinished_tasks():
+            program_key = task_dkvs.get_metadata(task_key).get('program_key')
+            Monitor.requeue_task(task_key)
+            self.new_task(program_key, task_key)
+            task_dkvs.set_status_requeued(task_key)
+            worker.remove_task(task_key)
+
+        #for indexing purposes
+        if worker.attributes:
+            for (k, values) in  worker.attributes.iteritems():
+                 if not hasattr(values, "__iter__"):
+                    values = [values]
+
+                 for v in values:
+                     self.attribute_index[k][v].remove(worker_token)
+        ##########
+        self.notify_all_monitors_callback()
+#-----------------------------Tasks--------------------------------------
+    def new_task(self, program_key, task_key):
+        """Called when a new task has been created"""
+        #Logic:
+        #1st: enqueue the task as unassigned
+        #2nd: look for a worker with no assigned tasks and give it the task;
+        #if every worker is busy, the task is not assigned to anyone
+        #3rd: each time a worker finishes a task it asks for another one,
+        #so the task will eventually be executed
+        if task_dkvs.get_metadata(task_key) is None:
+            raise Exception
+
+        self.task_queue.append(task_key)
+        criteria = task_dkvs.get_metadata(task_key).get('worker_criteria')
+
+        Monitor.new_task(task_key, program_key)
+
+        self.new_unassigned_task(program_key, task_key, criteria)
+
+        for worker_token, worker_obj in self.online_workers.iteritems():
+            if len(worker_obj.assigned_tasks) <= 0:
+                self.worker_ready_for_new_task(worker_token)
+                break
+
+    def task_finished(self, task_key, worker_token):
+        """Called when a task has been finished """
+
+        self.task_queue.remove(task_key)
+        self.online_workers.get(worker_token).remove_task(task_key)
+        self.worker_ready_for_new_task(worker_token)
+        Monitor.task_done(task_key)
+
+    def task_finished_with_error(self, task_key, worker_token):
+        """Called when a task has been finished but an Error ocurred
+        on the program"""
+
+        self.task_queue.remove(task_key)
+        self.online_workers.get(worker_token).remove_task(task_key)
+        self.worker_ready_for_new_task(worker_token)
+        Monitor.task_failed(task_key)
+
+    def new_unassigned_task(self, program_key, task_key, criteria):
+        """Used to queue a task prior to be assigned to a worker """
+
+        key = sets.ImmutableSet(None)
+        if criteria is not None:
+            key = sets.ImmutableSet(criteria.items())
+
+        if self.new_unassigned_task_index.get(key) is None:
+            self.new_unassigned_task_index[key] = Queue()
+
+        self.new_unassigned_task_index[key].put_nowait((program_key, task_key))
+
+    def get_pending_tasks(self):
+        return self.task_queue
+#---------------------Worker Selection--------------------------------
+
+    def workers_with_criteria(self, name, value):
+        """This function returns a list of worker token_sessions
+        that have the criteria with the value specified"""
+        if not attribute_dkvs.are_valid({name:value}):
+            return None
+
+        if self.attribute_index.get(name) is None:
+            return None
+        else:
+            return self.attribute_index.get(name).get(value)
+
+    def workers_with_all_criteria(self,criteria):
+        """This function returns a list of worker token_sessions
+        that have all the criteria with the values specified
+
+        criteria={"O.S.":"Windows7","architecture":"x86"}"""
+
+        if criteria is None:
+            return self.online_workers.keys()
+
+        if not attribute_dkvs.are_valid(criteria):
+            return []
+        results = []
+        for (k, v) in criteria.items():
+            r = self.workers_with_criteria(k, v)
+            if r is None:
+                return []
+            results.append(r)
+
+        result = set(results[0])
+        for i in results:
+            result = set(result) & set(i)
+        return list(result)
+
+
+#---------------------------Monitors----------------------------
+
+    def get_worker_monitor(self):
+        """Used to send the info to the monitors """
+
+        def task_short_print(task_comp_key):
+
+            pubkey_key, task_key = task_comp_key.split("+")
+            task_metadata = task_dkvs.get_metadata(task_comp_key)
+            if task_metadata is None:
+                return ""
+            task = task_metadata['program_key'].split("+")
+            task.append(repr(task_metadata['counter']))
+            #[user]-[program_name]-[task_number]
+            return "-".join(task)
+
+        monitor_data = []
+        for (worker_token, worker) in self.online_workers.iteritems():
+            task_monitor = []
+
+            for task_comp_key in worker.assigned_tasks:
+
+                task = task_short_print(task_comp_key)
+                task_monitor.append(task)
+            if len(task_monitor)==0:
+                task_monitor = ""
+            else:
+                task_monitor = ",".join(task_monitor)
+            monitor_data.append({"worker_id":repr(worker.worker_number),
+                                 "worker_ip":worker.ip,
+                                 "assigned_tasks":task_monitor})
+        return monitor_data
+
+    def get_task_list_monitor(self):
+        """Used to send the info to the monitors """
+        return Monitor.get_task_list()
+
+    def get_tasks_done_list_monitor(self):
+        """Used to send the info to the monitors """
+        return Monitor.get_tasks_done_list()
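
The worker selection in workers_with_all_criteria() above is a plain set intersection over the attribute index that online() maintains: each (name, value) criterion yields the list of worker tokens indexed under it, and only tokens present in every list qualify. A toy illustration with made-up tokens, leaving out the dkvs attribute validation:

    # mirrors the shape of WorkerManager.attribute_index
    attribute_index = {
        "O.S.":         {"Windows7": ["tok-a", "tok-b"], "Linux": ["tok-c"]},
        "architecture": {"x86": ["tok-a", "tok-c"], "x64": ["tok-b"]},
    }

    criteria = {"O.S.": "Windows7", "architecture": "x86"}
    candidates = [set(attribute_index[name].get(value, []))
                  for (name, value) in criteria.items()]
    print set.intersection(*candidates)   # set(['tok-a']): only tok-a matches both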

nxweb/data/dkvs/__init__.py

         delete_all_keys(bucket)
 
 def delete_all_keys(bucket_name, sleeptime=2, num_pause=100):
+    global _server
     #print "###if keeps crashing run 'ulimit -n 4096' first to change the limit of open files"
     bucket = BUCKET_GETTERS.get(bucket_name)()
     #keys = get_all_keys(bucket_name)
-    keys = bucket.get_keys()
+    #keys = bucket.get_keys()
+
+    host = _server.get('host')
+    if host is None:
+        host = "localhost"
+    port = _server.get('port')
+
+    import StringIO
+    import pycurl
+    import json
+    c =  pycurl.Curl()
+    c.setopt(pycurl.URL, "http://%s:%s/riak/%s?keys=true&props=false" % (host,port,bucket_name))
+
+    b = StringIO.StringIO()
+    c.setopt(pycurl.WRITEFUNCTION, b.write)
+    c.setopt(pycurl.FOLLOWLOCATION, 1)
+    c.setopt(pycurl.MAXREDIRS, 5)
+    c.perform()
+    data = b.getvalue()
+    data = json.loads(data)
+    keys = data.get('keys')
+
     length = len(keys)
     i = 0
     print "Deleting %s Items" % length

nxweb/data/dkvs/task/__init__.py

 
 def set_status_requeued(task_key):
     metadata.set_status_requeued(task_key)
+
+def set_error_status(task_key, worker_token):
+    metadata.set_status_error(task_key)

nxweb/data/dkvs/task/metadata.py

     task.set_data(task_data)
     task.store()
 
+def set_status_error(task_key):
+    task = _get_bucket().get(task_key)
+    task_data = task.get_data()
+    task_data['status'] = "Error"
+    task.set_data(task_data)
+    task.store()
+
 def assign_worker(task_key, worker_token):
     task = _get_bucket().get(task_key)
     task_data = task.get_data()
 
     cache.initialize(**config['redis_cache'])
     dkvs.initialize(**config['riak'])
-    #admin.initialize(wmanws_config=config['apps']['wmanws'])
 
     for bucket_name in ['task_metadata', 'task_input_data_chunk', 'task_output_data_chunk']:
         print "Empty %s Bucket" % bucket_name