Source

mycloud / src / mycloud / util.py

Full commit
#!/usr/bin/env python

from SimpleXMLRPCServer import SimpleXMLRPCServer
from SocketServer import UDPServer
from cloud.serialization import cloudpickle
import cPickle
import logging
import os
import socket
import struct
import tempfile
import time
import traceback
import types
import xmlrpclib

def create_tempfile(dir, suffix):
  """Return a NamedTemporaryFile inside `dir`, creating `dir` if needed.

  Args:
    dir: directory that should hold the temporary file.  Missing parent
      directories are created as well.  A false value (None or '') is passed
      straight through to tempfile without creating anything.
    suffix: filename suffix for the temporary file.

  Returns:
    The open tempfile.NamedTemporaryFile object.

  Raises:
    OSError: if the directory cannot be created.
  """
  if dir:
    # Create the directory through the os module rather than a shell command
    # so that paths containing quotes or other shell metacharacters are safe.
    try:
      os.makedirs(dir)
    except OSError:
      # makedirs raises when the directory already exists (possibly created
      # concurrently by another process); only re-raise genuine failures.
      if not os.path.isdir(dir):
        raise
  return tempfile.NamedTemporaryFile(dir=dir, suffix=suffix)

class LoggingServer(UDPServer):
  """UDP server that receives pickled log records from remote hosts.

  Each datagram is expected to be a 4-byte big-endian length prefix followed
  by a pickled dict of LogRecord attributes.  The most recent record from
  every client address is kept in `message_map`; records carrying exception
  info are forwarded to `cluster.report_exception`.
  """
  log_output = ""

  def __init__(self, cluster):
    """Bind to all interfaces on the default UDP logging port.

    Args:
      cluster: object providing a report_exception(exc_info) method.
    """
    # `import logging` alone does not load the handlers submodule, so import
    # it explicitly before reading the default port constant.
    import logging.handlers

    host = '0.0.0.0'
    port = logging.handlers.DEFAULT_UDP_LOGGING_PORT

    # No request handler class is needed: finish_request is overridden below.
    UDPServer.__init__(self, (host, port), None)
    self.timeout = 0.1
    self.cluster = cluster

    # Last record received, keyed by the full client address passed to
    # finish_request.
    self.message_map = {}

  def server_bind(self):
    """Bind the socket with SO_REUSEADDR set."""
    logging.info('LoggingServer binding to address %s', self.server_address)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    UDPServer.server_bind(self)

  def finish_request(self, request, client_address):
    """Decode one logging datagram and record it.

    Malformed packets (shorter than the length prefix, or whose length prefix
    disagrees with the payload size) are logged and dropped.

    Args:
      request: (packet, socket) pair; only the packet is used.
      client_address: address of the sender; its first element is used as
        the source host name in the rewritten message.
    """
    packet = request[0]

    if len(packet) < 4:
      logging.error('Received truncated logging packet (%s bytes).',
                    len(packet))
      return

    rlen = struct.unpack('>L', packet[:4])[0]

    if len(packet) != rlen + 4:
      logging.error('Received invalid logging packet. %s %s',
                    len(packet), rlen)
      return

    # NOTE(review): unpickling data received from the network executes
    # arbitrary code if the sender is untrusted -- confirm that this port is
    # only reachable from trusted hosts.
    record = logging.makeLogRecord(cPickle.loads(packet[4:]))
    srchost = client_address[0]

    self.message_map[client_address] = record

    if record.exc_info:
      self.cluster.report_exception(record.exc_info)
#      logging.info('Exception from %s.', srchost)
    else:
      record.msg = 'Remote(%s) -- ' % srchost + record.msg
      # Re-emitting the record through the local root logger is currently
      # disabled; the record is only kept in message_map.
#      logging.getLogger().handle(record)


def to_tuple(arglist):
  """Wrap every non-tuple element of `arglist` in a 1-tuple, in place.

  Args:
    arglist: mutable sequence of arguments; elements that are already tuples
      are left untouched.

  Returns:
    The same `arglist` object, after modification.
  """
  for i, args in enumerate(arglist):
    # The builtin `tuple` is the same type as the deprecated types.TupleType.
    if not isinstance(args, tuple):
      arglist[i] = (args,)

  return arglist

def find_open_port():
  """Return a TCP port number that was free at the time of the call.

  A socket is bound to port 0 so the operating system picks an unused port;
  the socket is then closed and the port number returned.  Because the socket
  is closed before returning, another process may claim the port before the
  caller binds to it.

  Returns:
    The port number as an int.
  """
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    s.bind(("", 0))
    s.listen(1)
    return s.getsockname()[1]
  finally:
    # Always release the socket, even if bind/listen fails.
    s.close()

class RemoteException(object):
  """Container for an exception triple with the traceback rendered as text.

  Attributes are plain values: `type` and `value` are stored as given, and
  `tb` holds the formatted traceback string.
  """

  def __init__(self, type, value, tb):
    """Store the exception and format its traceback.

    Args:
      type: exception class.
      value: exception instance.
      tb: traceback object (may be None).
    """
    self.type = type
    self.value = value
    # format_exc() takes a depth limit and formats whatever exception is
    # currently being handled; format_exception() formats the triple that was
    # actually passed in.
    self.tb = ''.join(traceback.format_exception(type, value, tb))

class XMLServer(SimpleXMLRPCServer):
  """SimpleXMLRPCServer that dispatches incoming calls to its own methods."""

  def __init__(self, *args, **kw):
    SimpleXMLRPCServer.__init__(self, *args, **kw)

  def _dispatch(self, method, params):
    """Call the attribute named `method` on this server with `params`.

    Args:
      method: name of the attribute to call.
      params: sequence of positional arguments.

    Returns:
      The method's return value, or an xmlrpclib.Fault describing the error
      (with the full traceback text as its fault string) if the call raised.
    """
    # NOTE(review): getattr() on self makes every attribute of the server
    # remotely callable, including inherited ones -- confirm this is intended.
    try:
      return getattr(self, method)(*params)
    except Exception:
      logging.exception('Error during dispatch!')
      # format_exc() already returns a single string.
      # NOTE(review): the Fault is returned rather than raised, and its first
      # argument (the fault code) is a string; left as is to preserve what
      # existing clients receive.
      return xmlrpclib.Fault('Error while invoking method.',
                             traceback.format_exc())

  def server_bind(self):
    """Bind the socket with SO_REUSEADDR set."""
    logging.info('Binding to address %s', self.server_address)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    SimpleXMLRPCServer.server_bind(self)


class ProxyServer(XMLServer):
  """XML-RPC server that exposes local objects to remote ProxyObject clients.

  Objects registered with wrap() are stored by their id(); remote callers use
  invoke() to call methods on them and receive the pickled result.
  """

  def __init__(self):
    """Start listening on all interfaces on a port from find_open_port()."""
    # Maps id(obj) -> obj.  Nothing in this class removes entries, so wrapped
    # objects stay referenced for the lifetime of the server.
    self.wrapped_objects = {}
    XMLServer.__init__(self, ('0.0.0.0', find_open_port()))

  def wrap(self, obj):
    """Register `obj` and return a ProxyObject that refers to it.

    Args:
      obj: the object to expose.

    Returns:
      A ProxyObject carrying this host's name, the server port and id(obj).
    """
    # NOTE(review): id(obj) is sent back over XML-RPC as the object key; on
    # platforms where it exceeds the XML-RPC 32-bit integer range the client
    # call may fail to marshal -- confirm.
    self.wrapped_objects[id(obj)] = obj
    logging.info('Wrapped id %s', id(obj))
    return ProxyObject(socket.gethostname(), self.server_address[1], id(obj))

  def invoke(self, objid, method, *args, **kw):
    """Call `method` on the wrapped object identified by `objid`.

    Args:
      objid: key previously handed out by wrap().
      method: name of the method to call on the wrapped object.
      *args, **kw: arguments forwarded to the method.

    Returns:
      An xmlrpclib.Binary holding the cloudpickled result, or an
      xmlrpclib.Fault (with the full traceback text as its fault string) if
      the lookup or the call raised.
    """
    try:
      logging.debug('Invoking object method...')
      result = getattr(self.wrapped_objects[objid], method)(*args, **kw)
      logging.debug('Success.')
      return xmlrpclib.Binary(cloudpickle.dumps(result))
    except Exception:
      logging.exception('Error during invocation!')
      # format_exc() already returns a single string.
      return xmlrpclib.Fault('Error while invoking method.',
                             traceback.format_exc())


class ProxyObject(object):
  """Client-side handle for an object held by a remote XML-RPC server.

  Stores the server's host and port and the remote object's key, and forwards
  method calls through the server's `invoke` RPC.
  """

  # Retry policy used by invoke().  Kept as class attributes so it can be
  # tuned without changing the invoke() signature.
  RETRY_ATTEMPTS = 10
  RETRY_DELAY_SECS = 5

  def __init__(self, host, port, objid):
    """Remember where the remote object lives; no connection is made yet.

    Args:
      host: host name of the server.
      port: TCP port of the server.
      objid: key identifying the remote object on that server.
    """
    self.host = host
    self.port = port
    self.objid = objid
    self.server = None

  def get_server(self):
    """Return the xmlrpclib.ServerProxy for the server, creating it once."""
    if self.server is None:
#      logging.info('Connecting to %s %d', self.host, self.port)
      self.server = xmlrpclib.ServerProxy('http://%s:%d' % (self.host, self.port))
#      logging.info('Connection established to %s %d', self.host, self.port)
    return self.server

  def invoke(self, method, *args, **kw):
    """Call `method` on the remote object, retrying on failure.

    Args:
      method: name of the method to call remotely.
      *args, **kw: arguments forwarded to the server's invoke RPC.

    Returns:
      The unpickled result of the remote call.

    Raises:
      Exception: if every attempt failed.  KeyboardInterrupt and SystemExit
        are not retried and propagate immediately.
    """
    for _ in range(self.RETRY_ATTEMPTS):
      try:
        result = self.get_server().invoke(self.objid, method, *args, **kw)
        # NOTE(review): unpickling data returned by the server executes
        # arbitrary code if the server is untrusted -- confirm the trust model.
        return cPickle.loads(result.data)
      except Exception:
        logging.exception('Failed to invoke remote method %s; trying again.',
                          method)
        time.sleep(self.RETRY_DELAY_SECS)
    raise Exception('Failed to invoke remote method %s on %s' % (method, self.host))