# Source
#
# nx-web / nxweb / data / dkvs / task / __init__.py

import uuid

import nxweb.data.dkvs.attribute as attribute
import nxweb.data.dkvs.program as program

from nxweb.data.dkvs import compose_key
from nxweb.data.dkvs.exception import *

from . import input_data_chunk as input_
from . import output_data_chunk as output
from . import metadata


def create(pkid, program_key, worker_criteria, input_data_chunk):
    """Register a new task and store its input data chunk.

    A freshly generated UUID4 string is combined with ``pkid`` through
    ``compose_key`` to build the task key. The program and the worker
    criteria are validated before any task record is written.

    Returns:
        The composed task key.

    Raises:
        InvalidProgram: ``program.get_metadata(program_key)`` returned None.
        InvalidAttribute: ``attribute.are_valid(worker_criteria)`` was falsy.
    """
    new_task_key = compose_key(pkid, str(uuid.uuid4()))

    program_metadata = program.get_metadata(program_key)
    if program_metadata is None:
        raise InvalidProgram()

    criteria_ok = attribute.are_valid(worker_criteria)
    if not criteria_ok:
        raise InvalidAttribute()

    # Metadata is written first, then the input chunk, both under the same key.
    metadata.create(new_task_key, worker_criteria, program_key)
    input_.create(new_task_key, input_data_chunk)
    return new_task_key


def delete(task_key):
    """Delete the output chunk, input chunk and metadata stored for a task.

    The three records are removed in that order: output, input, metadata.
    """
    for store in (output, input_, metadata):
        store.delete(task_key)


def assign_worker(task_key, worker_token):
    """Record ``worker_token`` as the worker assigned to ``task_key``.

    Delegates to ``metadata.assign_worker``; nothing is returned.
    """
    metadata.assign_worker(task_key, worker_token)


def get_metadata(task_key):
    """Return whatever ``metadata.get`` yields for ``task_key``."""
    task_metadata = metadata.get(task_key)
    return task_metadata


def _require_assigned_worker(task_key, worker_token):
    """Raise unless ``worker_token`` is the worker assigned to ``task_key``.

    Shared guard for the worker-facing accessors below.

    Raises:
        WorkerNotAssignedToTask: the token is None, the task metadata is
            missing, or the token differs from the stored
            ``'assigned_worker_token'`` value.
    """
    task_metadata = metadata.get(task_key)
    # NOTE(review): presumably metadata.get returns None for an unknown task
    # (create() guards program.get_metadata against None the same way) --
    # confirm. Treat that case as "not assigned" instead of letting it surface
    # as an AttributeError on None.
    if task_metadata is None:
        raise WorkerNotAssignedToTask()

    assigned_worker_token = task_metadata.get('assigned_worker_token')
    # A None token is always rejected, even if the stored token also reads
    # back as None.
    if worker_token is None or assigned_worker_token != worker_token:
        raise WorkerNotAssignedToTask()


def get_input_data(task_key, worker_token):
    """Return the input data chunk of a task to its assigned worker.

    Raises:
        WorkerNotAssignedToTask: ``worker_token`` is None or is not the
            token recorded in the task metadata.
    """
    _require_assigned_worker(task_key, worker_token)
    return input_.get(task_key)


def write_output_data(task_key, worker_token, data):
    """Store ``data`` as the output chunk of a task on behalf of its worker.

    Raises:
        WorkerNotAssignedToTask: ``worker_token`` is None or is not the
            token recorded in the task metadata.
    """
    _require_assigned_worker(task_key, worker_token)
    output.create(task_key, data)


def get_output_data(task_key):
    """Fetch the output chunk of a task, then delete the whole task.

    The output is read first; afterwards ``delete`` removes the task's
    output, input and metadata records. The value read is returned.
    """
    result = output.get(task_key)
    # TODO: deleting all of the task's data right after the read may not be a
    # good idea; look for another solution.
    delete(task_key)
    return result

def set_status_requeued(task_key):
    """Mark the task as requeued via ``metadata.set_status_requeued``.

    Nothing is returned.
    """
    metadata.set_status_requeued(task_key)

def set_error_status(task_key, worker_token):
    """Mark the task as errored via ``metadata.set_status_error``.

    ``worker_token`` is accepted but not used by this function.
    """
    # NOTE(review): unlike get_input_data / write_output_data, no check is made
    # that worker_token is the assigned worker -- confirm whether that is
    # intended.
    metadata.set_status_error(task_key)