Commits

Russell Power committed fc8a21a

Better logging; capture stderr messages from remote workers
and log them all at once in the case of exceptions.

  • Participants
  • Parent commits 755cacf

Comments (0)

Files changed (5)

File src/mycloud/__init__.py

 mapreduce = mycloud.mapreduce
 
 Cluster = mycloud.cluster.Cluster
-
-logging.basicConfig(stream=sys.stderr,
-                    format='%(asctime)s %(funcName)s %(message)s',
-                    level=logging.INFO)

File src/mycloud/cluster.py

 import logging
 import mycloud.connections
 import mycloud.util
+import mycloud.worker
 import sys
 import threading
 import traceback
 
-
-
-class RemoteException(object):
-  def __init__(self, type, value, tb):
-    self.type = type
-    self.value = value
-    self.tb = traceback.format_exc(tb)
-
 class Machine(object):
   def __init__(self, host, cores):
     self.host = host
       stdin.write(self.pickle)
       self.result = cPickle.load(stdout)
       self.stderr_logger.join()
-      if isinstance(self.result, RemoteException):
+      if isinstance(self.result, mycloud.util.RemoteException):
         raise Exception, 'Remote:: %s' % self.result.tb
-    except:
+    except Exception, e:
       logging.info('Exception occurred during remote execution of %d.',
                    self.idx, exc_info=1)
+      self.result = e
+      self.stderr_logger.dump()
     else:
       logging.info('Task %d finished.', self.idx)
 

File src/mycloud/mapreduce.py

 import collections
 import logging
 import sys
+import types
 
 def shard_for_key(k, num_shards):
   return hash(k) % num_shards

File src/mycloud/util.py

 #!/usr/bin/env python
 import logging
 import threading
+import traceback
 import types
 
 class StreamLogger(object):
   These are then logged on the local host with a given prefix.'''
   def __init__(self, prefix):
     self.prefix = prefix
+    self.lines = []
 
   def start(self, stream):
     self.thread = threading.Thread(target=self.run, args=(stream,))
       line = stream.readline()
       if not line:
         break
-      logging.info('(%s) ----- :: %s', self.prefix, line.strip())
+      self.lines.append(line.strip())
+
+  def dump(self):
+    logging.info(self.prefix + ' --- ' +
+                 ('\n' + self.prefix + ' --- ').join(self.lines))
 
   def join(self):
     self.thread.join()
       arglist[i] = (args,)
 
   return arglist
+
+class RemoteException(object):
+  def __init__(self, type, value, tb):
+    self.type = type
+    self.value = value
+    self.tb = traceback.format_exc(tb)

File src/mycloud/worker.py

 '''Worker stub - executes tasks remotely.'''
 from cloud.serialization import cloudpickle
 import cPickle
-import mycloud
 import sys
 import traceback
 
+import mycloud.util
 
 def execute_task():
   print >> sys.stderr, 'Task execution start.'
     print >> sys.stderr, 'Done!'
   except:
     print >> sys.stderr, 'Exception!', traceback.print_exc()
-    cloudpickle.dump(mycloud.cluster.RemoteException(*sys.exc_info()), sys.stdout)
+    cloudpickle.dump(mycloud.util.RemoteException(*sys.exc_info()), sys.stdout)
   print >> sys.stderr, 'Task execution finished.'