Source

nx-web / nxweb / apps / wmanws / __init__.py

Full commit
import json
from threading import Timer
from exception import *
from worker_manager import WorkerManager

"""Worker Manager Isntance Reference """
_worker_manager = None

"""List of all monitor websocket connections """
_monitor_websockets = []

_websocket_timer = {}

def get_worker_manager():
    """Initialize and returns reference to the Worker Manager Instance"""
    global _worker_manager
    if _worker_manager is None:
        _worker_manager = WorkerManager(notify_all_monitors)
    return _worker_manager

def notify_all_monitors():
    """This function sends all the "monitor data" to the monitor interface,
    through the websocket """
    global _monitor_websockets
    worker_manager = get_worker_manager()
    data = {"worker_monitor": worker_manager.get_worker_monitor(),
            "task_queue":     worker_manager.get_task_list_monitor(),
            "tasks_done":     worker_manager.get_tasks_done_list_monitor()}
    data = json.dumps(data)
    for ws in _monitor_websockets:
        ws.send(data)

def accept_ws_connection(environ, start_response):
    """Handles the websocket connection request"""
    global _monitor_websockets
    worker_manager = get_worker_manager()

    debug("New Websocket Connection")
    websocket = environ.get("wsgi.websocket")

    if websocket is None:
        return http_handler(environ, start_response)

    debug("reading websocket message")

    message = None
    try:
        message = websocket.receive()
        message = json.loads(message)
    except:
        websocket.close()
        return

    if message == "monitor":
        #Si recibimos el string monitor a travez del websocket
        #lo tratamos como la connection con un monitor
        debug("Initiating Monitor Connection "+str(len(_monitor_websockets)))
        _monitor_websockets.append(websocket)
        notify_all_monitors()
        try:
            websocket.receive()
        except:
            pass
        websocket.close()
        _monitor_websockets.remove(websocket)
        debug("Closing Monitor Connection")
    else:
        #Asumimos que el mensaje que nos mandan es el token del worker
        worker_token = message
        worker_manager.online(worker_token, websocket, environ.get('REMOTE_ADDR'))
        debug("initiating worker connection")

        _websocket_timer[worker_token] = Timer(5.0, worker_manager.offline,args=[worker_token])
        _websocket_timer[worker_token].start()

        while True:
            message = None
            try:
                message = websocket.receive()
            except:
                debug("##connection closed .. error on communication")
                if worker_manager.is_online(worker_token):
                    worker_manager.offline(worker_token)
                break

            if message is None:
                debug("##connection closed by the worker")
                if worker_manager.is_online(worker_token):
                    worker_manager.offline(worker_token)
                break
            elif message == "error":
                done_task_key = websocket.receive()
                debug("#finished_tasks with error#")
                worker_manager.task_finished_with_error(done_task_key, worker_token)
                continue
            elif message == "keepalive":
#                print "keep alive ######"
                _websocket_timer[worker_token].cancel()
                _websocket_timer[worker_token] = Timer(5.0, worker_manager.offline,args=[worker_token])
                _websocket_timer[worker_token].start()
            elif message in worker_manager.get_pending_tasks():
                done_task_key = message
                worker_manager.task_finished(done_task_key, worker_token)
                continue
            else:
                debug("##Handle recieved message")
                raise UnhandledWorkerMessage()

def http_handler(environ, start_response):
    if environ["PATH_INFO"].strip("/") == "version":
        start_response("200 OK", [])
        return ['wmanws/1.0']
    else:
        start_response("400 Bad Request", [])
        return ["WebSocket connection is expected here."]


def debug(msg):
    print "WMANWS: " + msg