# Source: mycloud/src/mycloud/worker.py

#!/usr/bin/env python

__doc__ = '''Worker for executing cluster tasks.'''

from cloud.serialization import cloudpickle
import cPickle
import os
import sys
import traceback

def log(*args):
  '''Write all arguments to stderr as a single space-separated line.'''
  line = ' '.join(map(str, args))
  sys.stderr.write(line + '\n')

def execute_task():
  '''Execute a function and its arguments, as read from stdin.

  A single pickled ``(f, args)`` pair is loaded from the process's stdin,
  ``f(*args)`` is called, and the return value is serialized with cloudpickle
  onto the process's stdout.  If any step raises, an ``Exception`` whose
  message is the formatted traceback is serialized to stdout instead.
  Progress messages are written to stderr through ``log``.

  Side effect: ``sys.stdin`` and ``sys.stdout`` are rebound to the null
  device and are not restored when this function returns.
  '''

  # Keep private handles on the real streams, then point sys.stdin/sys.stdout
  # at the null device so user code that reads or prints cannot disturb the
  # pickle traffic on the real pipes.
  task_in = sys.stdin
  result_out = sys.stdout

  sys.stdin = open(os.devnull, 'r')
  sys.stdout = open(os.devnull, 'w')

  log('Task execution start.')
  try:
    log('Loading function and arguments.')
    # NOTE(security): unpickling can run arbitrary code, so stdin must only
    # ever be fed by a trusted controller.
    f, args = cPickle.load(task_in)
    log('Done.  Executing function....')
    result = f(*args)
    log('Done. Returning result.')
    # Serialize fully in memory before touching the stream: if the result
    # turns out not to be picklable, no half-written payload precedes the
    # error reply below.
    payload = cloudpickle.dumps(result)
    result_out.write(payload)
    log('Done!')
  except:
    # Kept as a catch-all so that a reply is still written when the task
    # raises something that is not an Exception subclass (e.g. SystemExit).
    trace = traceback.format_exc()
    log('Exception!', trace)
    cloudpickle.dump(Exception(trace), result_out)
  # Push the reply out now instead of relying on interpreter shutdown.
  result_out.flush()
  log('Task execution finished.')


# Script entry point: process exactly one task when run directly.
if __name__ == "__main__":
  execute_task()