Source

nx-web / nxweb / apps / wmanws / worker_manager.py

import sets
from Queue import Queue
from Queue import Empty
import nxweb.data.dkvs.attribute as attribute_dkvs
import nxweb.data.dkvs.task as task_dkvs
import nxweb.data.dkvs.pubkey as pubkey_dkvs
import json
from helpers import *
import copy
from exception import *

class WorkerManager():

    def __init__(self, notify_all_monitors_callback):
        """Create an empty manager and initialize the Monitor helper.

        notify_all_monitors_callback -- callable kept on the instance and
        also passed to Monitor.initialize(); this class invokes it without
        arguments after a worker comes online or goes offline.
        """
        self.notify_all_monitors_callback = notify_all_monitors_callback

        # Worker bookkeeping, keyed by worker token.
        self.queued_workers = {}   # logged in, websocket not connected yet
        self.online_workers = {}   # websocket connected
        # attribute name -> attribute value -> list of worker tokens
        self.attribute_index = {}

        # Task bookkeeping.
        self.task_queue = []       # task keys added by new_task()
        # set of (criterion, value) pairs -> Queue of (program_key, task_key)
        self.new_unassigned_task_index = {}
        self.finished_tasks = []
        self.total_num_task = 0
        self.round_robin_index = 0

        Monitor.initialize(notify_all_monitors_callback)

#-------------------------Workers---------------------------------------------#
    def logging_in(self, attributes):
        """Register a worker that issued a login request.

        Builds a Worker from ``attributes`` and parks it in
        ``queued_workers`` until its websocket connects (see ``online``).

        Returns the token of the newly created worker.
        Raises DuplicatedWorker when that token is already queued or online.
        """
        new_worker = Worker(attributes)
        token = new_worker.token

        already_online = self.online_workers.get(token) is not None
        already_queued = self.queued_workers.get(token) is not None
        if already_online or already_queued:
            raise DuplicatedWorker()

        self.queued_workers[token] = new_worker
        return new_worker.token

    def online(self, worker_token, websocket, ip_addr):
        """Called when a worker makes a websocket connection,
        verifies the validity of the worker token """

        if self.queued_workers.get(worker_token) is None:
            raise NonexistentWorker()
        worker = copy.copy(self.queued_workers[worker_token])
        del self.queued_workers[worker_token]
        worker.websocket = websocket
        worker.ip = ip_addr
        self.online_workers[worker_token] = worker


        #for indexing pourposes
        if worker.attributes:
            for (k, values) in  worker.attributes.items():
                if not hasattr(values, "__iter__"):
                    values = [values]

                for v in values:
                    if self.attribute_index.get(k) is None:
                        self.attribute_index[k]={v:[worker_token]}
                    elif self.attribute_index.get(k).get(v) is None:
                        self.attribute_index[k][v]=[worker_token]
                    else:
                        self.attribute_index[k][v].append(worker_token)
        self.worker_ready_for_new_task(worker_token)
        self.notify_all_monitors_callback()

    def worker_ready_for_new_task(self, worker_token):
        """This function is called when a worker is available for task processing"""

        for task_criteria_list in self.new_unassigned_task_index.keys():
            criteria = dict(task_criteria_list)
            if worker_token in self.workers_with_all_criteria(criteria):

                try:
                    program_key, task_key = self.new_unassigned_task_index.get(task_criteria_list).get_nowait()
                    worker = self.online_workers.get(worker_token)
                    if worker is None:
                        raise NonexistentWorker()
                    task_dkvs.assign_worker(task_key, worker_token)
                    worker.assign_task(program_key, task_key)
                    return
                except:
                    pass

    def offline(self, worker_token):
        """Called when a worker websocket connections is terminated"""
        worker = self.online_workers.get(worker_token)
        worker.disconnected()

        if self.online_workers.get(worker_token) is None:
            raise NonexistentWorker()
        del self.online_workers[worker_token]

        for task_key in worker.get_unfinished_tasks():
            program_key = task_dkvs.get_metadata(task_key).get('program_key')
            Monitor.requeue_task(task_key)
            self.new_task(program_key, task_key)
            task_dkvs.set_status_requeued(task_key)
            worker.remove_task(task_key)

        #for indexing pourposes
        if worker.attributes:
            for (k, values) in  worker.attributes.iteritems():
                 if not hasattr(values, "__iter__"):
                    values = [values]

                 for v in values:
                     self.attribute_index[k][v].remove(worker_token)
        ##########
        self.notify_all_monitors_callback()
#-----------------------------Tasks--------------------------------------
    def new_task(self, program_key, task_key):
        """Register a newly created task and try to dispatch pending work.

        Logic:
        1. The task is queued as unassigned, under its worker criteria.
        2. Idle workers (no assigned tasks) are offered pending work until
           one of them accepts a task; if none does, the task stays queued.
        3. Every time a worker finishes a task it asks for another one, so
           the task will eventually be executed.

        Raises Exception when no metadata is stored for ``task_key``.
        """
        metadata = task_dkvs.get_metadata(task_key)
        if metadata is None:
            raise Exception("no metadata stored for task %r" % (task_key,))

        self.task_queue.append(task_key)
        criteria = metadata.get('worker_criteria')

        Monitor.new_task(task_key, program_key)

        self.new_unassigned_task(program_key, task_key, criteria)

        # Offer work to idle workers one by one. Stopping at the first idle
        # worker regardless of outcome would strand the task whenever that
        # worker does not satisfy the criteria while another idle one does.
        for worker_token, worker_obj in list(self.online_workers.items()):
            if len(worker_obj.assigned_tasks) > 0:
                continue
            self.worker_ready_for_new_task(worker_token)
            # NOTE(review): assumes Worker.assign_task records the task in
            # assigned_tasks; if it does not, the remaining idle workers are
            # merely offered queues that no longer hold the task.
            if len(worker_obj.assigned_tasks) > 0:
                break

    def task_finished(self, task_key, worker_token):
        """Record that a worker finished a task successfully.

        Removes the task from ``task_queue`` and from the worker, offers the
        worker another pending task and reports the completion to Monitor.

        Raises NonexistentWorker when ``worker_token`` is not online, and
        ValueError (from ``list.remove``) when ``task_key`` is not pending.
        """
        worker = self.online_workers.get(worker_token)
        if worker is None:
            # Validate before mutating any state; without this check an
            # offline worker would cause an AttributeError on None after
            # the task had already been dropped from task_queue.
            raise NonexistentWorker()

        self.task_queue.remove(task_key)
        worker.remove_task(task_key)
        self.worker_ready_for_new_task(worker_token)
        Monitor.task_done(task_key)

    def task_finished_with_error(self, task_key, worker_token):
        """Record that a task ended because an error occurred in the program.

        Removes the task from ``task_queue`` and from the worker, offers the
        worker another pending task and reports the failure to Monitor.

        Raises NonexistentWorker when ``worker_token`` is not online, and
        ValueError (from ``list.remove``) when ``task_key`` is not pending.
        """
        worker = self.online_workers.get(worker_token)
        if worker is None:
            # Validate before mutating any state; without this check an
            # offline worker would cause an AttributeError on None after
            # the task had already been dropped from task_queue.
            raise NonexistentWorker()

        self.task_queue.remove(task_key)
        worker.remove_task(task_key)
        self.worker_ready_for_new_task(worker_token)
        Monitor.task_failed(task_key)

    def new_unassigned_task(self, program_key, task_key, criteria):
        """Queue a task until it can be assigned to a worker.

        Tasks are grouped by their worker criteria: the index key is the
        frozen set of ``(name, value)`` pairs of ``criteria`` (the empty set
        when ``criteria`` is None or empty), and each key owns a FIFO queue
        of ``(program_key, task_key)`` tuples.

        criteria -- dict of required worker attributes, or None. Its values
        must be hashable, since the pairs become part of a dict key.
        """
        # Builtin frozenset replaces the deprecated ``sets.ImmutableSet``;
        # it is hashable and converts back with dict(key) in the same way.
        key = frozenset(criteria.items()) if criteria is not None else frozenset()

        pending = self.new_unassigned_task_index.setdefault(key, Queue())
        pending.put_nowait((program_key, task_key))

    def get_pending_tasks(self):
        """Return the list of task keys registered via new_task() and not
        yet reported as finished (the internal list itself, not a copy)."""
        pending = self.task_queue
        return pending
#---------------------Worker Selection--------------------------------

    def workers_with_criteria(self, name, value):
        """Look up the worker tokens indexed under one attribute value.

        Returns the list stored in ``attribute_index`` for ``name`` and
        ``value``. Returns None when the pair is rejected by
        ``attribute_dkvs.are_valid``, when no worker has the attribute
        ``name`` indexed, or when none is indexed under ``value``.
        """
        if not attribute_dkvs.are_valid({name: value}):
            return None

        tokens_by_value = self.attribute_index.get(name)
        if tokens_by_value is None:
            return None
        return tokens_by_value.get(value)

    def workers_with_all_criteria(self, criteria):
        """Return the tokens of the workers that satisfy every criterion.

        criteria -- dict mapping attribute names to required values, e.g.
        ``{"O.S.": "Windows7", "architecture": "x86"}``. None or an empty
        dict means "no restriction" and yields every online worker.

        Returns a list of worker tokens; the list is empty when the criteria
        are rejected by ``attribute_dkvs.are_valid`` or when any single
        criterion has no matching worker.
        """
        if not criteria:
            # Empty criteria must be handled like None: tasks queued without
            # criteria are looked up with an empty dict, which would
            # otherwise index into an empty result list below.
            return list(self.online_workers.keys())

        if not attribute_dkvs.are_valid(criteria):
            return []

        matching = None
        for name, value in criteria.items():
            tokens = self.workers_with_criteria(name, value)
            if tokens is None:
                return []
            if matching is None:
                matching = set(tokens)
            else:
                matching &= set(tokens)
        return list(matching)


#---------------------------Monitors----------------------------

    def get_worker_monitor(self):
        """Used to send the info to the monitors """

        def task_short_print(task_comp_key):

            pubkey_key, task_key = task_comp_key.split("+")
            task_metadata = task_dkvs.get_metadata(task_comp_key)
            if task_metadata is None:
                return ""
            task = task_metadata['program_key'].split("+")
            task.append(repr(task_metadata['counter']))
            #[user]-[program_name]-[task_number]
            return "-".join(task)

        monitor_data = []
        for (worker_token, worker) in self.online_workers.iteritems():
            task_monitor = []

            for task_comp_key in worker.assigned_tasks:

                task = task_short_print(task_comp_key)
                task_monitor.append(task)
            if len(task_monitor)==0:
                task_monitor = ""
            else:
                task_monitor = ",".join(task_monitor)
            monitor_data.append({"worker_id":repr(worker.worker_number),
                                 "worker_ip":worker.ip,
                                 "assigned_tasks":task_monitor})
        return monitor_data

    def get_task_list_monitor(self):
        """Return Monitor's task list, used to send the info to the monitors."""
        task_list = Monitor.get_task_list()
        return task_list

    def get_tasks_done_list_monitor(self):
        """Return Monitor's list of finished tasks, used to send the info
        to the monitors."""
        done_list = Monitor.get_tasks_done_list()
        return done_list