Source

nx-web / nxweb / apps / wmanws / helpers.py

Full commit
import nxweb.data.dkvs as dkvs
from exception import *
import uuid

class Task():
    """One submitted job together with its monitoring state.

    The ``status`` attribute is always one of the strings "Queued",
    "Processing", "Re-Queued", "Re-Processing", "Done", "Done (re-tried)"
    or "Error".
    """

    # Number of Task objects created so far; the next task takes this value
    # as its ``task_number``.
    counter = 0

    def __init__(self, task_key, owner_user, program_name, program_version):
        """Create a task in the "Queued" state with no worker assigned.

        The task receives the current value of ``Task.counter`` as its
        number, and the class-wide counter is then advanced by one.
        """
        number = Task.counter
        Task.counter = number + 1

        self.task_key = task_key
        self.owner_user = owner_user
        self.program_name = program_name
        self.program_version = program_version
        self.task_number = number
        self.status = "Queued"
        self.assigned_worker = ""

    def assign(self, worker):
        """Record ``worker`` as the assignee and mark the task as running.

        A task that was previously re-queued becomes "Re-Processing";
        any other task becomes "Processing".
        """
        was_requeued = self.status == "Re-Queued"
        self.assigned_worker = worker
        self.status = "Re-Processing" if was_requeued else "Processing"

    def requeue(self):
        """Put the task back in the queue and drop its worker assignment."""
        self.assigned_worker = ""
        self.status = "Re-Queued"

    def done(self):
        """Mark the task finished, remembering whether it was a retry."""
        retried = self.status == "Re-Processing"
        self.status = "Done (re-tried)" if retried else "Done"

    def error(self):
        """Mark the task as failed."""
        self.status = "Error"

    def get_long_data(self):
        """Return a dict describing the task for display purposes.

        Keys: "Owner", "Program", "Version", "TaskNumber", "Status" and
        "AssignedWorker".
        """
        summary = {}
        summary["Owner"] = self.owner_user
        summary["Program"] = self.program_name
        summary["Version"] = self.program_version
        summary["TaskNumber"] = self.task_number
        summary["Status"] = self.status
        summary["AssignedWorker"] = self.assigned_worker
        return summary

class Monitor():
    """Registry of tasks and their progress, shared by the whole process.

    All state lives in class attributes and every method is static, so the
    class is used directly (``Monitor.new_task(...)``) and is never
    instantiated. After each state change the registered callback, if any,
    is invoked with no arguments.
    """

    # task_key -> task object, for every task registered so far.
    tasks = {}
    # Tasks that have not finished yet, in the order they were (re-)queued.
    task_queue_list = []
    # Tasks that finished (successfully or not), oldest first.
    tasks_done_list = []
    # Zero-argument callable run after every state change; None until
    # initialize() has been called.
    notify_all_monitors_callback = None

    @staticmethod
    def initialize(notify_all_monitors_callback_a):
        """Register the callable to invoke after every state change.

        Passing None disables notification.
        """
        callback = notify_all_monitors_callback_a
        if callback is not None:
            # Wrap in staticmethod so that reading the attribute through the
            # class returns the callable itself; under Python 2 a plain
            # function stored on a class comes back as an unbound method
            # that cannot be called without an instance.
            callback = staticmethod(callback)
        Monitor.notify_all_monitors_callback = callback

    @staticmethod
    def _notify():
        """Run the registered callback, if one has been registered."""
        callback = Monitor.notify_all_monitors_callback
        if callback is not None:
            callback()

    @staticmethod
    def _requeue(task):
        """Re-queue ``task`` and make sure it is listed as pending."""
        task.requeue()
        # A task that already finished must leave the done list, otherwise
        # its next completion would list it twice.
        if task in Monitor.tasks_done_list:
            Monitor.tasks_done_list.remove(task)
        if task not in Monitor.task_queue_list:
            Monitor.task_queue_list.append(task)

    @staticmethod
    def _retire(task):
        """Move ``task`` from the pending list to the done list."""
        # Both checks make a repeated completion report harmless instead of
        # duplicating the entry and raising ValueError from list.remove().
        if task in Monitor.task_queue_list:
            Monitor.task_queue_list.remove(task)
        if task not in Monitor.tasks_done_list:
            Monitor.tasks_done_list.append(task)

    @staticmethod
    def new_task(task_key, program_key):
        """Register a task, or re-queue it when the key is already known.

        ``program_key`` must have the form "owner+name+version"; a string
        that does not split into exactly three parts on "+" raises
        ValueError.
        """
        task = Monitor.tasks.get(task_key)
        if task is None:
            owner_user, program_name, program_version = program_key.split("+")
            task = Task(task_key, owner_user, program_name, program_version)
            Monitor.tasks[task_key] = task
            Monitor.task_queue_list.append(task)
        else:
            Monitor._requeue(task)

        Monitor._notify()

    @staticmethod
    def assign_task(task_key, worker):
        """Record that ``worker`` took the task. Raises KeyError if unknown."""
        Monitor.tasks[task_key].assign(worker)
        Monitor._notify()

    @staticmethod
    def requeue_task(task_key):
        """Put the task back in the queue. Raises KeyError if unknown."""
        Monitor._requeue(Monitor.tasks[task_key])
        Monitor._notify()

    @staticmethod
    def task_done(task_key):
        """Mark the task as finished. Raises KeyError if unknown."""
        task = Monitor.tasks[task_key]
        task.done()
        Monitor._retire(task)
        Monitor._notify()

    @staticmethod
    def task_failed(task_key):
        """Mark the task as failed. Raises KeyError if unknown."""
        task = Monitor.tasks[task_key]
        task.error()
        Monitor._retire(task)
        Monitor._notify()

    @staticmethod
    def get_task_list():
        """Return the ``get_long_data()`` dict of every pending task."""
        return [task.get_long_data() for task in Monitor.task_queue_list]

    @staticmethod
    def get_tasks_done_list():
        """Return the ``get_long_data()`` dict of every finished task.

        The most recently finished task comes first.
        """
        return [task.get_long_data()
                for task in reversed(Monitor.tasks_done_list)]

class Worker():
    """A connected worker and the tasks currently handed to it.

    Attributes:
        token: string form of a UUID generated at construction time.
        websocket: connection used to talk to the worker; None until code
            outside this class attaches one.
        assigned_tasks: keys of the tasks handed to this worker that have
            not been removed with remove_task().
        attributes: the validated attributes passed to the constructor.
        worker_number: sequential number taken from ``Worker.counter``.
        ip: empty string until set by code outside this class.
    """

    # Number of Worker objects created so far; the next worker takes this
    # value as its ``worker_number``.
    counter = 0

    def __init__(self, attributes):
        """Create a worker description.

        Raises:
            InvalidWorkerAttrbute: if ``dkvs.attribute.are_valid`` rejects
                ``attributes``. Nothing is initialised and the counter is
                not advanced in that case.
        """
        if not dkvs.attribute.are_valid(attributes):
            raise InvalidWorkerAttrbute()

        # NOTE(review): uuid1 is derived from the host ID and the current
        # time, so it is guessable. If this token authenticates the worker,
        # consider uuid4 -- confirm how callers use it.
        self.token = str(uuid.uuid1())
        self.websocket = None
        self.assigned_tasks = []
        self.attributes = attributes
        self.worker_number = Worker.counter
        self.ip = ""
        Worker.counter += 1

    def assign_task(self, program_key, task_key):
        """Send a task to the worker and record the assignment.

        The message sent over the websocket is "<program_key>:<task_key>".

        Raises:
            WorkerCommunicationError: if sending the message or updating
                the monitor fails for any reason.
        """
        try:
            # The key is recorded before sending, so it is still reported by
            # get_unfinished_tasks() when the send fails -- presumably so the
            # caller can re-queue it; verify against the caller.
            self.assigned_tasks.append(task_key)
            self.websocket.send(program_key + ":" + task_key)
            Monitor.assign_task(task_key, self.worker_number)
        except Exception:
            raise WorkerCommunicationError()

    def remove_task(self, task_key):
        """Forget an assigned task. Raises ValueError if it is not assigned."""
        self.assigned_tasks.remove(task_key)

    def disconnected(self):
        """Close the websocket once, ignoring any failure while closing."""
        if self.websocket is None:
            return
        try:
            self.websocket.close()
            print("closing websocket")
        except Exception:
            # Best effort: the connection may already be gone, and there is
            # nothing useful left to do with a failed close.
            pass

    def get_unfinished_tasks(self):
        """Return the list of task keys still assigned (not a copy)."""
        return self.assigned_tasks