Commits

Russell Power  committed 0d3cf0a

Better handling of status updates and remote logging.

Playing around with a curses/Tkinter interface for worker status.

  • Parent commits ceb6561

Files changed (7)

File setup.py

     author="Russell Power",
     author_email="power@cs.nyu.edu",
     license="BSD",
-    version="0.22",
+    version="0.24",
     url="http://rjpower.org/mycloud",
     package_dir={ '' : 'src' },
     packages=[ 'mycloud' ],

File src/mycloud/__init__.py

 #!/usr/bin/env python
 
 import mycloud.cluster
+from mycloud import mapreduce
+from mycloud import resource
 
 Cluster = mycloud.cluster.Cluster

File src/mycloud/cluster.py

 #!/usr/bin/env python
 
-from cloud.serialization import cloudpickle
+try:
+  import Tkinter
+except ImportError:
+  Tkinter = None
+
+from cloud.serialization import cloudpickle, pickledebug
 import Queue
 import cPickle
+import collections
+import json
 import logging
 import mycloud.connections
 import mycloud.thread
 import sys
 import traceback
 import xmlrpclib
+import yaml
 
 mycloud.thread.init()
 
   '''A piece of work to be executed.'''
   def __init__(self, name, index, function, args, kw):
     self.idx = index
-    self.pickle = xmlrpclib.Binary(cloudpickle.dumps((function, args, kw)))
+    logging.info('Serializing %s %s %s', function, args, kw)
+    self.pickle = xmlrpclib.Binary(
+      pickledebug.dumps((function, args, kw)))
     self.result = None
     self.done = False
 
   def run(self, client):
     logging.info('Starting task %s on %s', self.idx, client)
     self.client = client
-    result_data = self.client.execute_task(self.pickle)
-    self.result = cPickle.loads(result_data.data)
-    self.done = True
-    logging.info('Task %d finished', self.idx)
-    return self.result
+    try:
+      result_data = self.client.execute_task(self.pickle)
+      self.result = cPickle.loads(result_data.data)
+      self.done = True
+      logging.info('Task %d finished', self.idx)
+      return self.result
+    except xmlrpclib.Fault, e:
+      raise Exception(e.faultString)
 
   def wait(self):
     while not self.result:
   
 A Server is created for each core on a machine, and executes tasks as
 machine resources become available.'''
-  def __init__(self, cluster, host, index):
-    self.cluster = cluster
+  def __init__(self, controller, host, index):
+    self.controller = controller
     self.host = host
     self.index = index
 
     return self.thread
 
   def client(self):
-    logging.info('Created proxy to %s:%d', self.host, self.port)
+    logging.debug('Created proxy to %s:%d', self.host, self.port)
     return xmlrpclib.ServerProxy('http://%s:%d' % (self.host, self.port))
 
   def _run_task(self, task):
       task.run(self.client())
     except:
       #logging.exception('Failed to run task')
-      self.cluster.report_exception(sys.exc_info())
+      self.controller.report_exception(sys.exc_info())
       self.stdin.close()
       self.stdout.close()
     finally:
     assert self.machines
     assert self.tmp_prefix
 
+    self.root_window = None
+    self.tk_thread = None
+    self.status_labels = {}
+
+#    if Tkinter:
+#      self.root_window = Tkinter.Tk()
+#      self.root_window.columnconfigure(0, minsize=200, pad=20)
+#      self.root_window.columnconfigure(1, minsize=800)
+#    else:
+    logging.info('Tkinter status window disabled.')
+
     self.start()
 
   def __del__(self):
   def report_exception(self, exc):
     self.exceptions.append(exc)
 
-  def log_exceptions(self):
-    for e in self.exceptions:
-      exc_dump = ['remote exception: ' + line
-               for line in traceback.format_exception(*e)]
-      logging.info('\n'.join(exc_dump))
+  def check_exceptions(self):
+    '''Check whether any remote exceptions have been thrown; log locally and rethrow.
+
+If an exception is found, the controller is shut down and all exceptions are
+reported before a ClusterException is raised.'''
+    if self.exceptions:
+      mycloud.connections.SSH.shutdown()
+
+      counts = collections.defaultdict(int)
+
+      for e in self.exceptions:
+        exc_dump = '\n'.join(traceback.format_exception(*e))
+        counts[exc_dump] += 1
+
+      for exc_dump, count in sorted(counts.items(), key=lambda t: t[1]):
+        logging.info('Remote exception (occurred %d times):' % count)
+        logging.info('%s', '\nREMOTE:'.join(exc_dump.split('\n')))
+
+      raise ClusterException
 
   def start(self):
     self.log_server = mycloud.util.LoggingServer(self)
     random.shuffle(self.servers)
     logging.info('Started %d servers...', len(servers))
 
-  def input(self, type, pattern):
-    '''Return a cluster set of cluster inputs for the given pattern.'''
-    return mycloud.resource.input(type, pattern)
+  def show_status(self):
+    if not self.root_window:
+      return
 
-  def output(self, type, name, shards):
-    return mycloud.resource.output(type, name, shards)
+    try:
+      row = 0
+      formatter = logging.Formatter()
+      for hostport, rlog in sorted(self.log_server.message_map.items()):
+        if not hostport in self.status_labels:
+          self.status_labels[hostport] = (
+            Tkinter.Label(self.root_window),
+            Tkinter.Entry(self.root_window, width=100))
 
-  def show_status(self):
-    return
-    for (host, port), rlog in self.log_server.message_map.items():
-      print >> sys.stderr, '(%s, %s) -- %s' % (host, port, rlog.msg)
+        hp_label, msg_label = self.status_labels[hostport]
+        hp_label['text'] = '%s:%s' % hostport
+
+        msg = formatter.format(rlog)
+        msg = msg.replace('\n', ' ')
+
+        msg_label.delete(0, Tkinter.END)
+        msg_label.insert(0, msg)
+
+        hp_label.grid(row=row, column=0)
+        msg_label.grid(row=row, column=1)
+
+        row += 1
+    except:
+      logging.info('Failed to update status.', exc_info=1)
+
+    self.root_window.update()
 
   def map(self, f, arglist, name='worker'):
     assert len(arglist) > 0
     # this way we can stop early in case we encounter an exception
     try:
       while 1:
+        self.check_exceptions()
+        self.show_status()
+        mycloud.thread.sleep(1)
+
         for s in self.servers:
           if not s.ready:
             continue
           t = task_queue.get_nowait()
           s.start_task(t)
 
-        if self.exceptions:
-          self.log_exceptions()
-          raise ClusterException
-
-        mycloud.thread.sleep(0.1)
-        self.show_status()
-
     except Queue.Empty:
       pass
 
     for t in tasks:
       while not t.done:
-        if self.exceptions:
-          self.log_exceptions()
-          raise ClusterException
-        mycloud.thread.sleep(0.1)
+        self.show_status()
+        self.check_exceptions()
+        mycloud.thread.sleep(1)
 
     logging.info('Done.')
     return [t.result for t in tasks]
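
For orientation, a minimal driver for this API might look like the sketch
below. It is an illustration only: the host list is hypothetical, and the
Cluster constructor is assumed to accept machines and tmp_prefix keyword
arguments (suggested by the asserts above).

  import mycloud

  # Hypothetical two-machine cluster; tmp_prefix is where intermediate
  # files are staged.
  cluster = mycloud.Cluster(machines=['host1', 'host2'],
                            tmp_prefix='/tmp/mycloud')

  # map() pickles the function and arguments, farms tasks out to one
  # Server per core, and polls check_exceptions()/show_status() once a
  # second until every task is done.
  results = cluster.map(lambda x: x * x, range(100))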

File src/mycloud/connections.py

     logging.info('Closing all SSH connections')
     for connection in SSH.connections.values():
       connection.close()
-
+    SSH.connections = {}
 
 class Local(object):
   @staticmethod

File src/mycloud/mapreduce.py

 import mycloud.merge
 import mycloud.thread
 import mycloud.util
+import threading
 import types
 import xmlrpclib
 
                num_mappers,
                num_reducers,
                map_buffer_size=100,
-               reduce_buffer_size=100e6):
+               reduce_buffer_size=50e6):
     self.mapper = mapper
     self.reducer = reducer
     self.tmp_prefix = tmp_prefix
       self.flush()
 
   def flush(self, final=False):
+    logging.info('Flushing...')
     for shard in range(self.num_reducers):
       shard_output = self.output_tmp[shard]
       if not final and not shard_output:
       mapper = self.mapper
 
     reader = self.input.reader()
-    for k, v in reader:
-      #logging.info('Read %s', k)
+    logger = mycloud.util.PeriodicLogger(period=5)
+    for count, (k, v) in enumerate(reader):
+      logger.info('(%5d) - Read: %s', count, k)
       mapper(k, v, self.output)
-      #logging.info('Mapped %s', k)
+      logger.info('(%5d) - Mapped %s', count, k)
     self.flush(final=True)
     logging.info('Map of %s finished.', self.input)
 
     if is_finished:
       self.maps_finished[mapper] = 1
 
-    self.buffer_size += len(block)
     for k, v in json.loads(block):
       self.buffer.append((k, v))
 
-    if self.buffer_size > self.reduce_buffer_size:
-      self.flush()
+    with self.lock:
+      self.buffer_size += len(block)
+      if self.buffer_size > self.reduce_buffer_size:
+        self.flush()
+        # Reset only after an actual flush; flush() no longer clears it.
+        self.buffer_size = 0
 
   def flush(self):
     logging.info('Reducer flushing - %s', self.buffer_size)
     del bt
 
     self.map_tmp.append(tf)
-    self.buffer_size = 0
     logging.info('Flush finished to %s', tf.name)
 
   def start_server(self):
     logging.info('Starting server...')
+    self.lock = threading.Lock()
     self.proxy_server = mycloud.util.ProxyServer()
     self.serving_thread = mycloud.thread.spawn(self.proxy_server.serve_forever)
 
     while sum(self.maps_finished) != self.num_mappers:
       logging.info('Waiting for map data %d/%d',
                    sum(self.maps_finished), self.num_mappers)
-      mycloud.thread.sleep(0.01)
+      mycloud.thread.sleep(1)
 
     self.flush()
 
 
 
 class MapReduce(object):
-  def __init__(self, cluster, mapper, reducer, input, output):
-    self.cluster = cluster
+  def __init__(self, controller, mapper, reducer, input, output):
+    self.controller = controller
     self.mapper = mapper
     self.reducer = reducer
     self.input = input
                                reducer=self.reducer,
                                num_mappers=len(self.input),
                                num_reducers=len(self.output),
-                               tmp_prefix=self.cluster.tmp_prefix)
+                               tmp_prefix=self.controller.tmp_prefix)
                   for i in range(len(self.output)) ]
 
-      reduce_tasks = self.cluster.map(lambda r: r.start_server(), reducers)
+      reduce_tasks = self.controller.map(lambda r: r.start_server(), reducers)
 
       mappers = [MapHelper(index=i,
                            input=self.input[i],
                            reducer=self.reducer,
                            num_mappers=len(self.input),
                            num_reducers=len(self.output),
-                           tmp_prefix=self.cluster.tmp_prefix)
+                           tmp_prefix=self.controller.tmp_prefix)
                  for i in range(len(self.input)) ]
 
-      self.cluster.map(lambda m: m.run(), mappers)
+      self.controller.map(lambda m: m.run(), mappers)
 
       mycloud.thread.sleep(1)
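
The mapper signature is visible above: each mapper receives a key, a value,
and an output callable (mapper(k, v, self.output)). A hedged word-count
sketch, assuming output(key, value) semantics and a run() entry point that
this hunk does not show:

  def word_count_map(k, v, output):
    # Emit a count of one per word; output(key, value) is assumed.
    for word in v.split():
      output(word, 1)

  def word_count_reduce(k, values, output):
    # The reducer signature is likewise an assumption.
    output(k, sum(values))

  mr = mycloud.mapreduce.MapReduce(cluster, word_count_map,
                                   word_count_reduce,
                                   input_resources,   # hypothetical inputs
                                   output_resources)  # hypothetical outputs
  results = mr.run()  # entry point assumed, not shown in this diff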
 

File src/mycloud/util.py

 #!/usr/bin/env python
 
 from SimpleXMLRPCServer import SimpleXMLRPCServer
-from SocketServer import UDPServer
+from SocketServer import UDPServer, ThreadingMixIn
 from cloud.serialization import cloudpickle
 import cPickle
 import logging
+import mycloud.thread
 import os
 import socket
 import struct
+import time
 import types
 import xmlrpclib
 
+class PeriodicLogger(object):
+  def __init__(self, period):
+    self.last = 0
+    self.period = period
+
+  def info(self, msg, *args, **kw):
+    now = time.time()
+    if now - self.last < self.period:
+      return
+
+    logging.info(msg, *args, **kw)
+    self.last = now
+
 def create_tempfile(dir, suffix):
   os.system("mkdir -p '%s'" % dir)
   return tempfile.NamedTemporaryFile(dir=dir, suffix=suffix)
 
+class CursesHandler(logging.Handler):
+  def __init__(self, window):
+    logging.Handler.__init__(self)
+    self.window = window
+
+  def emit(self, record):
+    try:
+      for line in self.format(record).split('\n'):
+        self.window.addstr(line)
+        self.window.addch('\n')
+    except:
+      pass
+
+    self.window.refresh()
+
 class LoggingServer(UDPServer):
+  '''Listen for UDP log messages on the default port.
+  
+If a record containing an exception is found, report it to the controller.
+'''
   log_output = ""
 
-  def __init__(self, cluster):
+  def __init__(self, controller):
     host = '0.0.0.0'
     port = logging.handlers.DEFAULT_UDP_LOGGING_PORT
 
     UDPServer.__init__(self, (host, port), None)
     self.timeout = 0.1
-    self.cluster = cluster
+    self.controller = controller
 
     # for each distinct host, keep track of the last message sent
     self.message_map = {}
     self.message_map[client_address] = record
 
     if record.exc_info:
-      self.cluster.report_exception(record.exc_info)
+      self.controller.report_exception(record.exc_info)
 #      logging.info('Exception from %s.', srchost)
     else:
-      record.msg = 'Remote(%s) -- ' % srchost + record.msg
-#      logging.getLogger().handle(record)
+      logging.getLogger().handle(record)
 
 
 def to_tuple(arglist):
     self.value = value
     self.tb = traceback.format_exc(tb)
 
-class XMLServer(SimpleXMLRPCServer):
+class XMLServer(ThreadingMixIn, SimpleXMLRPCServer):
   def __init__(self, *args, **kw):
     SimpleXMLRPCServer.__init__(self, *args, **kw)
+    self.daemon_threads = True
 
   def _dispatch(self, method, params):
     try:
       return getattr(self, method)(*params)
     except:
-      logging.exception('Error during dispatch!')
-      return xmlrpclib.Fault('Error while invoking method.',
-                             '\n'.join(traceback.format_exc()))
+      logging.info('Exception during dispatch.', exc_info=1)
+      raise xmlrpclib.Fault(faultCode=1, faultString=traceback.format_exc())
 
   def server_bind(self):
     logging.info('Binding to address %s', self.server_address)
 
   def wrap(self, obj):
     self.wrapped_objects[id(obj)] = obj
-    logging.info('Wrapped id %s', id(obj))
+    #logging.info('Wrapped object %s', id(obj))
     return ProxyObject(socket.gethostname(), self.server_address[1], id(obj))
 
   def invoke(self, objid, method, *args, **kw):
     try:
-      logging.debug('Invoking object method...')
+      #logging.info('Invoking method: %s', method)
       result = getattr(self.wrapped_objects[objid], method)(*args, **kw)
-      logging.debug('Success.')
+      #logging.info('Successfully invoked %s', method)
       return xmlrpclib.Binary(cloudpickle.dumps(result))
     except:
-      logging.exception('Error during invocation!')
-      return xmlrpclib.Fault('Error while invoking method.',
-                             '\n'.join(traceback.format_exc()))
+      logging.info('Exception during dispatch of %s.', method, exc_info=1)
+      raise
 
 
 class ProxyObject(object):
         result = self.get_server().invoke(self.objid, method, *args, **kw)
         return cPickle.loads(result.data)
       except:
-        logging.exception('Failed to invoke remote method %s; trying again.' % method)
-        time.sleep(5)
-    raise Exception('Failed to invoke remote method %s on %s' % (method, self.host))
+        logging.info('Failed to invoke remote method %s on %s.  Trying again.',
+                     method, self.host, exc_info=1)
+        mycloud.thread.sleep(5)
+    raise Exception('Failed to invoke remote method %s on %s' %
+                    (method, self.host))
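
CursesHandler above only needs a curses window to write into; it is not
wired up anywhere in this commit. A minimal sketch of how it could be
attached, assuming a standard curses screen:

  import curses
  import logging

  screen = curses.initscr()
  screen.scrollok(True)   # let addstr() scroll rather than overflow

  handler = CursesHandler(screen)
  handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
  logging.getLogger().addHandler(handler)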

File src/mycloud/worker.py

   log_prefix = '/tmp/%s-worker-%03d' % (socket.gethostname(), index)
 
   logging.basicConfig(stream=open(log_prefix + '.log', 'w'),
-                      format='%(asctime)s %(funcName)s %(message)s',
+                      format='%(asctime)s %(filename)s:%(funcName)s %(message)s',
                       level=logging.INFO)
 
   if opts.logger_host:
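
The hunk ends mid-block, but LoggingServer in util.py listens on
logging.handlers.DEFAULT_UDP_LOGGING_PORT, so this branch presumably
attaches the stock DatagramHandler. A sketch of that wiring, as an
assumption rather than the commit's actual code:

  import logging
  import logging.handlers

  # Ship this worker's log records to the controller over UDP.
  # DatagramHandler pickles each LogRecord, which is what LoggingServer
  # unpacks on the other end.
  handler = logging.handlers.DatagramHandler(
      opts.logger_host, logging.handlers.DEFAULT_UDP_LOGGING_PORT)
  logging.getLogger().addHandler(handler)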