xnet-hacks / xnet / slave.py

import sys
import json
import eventlet
import eventlet.wsgi
import eventlet.greenthread as thread
from eventlet.green import subprocess
from eventlet import event
from xnet import defaults
from xnet.xjob import XJobSpec, XResult, XJobInfo
from xnet.conn import ServerConnection
from xnet.stream import StreamHandler
# maybe these two should be put somewhere else
from xnet.helpers import (
    log, json_response, text_response,
    json_encode
)

from flask import Flask, request

# WSGI application object.  The route handlers in this module attach to it,
# and the process manager / stream handler are stored on it as attributes
# (``app.pm`` and ``app.streamh``) further down.
app = Flask(__name__)
# NOTE(review): debug mode is switched on unconditionally -- confirm this is
# intended outside of development.
app.debug = True


class SlaveStreamHandler(StreamHandler):
    """Stream handler that forwards partial xjob stdout to the server."""

    def __init__(self):
        super(SlaveStreamHandler, self).__init__()

    def prep(self):
        """Return the state handed to ``push_func``: a fresh ServerConnection.

        The connection is stored under the ``'conn'`` key of the returned
        dict.  (Presumably called by the StreamHandler base class before
        pushing -- verify against ``xnet.stream``.)
        """
        return {'conn': ServerConnection()}

    def push_func(self, xjobid, stdout, token, prep):
        """PUT one piece of stdout for ``xjobid`` to ``/putpartialstdout``.

        ``stdout`` is sent JSON-encoded as ``{xjobid: stdout}``, ``token`` is
        sent as the ``content-range`` header, and ``prep`` is the dict built
        by ``prep()``.  A reply other than ``"Stored."`` is logged as a
        warning; nothing is returned either way.
        """
        conn = prep['conn']
        data = json_encode({xjobid: stdout})
        response = conn.PUT("/putpartialstdout", data,
                            headers={'content-range': token})
        answer = response.read()
        if answer != "Stored.":
            # Previously ignored silently; at least leave a trace so lost
            # output can be diagnosed.
            log("[WARN] server did not store partial stdout of xjob %s: %r"
                % (xjobid, answer))

def pullxjob(conn):
    """Ask the server behind ``conn`` for the next xjob.

    POSTs to ``/pull`` and decodes the JSON reply, which is expected to carry
    the keys ``'xjobid'`` and (when a job is available) ``'spec'``.

    Returns an XJobSpec with its ``xjobid`` attribute set, or None when the
    request failed or the server answered with the xjobid ``"no"``.
    """
    log("pullxjob: initiating request")
    try:
        response = conn.POST("/pull")
    except (Exception, eventlet.Timeout):
        # Deliberately not a bare ``except``: KeyboardInterrupt, SystemExit
        # and GreenletExit must propagate so the slave can be shut down.
        # eventlet.Timeout derives from BaseException, so it is listed
        # explicitly to keep treating a timed-out request as "offline".
        log("[WARN] server %s does not seem to be online" % conn.hostspec)
        return None
    content = response.read()
    data = json.loads(content)
    xjobid = data['xjobid']
    if xjobid == "no":
        # The server uses the literal id "no" to signal an empty queue.
        return None
    xjobspec = XJobSpec.from_json(data['spec'])
    xjobspec.xjobid = xjobid
    return xjobspec

def xjobfinished(conn, result):
    """Submit the result of a finished xjob and return the server's reply.

    ``result`` is JSON-encoded and POSTed to ``/result`` on ``conn``; the
    body of the response is logged and returned unchanged.
    """
    log("submitting result %s", result)
    reply = conn.POST("/result", json_encode(result)).read()
    log("server accepted, says: %s", reply)
    return reply

@app.route("/listactive", methods=['GET'])
def listactive():
    """Serve the stream handler's ``xjob_infos`` mapping, JSON-encoded."""
    active_infos = app.streamh.xjob_infos
    return json_encode(active_infos)

@app.route("/observexjob", methods=['GET'])
def observexjob():
    """Serve the stdout recorded so far for one xjob.

    The job is selected by the integer query parameter ``xjobid``.  When the
    stream handler has no (truthy) info object for that id, a short notice
    is returned instead.
    """
    xjobid = request.args.get('xjobid', type=int)
    info = app.streamh.xjob_infos.get(xjobid)
    if not info:
        return text_response(app, "job with id %s is not active" % xjobid)
    return text_response(app, info.stdout)

class ProcessManager(object):
    """Starts xjob subprocesses, streams their stdout and reports results."""

    def __init__(self):
        # xjobid -> Popen object of every xjob that is currently running.
        self.active = {}
        # xjobid -> mutable sequence of stdin chunks waiting to be written
        # to that xjob's process (filled from outside this class).
        self.stdinqueue = {}

    def start(self, xjobspec):
        """Launch ``xjobspec.args`` as a subprocess and begin observing it.

        The process is recorded in ``self.active``, registered with the
        server and handed to ``observe`` (which spawns a worker greenthread).
        """
        # stdin must be a pipe, otherwise ``popen.stdin`` is None and input
        # queued in ``self.stdinqueue`` could never be delivered.
        # NOTE(review): stderr is only drained by communicate() after stdout
        # hits EOF; a child that fills the stderr pipe first could block.
        popen = subprocess.Popen(xjobspec.args,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        xjobinfo = XJobInfo(xjobspec.xjobid)
        self.active[xjobspec.xjobid] = popen
        self.register(xjobinfo)
        self.observe(xjobspec.xjobid)

    def register(self, xjobinfo):
        """POST ``xjobinfo`` to ``/registerxjob`` and return the response."""
        # conn is no global anymore and might be useful as instance variable
        conn = ServerConnection()
        response = conn.POST("/registerxjob", json_encode(xjobinfo))
        log("registered with server and got response: %s", response.read())
        return response

    def observe(self, xjobid, spawn=True):
        """Relay the stdout of xjob ``xjobid`` line by line until it ends.

        With ``spawn=True`` (the default) this only starts a worker
        greenthread that re-enters ``observe`` with ``spawn=False`` and
        returns None.  With ``spawn=False`` it runs the relay loop in the
        current greenthread and returns the process's return code.

        Raises ValueError (via ``getpopen``) if ``xjobid`` is not active.
        """
        if spawn:
            worker = thread.spawn(self.observe, xjobid, spawn=False)
            worker.link(self.handle_worker_exit)
            return None

        worker = thread.getcurrent()
        app.streamh.add(xjobid, worker)
        app.streamh.start_push(xjobid)
        proc = self.getpopen(xjobid)
        while True:
            line = proc.stdout.readline()
            if not line:
                # EOF on stdout: collect what is left and report the result.
                # Pending stdin is not written to a process that is done.
                return self.postresult(xjobid)
            app.streamh.push(xjobid, line)
            self._feed_stdin(xjobid, proc)
            thread.sleep(0.01)

    def _feed_stdin(self, xjobid, proc):
        """Write the oldest queued stdin chunk for ``xjobid`` to ``proc``."""
        pending = self.stdinqueue.get(xjobid)
        if not pending:
            return
        chunk = pending[0]
        # Consume the chunk so it is sent once, not on every iteration.
        del pending[0]
        try:
            proc.stdin.write(chunk)
            proc.stdin.flush()
        except (IOError, OSError) as exc:
            # The child may have closed its stdin; keep relaying stdout.
            log("[WARN] could not write stdin of xjob %s: %s" % (xjobid, exc))

    def handle_worker_exit(self, gt, arglist=None):
        """Link callback: log what the finished worker greenthread returned.

        ``arglist`` is accepted for compatibility and not used.
        """
        retcode = gt.wait()
        log("Thread has exited with return code %s" % retcode)

    def getpopen(self, xjobid):
        """Return the Popen object of ``xjobid``; ValueError if not active."""
        if xjobid not in self.active:
            raise ValueError(xjobid)
        return self.active[xjobid]

    def postresult(self, xjobid):
        """Finish xjob ``xjobid`` and send its result to the server.

        Combines the stdout already held by the stream handler with whatever
        ``communicate()`` still returns, removes the job from ``self.active``
        and the stream handler, and submits an XResult through
        ``xjobfinished``.  Returns the process's return code.
        """
        popen = self.getpopen(xjobid)
        remaining, err = popen.communicate()
        out = "".join(app.streamh.xjob_infos[xjobid].stdout.values()) + remaining
        retcode = popen.wait()
        del self.active[xjobid]
        # Drop input that was never delivered so the queue does not grow
        # with entries for finished jobs.
        self.stdinqueue.pop(xjobid, None)
        app.streamh.terminate(xjobid)
        result = XResult(xjobid=xjobid,
                         out=out, err=err, retcode=retcode)
        conn = ServerConnection()
        xjobfinished(conn, result)
        return retcode

def poll_loop():
    """Endlessly pull xjobs from the server while process slots are free.

    A new job is requested only while fewer than
    ``defaults.client_max_procs`` processes are active.  The loop sleeps
    1.0s when no job was available and 0.5s after every other iteration.
    It never returns.
    """
    while True:
        if len(app.pm.active) < defaults.client_max_procs:
            conn = ServerConnection()
            xjobspec = pullxjob(conn)
            if not xjobspec:
                log("no xjobs available, sleeping a bit")
                eventlet.sleep(1.0)
                continue
            log("got xjob description %s", repr(xjobspec))
            try:
                app.pm.start(xjobspec)
            except Exception as exc:
                # One job that cannot be started (e.g. Popen raising OSError
                # for a missing executable) must not kill this greenthread,
                # or the slave would stop pulling jobs altogether.
                log("[WARN] could not start xjob %s: %s"
                    % (xjobspec.xjobid, exc))
        eventlet.sleep(0.5)

# Module-wide singletons, hung on the Flask app so that the route handlers,
# ProcessManager and poll_loop can all reach them through ``app``.
app.pm = ProcessManager()
app.streamh = SlaveStreamHandler()

def main(args=None):
    """Run the slave: start the polling greenthread, then serve HTTP.

    ``args`` defaults to the command-line arguments after the program name;
    it is currently not evaluated any further.  The WSGI server listens on
    all interfaces at ``defaults.client_port`` and blocks.
    """
    args = sys.argv[1:] if args is None else args

    thread.spawn(poll_loop)

    listener = eventlet.listen(('', defaults.client_port))
    eventlet.wsgi.server(listener, app)

# Run the slave when this file is executed as a script.
if __name__ == '__main__':
    main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.