Source

mycloud / src / mycloud / worker.py

Full commit
#!/usr/bin/env python
from cloud.serialization import cloudpickle
import cPickle
import cStringIO
import logging
import mycloud.thread
import mycloud.util
import select
import socket
import sys
import threading
import xmlrpclib

mycloud.thread.init()

__doc__ = '''Worker for executing cluster tasks.'''

logging.basicConfig(stream=sys.stderr,
                      format='%(asctime)s %(funcName)s %(message)s',
                      level=logging.INFO)

class Worker(object):
  def __init__(self, host, port):
    self.host = host
    self.port = port
    self.wrapped_objects = {}

  def execute_task(self, pickled):
    f, args, kw = cPickle.loads(pickled.data)
    logging.info('Executing task %s %s %s', f, args, kw)
    result = f(*args, **kw)
    dump = cloudpickle.dumps(result)
#    logging.info('Got result!')
    return xmlrpclib.Binary(dump)

  def hello(self):
    return 'alive'

  def wrap(self, obj):
    self.wrapped_objects[id(obj)] = obj
    return id(obj)

  def invoke(self, objid, method, *args, **kw):
    #logging.info('Invoking %s %s %s %s',
    #             self.wrapped_objects[objid], method, args, kw)
    return xmlrpclib.Binary(
             cloudpickle.dumps(
               getattr(self.wrapped_objects[objid], method)(*args, **kw)))

def dump_stderr(src, dst):
  while 1:
    data = src.get_value()
    src.truncate()
    dst.write(data)
    mycloud.thread.sleep(1)


if __name__ == '__main__':
  # Open a server on an open port, and inform our caller
  old_stderr = sys.stderr
  sys.stderr = cStringIO.StringIO()

  stderr_log = threading.Thread(target=dump_stderr, args=(sys.stderr, old_stderr))
  stderr_log.setDaemon(True)
  stderr_log.start()

  myport = mycloud.util.find_open_port()
  xmlserver = mycloud.util.XMLServer(('0.0.0.0', myport), allow_none=True)
  xmlserver.timeout = 1

  worker = Worker(socket.gethostname(), myport)
  mycloud.util.WORKER = worker

  print myport
  sys.stdout.flush()

  xmlserver.register_function(worker.execute_task, 'execute_task')
  xmlserver.register_function(worker.invoke, 'invoke')
  xmlserver.register_function(worker.hello, 'hello')

  # handle requests until our stdout is closed - (our controller shutdown or crashed)
  while 1:
    try:
      xmlserver.handle_request()
    except:
      logging.info('Error handling request!!!', exc_info=1)