xnet-hacks / xnet / server.py

"""
xnet server prototype
"""

import eventlet.wsgi
import json
import eventlet.greenthread as thread
from xnet import defaults
from xnet.xjob import (
    XJobStorage, XJobSpec,
    XResult, XResultStorage,
    XJobInfo,
)
from xnet.stream import StreamHandler
from xnet.helpers import (
    log, json_response, text_response,
    json_encode
)

from flask import Flask, request

app = Flask(__name__)
app.xjobstorage = XJobStorage()
app.resultstorage = XResultStorage()
app.streamh = StreamHandler()
app.debug = True
app.debug_log_format = "[server:%(levelname)s] %(message)s"
   
@app.route("/", methods=["POST"])
def submitxjob():
    xjobdata = request.data
    xjobspecdata = json.loads(xjobdata)
    spec = XJobSpec(xjobspecdata['args'])
    xjobid = app.xjobstorage.enqueue(spec)
    app.logger.info("submitxjob: added xjob id=%s" % xjobid)
    return json_response(app, xjobid=xjobid)

@app.route("/registerxjob", methods=["POST"])
def registerxjob():
    xjobdata = json.loads(request.data)
    jobinfo = XJobInfo.from_json(xjobdata)
    try:
        app.xjobstorage.register(jobinfo)
        app.streamh.add(jobinfo.xjobid)
        response = "registered job with id %s" % jobinfo.xjobid
        return text_response(app, response)
    except KeyError:
        response = "job with id %s already registered" % jobinfo.xjobid
        return text_response(app, response)

@app.route("/", methods=["GET"])
def listxjobs():
    xjobs = {
        'queued': app.xjobstorage.queued,
        'pulled': app.xjobstorage.pulled,
        'active': app.xjobstorage.active,
        'finished': app.xjobstorage.finished,
    }
    return json_response(app, **xjobs)

@app.route("/pull", methods=["POST"])
def pullxjob():
    xjobid = app.xjobstorage.pull()
    if xjobid is not None:
        app.logger.info("pullxjobs: popped xjob id=%s" % xjobid)
        xjob = app.xjobstorage.get(xjobid)
    else:
        app.logger.info("pullxjobs: no xjobs available")
        return json_response(app, xjobid="no")
    return json_response(app, xjobid=xjobid, spec=xjob)

@app.route("/result", methods=['POST'])
def xjobfinished(): 
    resultdata = json.loads(request.data)
    result = XResult.from_json(resultdata)
    xresultid = app.resultstorage.add(result)
    app.xjobstorage.finish(xjobid=result.xjobid, xresultid=xresultid)
    app.streamh.terminate(int(result.xjobid))
    app.logger.info("added result %s" % result)
    return text_response(app, "Thanks.")

@app.route("/getstdout/<int:xjobid>", methods=['GET'])
def getstdout(xjobid):
    stdout = app.streamh.get_stdout(xjobid)
    if stdout is None:
        return text_response(app, "Job with id %s is not active" % xjobid)
    else:
        try:
            req_token = int(request.headers['range'])
        except KeyError:
            return json_response(app, xjobid=xjobid, stdout=stdout)

        if max(stdout.keys()) <= req_token:
            evt = app.streamh.get_current_event(xjobid)
            while True:
                if evt.ready():
                    stdout, token, evt = evt.wait()
                    if app.streamh.isterminator(stdout):
                        return text_response(app, ("Job with id %s is not"
                                                   "active" % xjobid))
                    if token > req_token:
                        return json_response(app, xjobid=xjobid,
                                             stdout={token: stdout})
                thread.sleep(0.1)
        else:
            response = {}
            for key, value in stdout.iteritems():
                if key > req_token:
                    response[key] = value.rstrip('\n')
        return json_response(app, xjobid=xjobid, stdout=response)

@app.route("/getresult/<int:xjobid>", methods=['GET'])
def getresult(xjobid):
    result = app.resultstorage.get(xjobid=xjobid)
    app.logger.info("returning result %s" % result)
    xjobspec = app.xjobstorage.get(xjobid=xjobid)
    return json_encode({'result': result, 'xjobspec': xjobspec})

@app.route("/putpartialstdout", methods=['PUT'])
def putpartial():
    s_token = request.headers['content-range']
    token = int(s_token)
    data = json.loads(request.data)
    (xjobid,) = data.keys()
    if not app.streamh.isactive(int(xjobid)):
        return text_response(app, ("Job is not active,"
                                   "registration must have failed "
                                   "or the job has completed"))
    else:
        try:
            stdout = data[xjobid]
            app.streamh.push(int(xjobid), stdout, explicit_token=token)
        except KeyError:
            return text_response(app, "Segment was already submitted")
        return text_response(app, "Stored.")

def main():
    ""
    sock = eventlet.listen(('', defaults.server_port))
    eventlet.wsgi.server(sock, app)

if __name__ == '__main__':
    main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.