Source

mycloud / src / mycloud / util.py

Full commit
#!/usr/bin/env python

import SimpleXMLRPCServer
import cPickle
import logging
import mycloud.thread
import socket
import sys
import traceback
import types
import xmlrpclib
from SocketServer import ThreadingMixIn

class StreamLogger(object):
  '''Read lines from a file object in a separate thread.
  
  These are then logged on the local host with a given prefix.'''
  def __init__(self, prefix, buffer=True):
    self.prefix = prefix
    self.buffer = buffer
    self.lines = []

  def start(self, stream):
    self.thread = mycloud.thread.spawn(self.run, stream)

  def run(self, stream):
    while 1:
      line = stream.readline()
      if not line:
        break

      if not self.buffer:
        print >> sys.stderr, self.prefix + ' --- ' + line.strip()

      self.lines.append(line.strip())

  def dump(self):
    return (self.prefix + ' --- ' +
            ('\n' + self.prefix + ' --- ').join(self.lines))

  def join(self):
    self.thread.wait()


def to_tuple(arglist):
  for i, args in enumerate(arglist):
    if not isinstance(args, types.TupleType):
      arglist[i] = (args,)

  return arglist

def find_open_port():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.bind(("", 0))
  s.listen(1)
  port = s.getsockname()[1]
  s.close()
  return port

class RemoteException(object):
  def __init__(self, type, value, tb):
    self.type = type
    self.value = value
    self.tb = traceback.format_exc(tb)

class XMLServer(ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
  def __init__(self, *args, **kw):
    SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, *args, **kw)

  def server_bind(self):
    logging.info('Binding to address %s', self.server_address)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)

# reference to the worker being used
WORKER = None

class ClientProxy(object):
  def __init__(self, host, port, objid):
    self.host = host
    self.port = port
    self.objid = objid
    self.server = None

  def get_server(self):
    logging.info('Connecting to %s %d', self.host, self.port)
    if not self.server:
      self.server = xmlrpclib.ServerProxy('http://%s:%d' % (self.host, self.port),
                                          allow_none=True)
    logging.info('Connection established to %s %d', self.host, self.port)
    return self.server

  def invoke(self, method, *args, **kw):
    return cPickle.loads(
             self.get_server().invoke(self.objid, method, *args, **kw).data)

def Proxy(obj):
  key = WORKER.wrap(obj)
  logging.info('Wrapped id %s', key)
  return ClientProxy(WORKER.host, WORKER.port, key)