Ernesto Menéndez avatar Ernesto Menéndez committed 5c5b177 Merge

merge juanjose into default

Comments (0)

Files changed (15)

nxweb/apps/admin/__init__.py

 from nxweb.apps.admin.security import groupfinder
 from nxweb.utils.ws import WebService
 
+#_wmanws_config = {}
+#def initialize(wmanws_config):
+    #_wmanws_config = wmanws_config
+    #print repr(wmanws_config)
+
+
 def make_wsgi_app(**settings):
     """ This function returns a Pyramid WSGI application.
     """
+    global _wmanws_config
     authentication_policy = AuthTktAuthenticationPolicy(
                                 settings['auth_policy_secret'],
                                callback=groupfinder,
                            )
     authorization_policy = ACLAuthorizationPolicy()
-
     config = Configurator(settings=settings,
         root_factory='nxweb.apps.admin.models.RootFactory')
 
     project = ws.make_resource('project','/project')
     project.get('nxweb.apps.admin.views.project.get')
 
+    monitor = ws.make_resource('monitor','/monitor')
+    monitor.get('nxweb.apps.admin.views.monitor.get_wmanws_settings')
+
     #Carpeta de los html de el frontend
     directory = os.path.abspath(os.path.dirname(__file__) + "/html/")
     config.add_static_view(name='/', path=directory)

nxweb/apps/admin/html/index.html

     <script src="js/program-controller.js"></script>
     <script src="js/pkey-controller.js"></script>
     <script src="js/project-controller.js"></script>
+    <script src="js/monitor-controller.js"></script>
 </head>
 <body>
 
                 <li><a href ng-href="/#/pkeys" ng-show="menu!=null && menu.indexOf('pkey') != -1">key List</a></li>
                 <li><a href ng-href="/#/programs" ng-show="menu!=null && menu.indexOf('program') != -1">Program List</a></li>
                 <li><a href ng-href="/#/project" ng-show="menu!=null && menu.indexOf('project') != -1">Project Config</a></li>
+                <li><a href ng-href="/#/monitor">Monitor</a></li>
             </ul>
 
             <script type="text/javascript">

nxweb/apps/admin/html/js/app.js

       when('/pkey/:key', {templateUrl: 'partials/pkey.html',controller: PkeyEditCtrl}).
       when('/pkeys', {templateUrl: 'partials/pkey-list.html',controller: PkeyListCtrl}).
       when('/project', {templateUrl: 'partials/project.html',controller: ProjectCtrl}).
+      when('/monitor', {templateUrl: 'partials/monitor.html',controller: MonitorCtrl}).
       otherwise({redirectTo: '/login'});
 }]);
 

nxweb/apps/admin/html/js/monitor-controller.js

// Angular controller for the /monitor page: renders three Kendo grids
// (online workers, task queue, finished tasks) and keeps them live over a
// websocket to the wman websocket service.
function MonitorCtrl($scope, $location, Monitor) {

    // All three grids consume the same envelope: {data: [...], total: n}.
    function makeDataSource(pageSize) {
        return new kendo.data.DataSource({
            pageSize: pageSize,
            schema: {
                data: "data",
                total: "total"
            }
        });
    }

    // The queue grid and the done grid share the same column set.
    var taskColumns = [
        { field: "Owner" },
        { field: "Program" },
        { field: "Version" },
        { field: "TaskNumber" },
        { field: "Status" },
        { field: "AssignedWorker" }
    ];

    var workerDataSource = makeDataSource(10);
    var task_queue_DataSource = makeDataSource(5);
    var tasks_done_DataSource = makeDataSource(5);

    $("#worker_grid").kendoGrid({
        dataSource: workerDataSource,
        autoBind: false,
        pageable: { pageSizes: [5, 10, 20] },
        sortable: true,
        columns: [
            { field: "worker_id", title: "Worker Id" },
            { field: "worker_ip", title: "Worker IP" },
            { field: "assigned_tasks", title: "Assigned Tasks" }
        ]
    });

    $("#task_queue_grid").kendoGrid({
        dataSource: task_queue_DataSource,
        autoBind: false,
        pageable: { pageSizes: [5, 10, 20] },
        sortable: true,
        columns: taskColumns
    });

    $("#tasks_done_grid").kendoGrid({
        dataSource: tasks_done_DataSource,
        autoBind: false,
        pageable: { pageSizes: [5, 10, 20] },
        sortable: true,
        columns: taskColumns
    });

    // Ask the admin backend for the wman websocket host/port, then keep a
    // websocket open; on disconnect, retry every second and blank the grids.
    Monitor.get({query: "wmanws_config"}, function(resp, getHeaders) {
        var wmanws_uri = "ws://" + resp.data.host + ":" + resp.data.port;
        var websocket = null;   // BUG FIX: these were implicit globals
        var interval = null;

        function connect() {
            clearInterval(interval);
            websocket = new WebSocket(wmanws_uri);
            websocket.onopen = onOpen;
            websocket.onclose = onClose;
            websocket.onmessage = onMessage;
            websocket.onerror = onError;
        }

        function onOpen(evt) {
            // The server expects a JSON string literal identifying this
            // connection as a monitor (see wmanws accept_ws_connection).
            websocket.send('"monitor"');
        }

        function onClose(evt) {
            // Poll for reconnection and clear stale rows in the meantime.
            interval = setInterval(connect, 1000);
            workerDataSource.data([]);
            task_queue_DataSource.data([]);
            tasks_done_DataSource.data([]);
        }

        function onMessage(evt) {
            // BUG FIX: the payload was parsed with eval('(' + data + ')'),
            // which executes arbitrary code arriving on the socket;
            // JSON.parse is safe and sufficient for the JSON the server sends.
            var monitor = JSON.parse(evt.data);
            workerDataSource.data(monitor.worker_monitor);
            task_queue_DataSource.data(monitor.task_queue);
            tasks_done_DataSource.data(monitor.tasks_done);
        }

        function onError(evt) {}

        connect();
    });
}

nxweb/apps/admin/html/js/services.js

     factory('Program', function($resource){ return $resource('/program', {}, methods); }).
     factory('Pkey', function($resource){ return $resource('/pkey', {}, methods); }).
     factory('Pkeys', function($resource){ return $resource('/pkeys', {}, methods); }).
-    factory('Project', function($resource){ return $resource('/project', {}, methods); });
+    factory('Project', function($resource){ return $resource('/project', {}, methods); }).
+    factory('Monitor', function($resource){ return $resource('/monitor', {}, methods); });

nxweb/apps/admin/html/partials/monitor.html

+<h3>Worker List</h3>
+<div id="worker_grid"></div>
+<h3>Task Queue</h3>
+<div id="task_queue_grid"></div>
+<h3>Tasks Done</h3>
+<div id="tasks_done_grid"></div>
Add a comment to this file

nxweb/apps/admin/html/public/blender_test.py.zip

Binary file modified.

nxweb/apps/admin/html/window.html

     <script src="js/program-controller.js"></script>
     <script src="js/pkey-controller.js"></script>
     <script src="js/project-controller.js"></script>
+    <script src="js/monitor-controller.js"></script>
 </head>
 <body>
 

nxweb/apps/admin/views/monitor.py

+from nxweb.utils.ws.response import (ok, error)
+
def get_wmanws_settings(req):
    """Return the wman websocket service configuration to the admin frontend.

    The settings dict is injected into the Pyramid registry at startup
    under the 'wmanws_config' key; the monitor controller reads host/port
    from it to open its websocket.
    """
    wmanws_settings = req.registry.settings.get('wmanws_config')
    return ok(req, data=wmanws_settings)

nxweb/apps/wman/__init__.py

     config = Configurator(settings=settings)
     config.include('nxweb.utils.ws')
     ws = WebService(config)
-    
+
     session = ws.make_resource('session','/session')
     session.put('nxweb.apps.wman.views.session_put')
 
     new_task = ws.make_resource('new_task','/new_task')
     new_task.get('nxweb.apps.wman.views.new_task')
-    
+
+    monitor = ws.make_resource('monitor','/monitor')
+    monitor.get('nxweb.apps.wman.views.get_monitor_data')
+
+
     return config.make_wsgi_app()

nxweb/apps/wman/views/__init__.py

     wmanws.get_worker_manager().new_task("1+PrimerPrograma+1.0", task_key)
     #f.close()
 
-    return ok(req)
+    return ok(req)
+
def get_monitor_data(req):
    """Serve one monitor dataset selected by the ?query= parameter.

    Supported query values: "worker", "task_queue", "tasks_done".
    Returns an error response for a missing or unknown query.
    """
    query = req.params.get("query")
    if query is None:
        return error(req)

    wman = wmanws.get_worker_manager()

    if query == "worker":
        monitor_data = wman.get_worker_monitor()
    elif query == "task_queue":
        monitor_data = wman.get_task_queue_monitor()
    elif query == "tasks_done":
        monitor_data = wman.get_finished_tasks_monitor()
    else:
        # BUG FIX: an unrecognized query previously fell through with
        # monitor_data unbound, raising NameError instead of replying.
        return error(req)

    return ok(req, data=monitor_data)

nxweb/apps/wmanws/__init__.py

 import copy
 import nxweb.data.dkvs.attribute as attribute_dkvs
 import nxweb.data.dkvs.task as task_dkvs
+import nxweb.data.dkvs.pubkey as pubkey_dkvs
 import uuid
 from Queue import Queue
 from exception import *
 
 _worker_manager = None
+_monitor_websockets = []
 def get_worker_manager():
     global _worker_manager
     if _worker_manager is None:
 
 
 def accept_ws_connection(environ, start_response):
+    global _monitor_websockets
     worker_manager = get_worker_manager()
 
     debug("New Websocket Connection")
     if websocket is None:
         return http_handler(environ, start_response)
 
-    debug("reading worker token")
+    debug("reading websocket message")
     worker_token = None
-    try:
-        worker_token = json.loads(websocket.receive())
-    except:
-        raise WorkerCommunicationError()
+    #try:
+    message = websocket.receive()
+    print message
+    message = json.loads(message)
+    #except:
+        #raise WorkerCommunicationError()
 
-    worker_manager.online(worker_token, websocket)
+    if message == "monitor":
+        debug("Initiating Monitor Connection "+str(len(_monitor_websockets)))
+        _monitor_websockets.append(websocket)
+        get_worker_manager().notify_all_monitors()
+        try:
+            websocket.receive()
+        except:
+            pass
+        websocket.close()
+        _monitor_websockets.remove(websocket)
+        debug("Closing Monitor Connection")
+    else:
+        worker_token = message
+        worker_manager.online(worker_token, websocket, environ.get('REMOTE_ADDR'))
+        debug("initiating worker connection")
+        while True:
+            message = None
+            try:
+                message = websocket.receive()
+            except:
+                raise WorkerCommunicationError()
 
-
-
-    while True:
-        message = None
-        try:
-            message = websocket.receive()
-        except:
-            raise WorkerCommunicationError()
-
-        if message is None:
-            debug("##connection closed by the worker")
-            worker_manager.offline(worker_token)
-            break
-        elif message in worker_manager.get_pending_tasks():
-            done_task_key = message
-            worker_manager.task_finished(done_task_key, worker_token)
-            continue
-        else:
-            debug("##Handle recieved message")
-            raise UnhandledWorkerMessage()
+            if message is None:
+                debug("##connection closed by the worker")
+                worker_manager.offline(worker_token)
+                break
+            elif message in worker_manager.get_pending_tasks():
+                done_task_key = message
+                worker_manager.task_finished(done_task_key, worker_token)
+                continue
+            else:
+                debug("##Handle recieved message")
+                raise UnhandledWorkerMessage()
 
 
 def debug(msg):
     print "WMANWS: " + msg
 
 class WorkerManager():
+
+    def notify_all_monitors(self):
+        global _monitor_websockets
+        data = {"worker_monitor": self.get_worker_monitor(),
+                "task_queue": Monitor.get_task_list(),
+                "tasks_done": Monitor.get_tasks_done_list()}
+        data = json.dumps(data)
+        for ws in _monitor_websockets:
+            ws.send(data)
+
+    def task_short_print(self, task_comp_key):
+        pubkey_key, task_key = task_comp_key.split("+")
+        task_metadata = task_dkvs.get_metadata(task_comp_key)
+
+
+        task = task_metadata['program_key'].split("+")
+        task.append(repr(task_metadata['counter']))
+        #[user] [program_name] [task_number]
+        return "-".join(task)
+
+
+    def get_worker_monitor(self):
+        monitor_data = []
+        for (worker_token, worker) in self.online_workers.iteritems():
+            task_monitor = []
+
+            for task_comp_key in worker.assigned_tasks:
+
+                task = self.task_short_print(task_comp_key)
+                task_monitor.append(task)
+            if len(task_monitor)==0:
+                task_monitor = ""
+            else:
+                task_monitor = task_monitor[0]
+            monitor_data.append({"worker_id":repr(worker.worker_number),"worker_ip":worker.ip,"assigned_tasks":task_monitor})
+        return monitor_data
+
+    def get_task_queue_monitor(self):
+        return Monitor.get_task_list()
+
+    def get_finished_tasks_monitor(self):
+        return Monitor.get_tasks_done_list()
+
    def __init__(self):
        """Initialize empty bookkeeping structures for workers and tasks."""
        self.online_workers = {}    # worker_token -> Worker with a live websocket
        self.queued_workers = {}    # worker_token -> Worker registered, not yet online
        self.attribute_index = {}   # attribute -> value -> worker tokens (criteria matching)
        self.round_robin_index = 0
        self.task_queue = []        # task keys queued or in flight
        self.finished_tasks = []    # NOTE(review): looks unused since the Monitor class took over -- confirm
        self.new_unassigned_task_index = {}
        self.total_num_task = 0
 
         self.queued_workers[worker_token] = worker
         return worker.token
 
-    def online(self, worker_token, websocket):
-
-        print "Task Queue size: %s" % len(self.task_queue)
-        print self.new_unassigned_task_index
+    def online(self, worker_token, websocket, ip_addr):
 
         if self.queued_workers.get(worker_token) is None:
             raise NonexistentWorker()
         worker = copy.copy(self.queued_workers[worker_token])
         del self.queued_workers[worker_token]
         worker.websocket = websocket
+        worker.ip = ip_addr
         self.online_workers[worker_token] = worker
 
 
                     else:
                         self.attribute_index[k][v].append(worker_token)
         self.worker_ready_for_new_task(worker_token)
+        self.notify_all_monitors()
 
 
     def offline(self, worker_token):
 
         for task_key in worker.get_unfinished_tasks():
             program_key = task_dkvs.get_metadata(task_key).get('program_key')
+            Monitor.requeue_task(task_key)
             self.new_task(program_key, task_key)
-            #self.new_unassigned_task(program_key, task_key, criteria)
+            task_dkvs.set_status_requeued(task_key)
             worker.remove_task(task_key)
 
         #for indexing pourposes
                  for v in values:
                      self.attribute_index[k][v].remove(worker_token)
         ##########
+        self.notify_all_monitors()
 
     def new_task(self, program_key, task_key):
         #Logica:
 
         self.task_queue.append(task_key)
         criteria = task_dkvs.get_metadata(task_key).get('worker_criteria')
+
+        Monitor.new_task(task_key, program_key)
+
         self.new_unassigned_task(program_key, task_key, criteria)
 
-        #keys = self.workers_with_all_criteria(criteria)
-        #for worker_token in keys:
-            #if self.online_workers.get(worker_token) is None:
-                #continue
-
-            #if len(self.online_workers.get(worker_token).assigned_tasks) <= 1:
-                #self.worker_ready_for_new_task(worker_token)
-                #break
         for worker_token, worker_obj in self.online_workers.iteritems():
-            if len(worker_obj.assigned_tasks) == 0:
-                print "#################################################################################"
+            if len(worker_obj.assigned_tasks) <= 0:
                 self.worker_ready_for_new_task(worker_token)
                 break
 
 
     def task_finished(self, task_key, worker_token):
+        #self.finished_tasks.append(WorkerManager.task_pretty_print(task_key))
         self.task_queue.remove(task_key)
 
         self.online_workers.get(worker_token).remove_task(task_key)
 
         self.worker_ready_for_new_task(worker_token)
+        Monitor.task_done(task_key)
 
 
     def worker_ready_for_new_task(self, worker_token):
     def get_pending_tasks(self):
         return self.task_queue
 
class Task():
    """One task's lifecycle for the monitor UI.

    Tracks a status string through queue -> assign -> (requeue ->) done and
    remembers which worker the task was last given to.
    """

    # class-wide sequence number handed out to each newly created task
    counter = 0

    def __init__(self, task_key, owner_user, program_name, program_version):
        self.task_key = task_key
        self.owner_user = owner_user
        self.program_name = program_name
        self.program_version = program_version
        # stamp this task with the next sequence number
        self.task_number = Task.counter
        Task.counter += 1
        self.status = "Queued"
        self.assigned_worker = ""

    def assign(self, worker):
        """Record assignment to a worker; distinguishes first try from retry."""
        self.assigned_worker = worker
        self.status = "Re-Processing" if self.status == "Re-Queued" else "Processing"

    def requeue(self):
        """Put the task back in the queue after its worker went away."""
        self.assigned_worker = ""
        self.status = "Re-Queued"

    def done(self):
        """Mark completion, noting whether a retry was needed."""
        self.status = "Done (re-tried)" if self.status == "Re-Processing" else "Done"

    def get_long_data(self):
        """Full row dict for the task grids in the admin frontend."""
        row = {}
        row["Owner"] = self.owner_user
        row["Program"] = self.program_name
        row["Version"] = self.program_version
        row["TaskNumber"] = self.task_number
        row["Status"] = self.status
        row["AssignedWorker"] = self.assigned_worker
        return row
+
class Monitor():
    """Class-level registry of Task objects for the live monitor.

    Keeps every Task keyed by task_key plus two ordered views (queued and
    done); each state change broadcasts a fresh snapshot to all connected
    monitor websockets.
    """

    tasks = {}              # task_key -> Task
    task_queue_list = []    # Tasks not yet done, in creation order
    tasks_done_list = []    # Tasks completed, in completion order

    @staticmethod
    def new_task(task_key, program_key):
        """Register a brand-new task, or re-queue it if already known."""
        existing = Monitor.tasks.get(task_key)
        if existing is not None:
            existing.requeue()
        else:
            owner_user, program_name, program_version = program_key.split("+")
            task = Task(task_key, owner_user, program_name, program_version)
            Monitor.tasks[task_key] = task
            Monitor.task_queue_list.append(task)
        get_worker_manager().notify_all_monitors()

    @staticmethod
    def assign_task(task_key, worker):
        """Mark the task as handed to a worker."""
        Monitor.tasks[task_key].assign(worker)
        get_worker_manager().notify_all_monitors()

    @staticmethod
    def requeue_task(task_key):
        """Return the task to the queue (its worker disconnected)."""
        Monitor.tasks[task_key].requeue()
        get_worker_manager().notify_all_monitors()

    @staticmethod
    def task_done(task_key):
        """Move the task from the queue view to the done view."""
        finished = Monitor.tasks[task_key]
        finished.done()
        Monitor.tasks_done_list.append(finished)
        Monitor.task_queue_list.remove(finished)
        get_worker_manager().notify_all_monitors()

    @staticmethod
    def get_task_list():
        """Row dicts for every queued/in-progress task."""
        return [task.get_long_data() for task in Monitor.task_queue_list]

    @staticmethod
    def get_tasks_done_list():
        """Row dicts for finished tasks, most recently finished first."""
        return [task.get_long_data() for task in reversed(Monitor.tasks_done_list)]
 
 class Worker():
+    counter = 0
     def __init__(self, attributes):
 
         if not attribute_dkvs.are_valid(attributes):
         self.websocket      = None
         self.assigned_tasks = []
         self.attributes     = attributes
+        self.worker_number = Worker.counter
+        self.ip = ""
+        Worker.counter += 1
 
     def assign_task(self,program_key, task_key):
         try:
             self.assigned_tasks.append(task_key)
             self.websocket.send(program_key+":"+task_key)
+            Monitor.assign_task(task_key, self.worker_number)
         except:
             raise WorkerCommunicationError()
 

nxweb/data/dkvs/task/__init__.py

     data = output.get(task_key)
     delete(task_key)    # talvez no sea muy buena idea eliminar todos los datos inmediatamente, buscar otra solucion
     return data
+
def set_status_requeued(task_key):
    """Delegate to the metadata store to flag task_key as re-queued."""
    metadata.set_status_requeued(task_key)

nxweb/data/dkvs/task/metadata.py

 
 _get_bucket = BUCKET_GETTERS[_bucket_name]
 
+counter_dic = {}
 
 def create(task_key, worker_criteria,program_key):
+    #pubkey_key+program_key
+    counter_key = task_key.split("+")[0]+"+"+program_key
+    counter = counter_dic.get(counter_key)
+    if counter is None:
+        counter = 0
+
     data = {
         "worker_criteria": worker_criteria,
         "assigned_worker_token": -1,
-        "program_key": program_key
+        "program_key": program_key,
+        "counter": counter,
+        "status": "queued"
     }
 
+    counter = counter + 1
+    counter_dic[counter_key]=counter
+
     _get_bucket().new(task_key, data=data).store()
 
     return task_key
 def delete(task_key):
     _get_bucket().get(task_key).delete()
 
def set_status_requeued(task_key):
    """Rewrite the stored task metadata with status "re-queued".

    Called when a worker goes offline and its unfinished tasks are put
    back on the queue.
    """
    entry = _get_bucket().get(task_key)
    entry_data = entry.get_data()
    entry_data['status'] = "re-queued"
    entry.set_data(entry_data)
    entry.store()
 
 def assign_worker(task_key, worker_token):
     task = _get_bucket().get(task_key)
         print "task_key = %s, worker_token = %s" % (task_key, worker_token)
 
     task_data['assigned_worker_token'] = worker_token
+
+    if task_data['status'] == "queued":
+        task_data['status'] = "in process"
+    else:
+        task_data['status'] = "retry in process"
     task.set_data(task_data)
     task.store()
 
     cache.initialize(**config['redis_cache'])
     dkvs.initialize(**config['riak'])
+    #admin.initialize(wmanws_config=config['apps']['wmanws'])
 
     for bucket_name in ['task_metadata', 'task_input_data_chunk', 'task_output_data_chunk']:
         print "Empty %s Bucket" % bucket_name
 
     # admin -------------------------------------------------------------------
     admin_conf = config['apps']['admin']
+    admin_conf['pyramid_configurator_settings'].update({"wmanws_config":config['apps']['wmanws']})
     admin_wsgi_app = admin.make_wsgi_app(**admin_conf['pyramid_configurator_settings'])
     admin_wsgi_server = WSGIServer((admin_conf['host'], admin_conf['port']), admin_wsgi_app)
     # -------------------------------------------------------------------------
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.