Commits

Russell Power committed 31a964c

Reducers now open a listening port and accept incoming map data,
rather than requiring communication via the filesystem.

Switch to using eventlet, as native threading was behaving erratically.

  • Participants
  • Parent commits 1ae305f

Comments (0)

Files changed (9)

     url="http://rjpower.org/mycloud",
     package_dir={ '' : 'src' },
     packages=[ 'mycloud' ],
-    install_requires=[
+    requires=[
       'distribute',
-      'ssh>=1.7.9',
-      'blocked-table>=1.04',
-      'cloud>=2.0',
+      'eventlet',
+      'pycrypto',
+      'ssh',
+      'blocked-table',
+      'cloud',
     ],
 )

src/mycloud/__init__.py

-from cloud.serialization import cloudpickle
-from os.path import join
-import Queue
-import atexit
-import cPickle
-import glob
-import logging
 import mycloud.cluster
 import mycloud.connections
 import mycloud.mapreduce
 import mycloud.merge
 import mycloud.resource
 import mycloud.util
-import sys
-import threading
-import traceback
-import types
-
 
 resource = mycloud.resource
 mapreduce = mycloud.mapreduce

src/mycloud/cluster.py

 #!/usr/bin/env python
 
 from cloud.serialization import cloudpickle
+from eventlet import sleep
+from signal import *
 import Queue
 import cPickle
 import logging
 import mycloud.connections
 import mycloud.util
+import os
+import random
 import sys
-import threading
 import time
 import traceback
 
 class Task(object):
   '''A piece of work to be executed.'''
-  def __init__(self, idx, function, args):
+  def __init__(self, idx, function, args, buffer=False):
     self.idx = idx
     self.pickle = cloudpickle.dumps((function, args))
     self.result = None
-    self.stderr_logger = mycloud.util.StreamLogger('Task %05d' % self.idx)
+    self.stderr_logger = mycloud.util.StreamLogger('Task %05d' % self.idx, buffer)
 
-  def join(self):
-    self.thread.join()
+  def run(self, host, client):
+    self.host = host
+    self.client = client
 
-  def run(self, client):
     logging.info('Starting task %d', self.idx)
-    try:
-      stdin, stdout, stderr = client.invoke(
-        sys.executable, '-m', 'mycloud.worker')
+    stdin, stdout, stderr = client.invoke(
+      sys.executable, '-m', 'mycloud.worker')
 
-      self.stderr_logger.start(stderr)
-      logging.info('Writing %s', len(self.pickle))
-      stdin.write(self.pickle)
-      self.result = cPickle.load(stdout)
-      self.stderr_logger.join()
-      if isinstance(self.result, Exception):
-        raise self.result
-    finally:
-      logging.info('Task %d finished.', self.idx)
+    self.stderr_logger.start(stderr)
+    stdin.write(self.pickle)
+    self.result = cPickle.load(stdout)
+    self.stderr_logger.join()
+    if isinstance(self.result, Exception):
+      raise self.result
+    logging.info('Task %d finished.', self.idx)
 
 
 class ServerProxy(object):
 A ServerProxy is created for each core on a machine, and executes tasks as
 machine resources become available.'''
   def __init__(self, host):
+    self.host = host
     self.client = mycloud.connections.SSHConnection.connect(host)
     self.exception = None
 
-  def start(self, task_queue):
+  def start_task_queue(self, task_queue):
     logging.info('Starting server proxy.')
-    self.thread = threading.Thread(target=self.run, args=(task_queue,))
-    self.thread.start()
+    self.thread = mycloud.thread.spawn(self._read_queue, task_queue)
+    sleep(0)
+    return self.thread
 
-  def run(self, task_queue):
-    '''Execute tasks from the task queue.
-    
-If an exception is raised by a task, halt and set self.exception.
+  def start_task(self, task):
+    self.thread = mycloud.thread.spawn(self._run_task, task)
+    sleep(0)
+    return self.thread
 
-Stops when the queue becomes empty.'''
+  def _run_task(self, task):
     try:
-      while 1:
-        logging.info('Retrieving task from queue...')
-        task = task_queue.get_nowait()
-        task.run(self.client)
-        task_queue.task_done()
-    except Queue.Empty:
-      return
+      task.run(self.host, self.client)
     except Exception, e:
       logging.info('Exception occurred during remote execution of task %d.' +
              'stderr dump: \n\n%s\n\n',
              exc_info=1)
       self.exception = e
 
-  def join(self):
-    self.thread.join()
+  def _read_queue(self, task_queue):
+    '''Execute tasks from the task queue.
+    
+If an exception is raised by a task, halt and set self.exception.
+
+Stops when the queue becomes empty.'''
+    try:
+      while 1:
+        task = task_queue.get_nowait()
+        self._run_task(task)
+        task_queue.task_done()
+    except Queue.Empty:
+      return
+
 
 
 class Cluster(object):
       for i in xrange(cores):
         s = ServerProxy(host)
         servers.append(s)
+    random.shuffle(servers)
     self.servers = servers
     logging.info('Started %d servers...', len(servers))
 
   def output(self, type, name, shards):
     return mycloud.resource.output(type, name, shards)
 
+  def run(self, f, arglist):
+    assert len(arglist) < len(self.servers)
+    arglist = mycloud.util.to_tuple(arglist)
+
+    tasks = [Task(i, f, arg, buffer=False) for i, arg in enumerate(arglist)]
+    for s, task in zip(self.servers, tasks):
+      s.start_task(task)
+    return tasks
+
+
   def map(self, f, arglist):
     assert len(arglist) > 0
     idx = 0
     logging.info('Mapping %d tasks against %d servers',
                  len(tasks), len(self.servers))
 
-    for s in self.servers:
-      s.start(task_queue)
+    threads = [s.start_task_queue(task_queue) for s in self.servers]
 
     # instead of joining on the task_queue immediately, we poll the server threads
     # this way we can stop early in case we encounter an exception
       for s in self.servers:
         if s.exception:
           raise s.exception
-      time.sleep(0.1)
+      sleep(0.1)
 
     task_queue.join()
-    for s in self.servers:
-      s.join()
+    for t in threads:
+      t.join()
 
     return [t.result for t in tasks]

src/mycloud/mapreduce.py

 #!/usr/bin/env python
 
+from eventlet import sleep
 from mycloud.merge import Merger
 from os.path import join
+import SimpleXMLRPCServer
 import blocked_table
 import collections
+import json
 import logging
+import socket
 import sys
+import tempfile
+import time
 import types
+import xmlrpclib
+
+REDUCER_PORT_BASE = 40000
 
 def shard_for_key(k, num_shards):
   return hash(k) % num_shards
 def sum_reducer(k, values):
   yield k, sum(values)
 
+class XMLServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
+  def __init__(self, *args, **kw):
+    SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, *args, **kw)
 
-class MRInfo(object):
+  def shutdown(self):
+    self.__shutdown_request = True
+
+  def server_bind(self):
+    logging.info('Binding to address %s', self.server_address)
+    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
+
+class MRHelper(object):
   def __init__(self,
                mapper,
                reducer,
                tmp_prefix,
-               num_reduce_outputs,
-               map_buffer_size=500000,
-               reduce_buffer_size=500000):
+               num_mappers,
+               num_reducers,
+               map_buffer_size=1000,
+               reduce_buffer_size=10e6):
     self.mapper = mapper
     self.reducer = reducer
     self.tmp_prefix = tmp_prefix
-    self.num_reduce_outputs = num_reduce_outputs
+    self.num_mappers = num_mappers
+    self.num_reducers = num_reducers
     self.map_buffer_size = map_buffer_size
     self.reduce_buffer_size = reduce_buffer_size
 
 
-class MapHelper(object):
-  def __init__(self, mrinfo, index, input):
-    self.mrinfo = mrinfo
+class MapHelper(MRHelper):
+  def __init__(self, index, input, reducers, **kw):
+    MRHelper.__init__(self, **kw)
+
     self.input = input
     self.index = index
     self.output_tmp = collections.defaultdict(list)
-    self.output_index = 0
-    self.outputs = []
+    self.buffer_size = 0
+    self.reducers = reducers
 
   def output(self, k, v):
-    shard = shard_for_key(k, self.mrinfo.num_reduce_outputs)
+    shard = shard_for_key(k, self.num_reducers)
     self.output_tmp[shard].append((k, v))
-    if len(self.output_tmp) > self.mrinfo.map_buffer_size:
+    self.buffer_size += 1
+
+    if self.buffer_size > self.map_buffer_size:
       self.flush()
 
-  def create_mapper_output(self, shard):
-    output_file = join(self.mrinfo.tmp_prefix,
-                       'mapper_%d' % self.index,
-                       'reducer_%d' % shard,
-                       'output_%d' % self.output_index)
-    self.output_index += 1
-    return output_file
-
-  def flush(self):
-    for shard in range(self.mrinfo.num_reduce_outputs):
+  def flush(self, final=False):
+    logging.info('Flushing map %d', self.index)
+    for shard in range(self.num_reducers):
       shard_output = self.output_tmp[shard]
-      if not shard_output: continue
-
-      output = blocked_table.TableBuilder(
-                 self.create_mapper_output(shard), 'none')
-
-      for k, v in sorted(shard_output):
-        output.add(k, v)
-
-      self.outputs.append((shard, output.name))
-      del output
+      logging.info('Writing to reducer')
+      self.reduce_proxy[shard].write_map_output(self.index,
+                                                json.dumps(shard_output),
+                                                final)
 
     self.output_tmp.clear()
+    self.buffer_size = 0
+    logging.info('Flush finished.')
 
   def run(self):
-    logging.info('Reading from reader: %s', self.input)
-    if isinstance(self.mrinfo.mapper, types.ClassType):
-      mapper = self.mrinfo.mapper(self.mrinfo, self.index, self.input)
+    self.reduce_proxy = [
+      xmlrpclib.ServerProxy('http://%s:%d' % (self.reducers[i], REDUCER_PORT_BASE + i))
+      for i in range(len(self.reducers))]
+
+
+    logging.info('Reading from: %s', self.input)
+    if isinstance(self.mapper, types.ClassType):
+      mapper = self.mapper(self.mrinfo, self.index, self.input)
     else:
-      mapper = self.mrinfo.mapper
+      mapper = self.mapper
 
     for k, v in self.input.reader():
       for mk, mv in mapper(k, v):
         self.output(mk, mv)
-    self.flush()
+    self.flush(final=True)
 
-    return self.outputs
 
+class ReduceHelper(MRHelper):
+  def __init__(self, index, output, **kw):
+    MRHelper.__init__(self, **kw)
 
-class ReduceHelper(object):
-  def __init__(self, mrinfo, index, reducer_inputs, output):
-    self.mrinfo = mrinfo
     self.index = index
-    self.reducer_inputs = reducer_inputs
+    self.buffer_size = 0
+    self.map_tmp = []
+    self.maps_finished = [0] * self.num_mappers
     self.output = output
 
+  def write_map_output(self, mapper, block, is_finished):
+    logging.info('Reading from mapper %d %d', mapper, is_finished)
+    if is_finished:
+      self.maps_finished[mapper] = 1
+
+    self.buffer_size += len(block)
+    for k, v in json.loads(block):
+      self.map_tmp.append((k, v))
+
+    if self.buffer_size > self.reduce_buffer_size:
+      self.flush_map()
+
+    if sum(self.maps_finished) == self.num_mappers:
+      logging.info('Shutting down reducer.')
+      self.server.shutdown()
+
+  def flush_map(self):
+    tf = tempfile.NamedTemporaryFile(suffix='reducer-tmp')
+    bt = blocked_table.BlockedTableBuilder(tf.name)
+    self.map_tmp.sort()
+    for k, v in self.map_tmp:
+      bt.add(k, v)
+    del bt
+
+    self.map_tmp.append(tf)
+
+
   def run(self):
+    # Read map outputs until all mappers have finished executing.
+    self.server = XMLServer(('0.0.0.0', REDUCER_PORT_BASE + self.index),
+                            allow_none=True)
+    self.server.register_function(self.write_map_output)
+    self.server.serve_forever()
+
+    inputs = [blocked_table.BlockedTable(tf.name) for tf in self.map_tmp]
     out = self.output.writer()
-    inputs = []
 
-    if isinstance(self.mrinfo.reducer, types.ClassType):
-      reducer = self.mrinfo.reducer(self.mrinfo, self.index, self.output)
+    if isinstance(self.reducer, types.ClassType):
+      reducer = self.reducer(self.mrinfo, self.index, self.output)
     else:
-      reducer = self.mrinfo.reducer
-
-    for f in self.reducer_inputs:
-      inputs.append(blocked_table.Table(f).iterator())
+      reducer = self.reducer
 
     logging.info('Reducing over %s temporary map inputs.', len(inputs))
     for k, v in Merger(inputs):
     self.output = output
 
   def run(self):
-    logging.info('%s %s', self.input, self.output)
+    logging.info('Inputs: %s...', self.input[:10])
+    logging.info('Outputs: %s...', self.output[:10])
 
-    mrinfo = MRInfo(tmp_prefix=self.cluster.fs_prefix + '/tmp/mr',
-                    num_reduce_outputs=len(self.output),
-                    mapper=self.mapper,
-                    reducer=self.reducer)
+    reducers = [ReduceHelper(index=i,
+                             output=self.output[i],
+                             mapper=self.mapper,
+                             reducer=self.reducer,
+                             num_mappers=len(self.input),
+                             num_reducers=len(self.output),
+                             tmp_prefix=self.cluster.fs_prefix + '/tmp/mr')
+                for i in range(len(self.output)) ]
 
 
-    mappers = [MapHelper(mrinfo,
-                         index=index,
-                         input=ifile)
-               for index, ifile in enumerate(self.input)]
+    reduce_tasks = self.cluster.run(lambda r: r.run(), reducers)
+    sleep(5)
 
-    map_outputs = flatten(self.cluster.map(lambda m: m.run(), mappers))
-    reduce_inputs = collections.defaultdict(list)
-    for shard, file in map_outputs:
-      reduce_inputs[shard].append(file)
+    mappers = [MapHelper(index=i,
+                         input=self.input[i],
+                         reducers=[t.host for t in reduce_tasks],
+                         mapper=self.mapper,
+                         reducer=self.reducer,
+                         num_mappers=len(self.input),
+                         num_reducers=len(self.output),
+                         tmp_prefix=self.cluster.fs_prefix + '/tmp/mr')
+               for i in range(len(self.input)) ]
 
-    reducers = [ReduceHelper(mrinfo,
-                             index=index,
-                             reducer_inputs=reduce_inputs[index],
-                             output=ofile)
-                for index, ofile in enumerate(self.output)]
-
-    return self.cluster.map(lambda r: r.run(), reducers)
+    self.cluster.map(lambda m: m.run(), mappers)

src/mycloud/resource.py

   def __init__(self, filename):
     self.filename = filename
 
+  def __repr__(self):
+    return self.__class__.__name__ + ':' + self.filename
 
 class CSV(Resource):
   class Writer(object):
         yield i, i
 
   def __init__(self, range):
+    Resource.__init__(self, 'range(%d)' % len(range))
     self.range = range
 
   def reader(self):

src/mycloud/thread.py

+#!/usr/bin/env python
+
+import eventlet
+import eventlet.debug
+
+eventlet.debug.hub_listener_stacks(True)
+eventlet.debug.hub_exceptions(True)
+eventlet.debug.hub_blocking_detection(True)
+
+def spawn(f, *args):
+  return eventlet.spawn(f, *args)

src/mycloud/util.py

 #!/usr/bin/env python
 
-import threading
+import logging
 import traceback
 import types
+import mycloud.thread
 
 class StreamLogger(object):
   '''Read lines from a file object in a separate thread.
   
   These are then logged on the local host with a given prefix.'''
-  def __init__(self, prefix):
+  def __init__(self, prefix, buffer=True):
     self.prefix = prefix
+    self.buffer = buffer
     self.lines = []
 
   def start(self, stream):
-    self.thread = threading.Thread(target=self.run, args=(stream,))
-    self.thread.start()
+    self.thread = mycloud.thread.spawn(self.run, stream)
 
   def run(self, stream):
     while 1:
       line = stream.readline()
       if not line:
         break
+
+      if not self.buffer:
+        logging.info(self.prefix + ' --- ' + line.strip())
+
       self.lines.append(line.strip())
 
   def dump(self):
             ('\n' + self.prefix + ' --- ').join(self.lines))
 
   def join(self):
-    self.thread.join()
+    self.thread.wait()
 
 
 def to_tuple(arglist):

src/mycloud/worker.py

 #!/usr/bin/env python
-
-__doc__ = '''Worker for executing cluster tasks.'''
-
 from cloud.serialization import cloudpickle
 import cPickle
+import logging
 import os
 import sys
 import traceback
 
-def log(*args):
-  print >> sys.stderr, ' '.join([str(a) for a in args])
+__doc__ = '''Worker for executing cluster tasks.'''
+
+logging.basicConfig(stream=sys.stderr,
+                      format='%(asctime)s %(funcName)s %(message)s',
+                      level=logging.INFO)
 
 def execute_task():
   '''Execute a function and it's arguments, as read from stdin.'''
   sys.stdin = open('/dev/null', 'r')
   sys.stdout = open('/dev/null', 'w')
 
-  log('Task execution start.')
+  logging.info('Task execution start.')
   try:
-    log('Loading function and arguments.')
+    logging.info('Loading function and arguments.')
     f, args = cPickle.load(input)
-    log('Done.  Executing function....')
+    logging.info('Done.  Executing function %s(%s)', f, args)
     result = f(*args)
-    log('Done. Returning result.')
+    logging.info('Done. Returning result.')
     cloudpickle.dump(result, output)
-    log('Done!')
-  except:
-    log('Exception!', traceback.print_exc())
+    logging.info('Done!')
+  except Exception, e:
+    logging.info('Exception! %s', traceback.print_exc())
     cloudpickle.dump(Exception(traceback.format_exc()), output)
-  log('Task execution finished.')
+  logging.info('Task execution finished.')
 
 
 if __name__ == '__main__':

tests/test_mapreduce.py

 
 import logging
 import mycloud
+import sys
 import unittest
 
 class MapReduceTestCase(unittest.TestCase):
 
 
 if __name__ == "__main__":
+  logging.basicConfig(stream=sys.stderr,
+                      format='%(asctime)s %(funcName)s %(message)s',
+                      level=logging.INFO)
   unittest.main()