# mycloud/src/mycloud/mapreduce.py

#!/usr/bin/env python

import blocked_table
import collections
import json
import logging
import mycloud.merge
import mycloud.thread
import mycloud.util
import tempfile
import types
import xmlrpclib

def shard_for_key(k, num_shards):
  """Assign key ``k`` to a shard in ``[0, num_shards)`` via its hash."""
  bucket = hash(k) % num_shards
  return bucket

def flatten(l):
  """Recursively collapse arbitrarily nested lists into one flat list.

  A non-list argument is returned wrapped in a single-element list.
  """
  if isinstance(l, list):
    flat = []
    for item in l:
      flat += flatten(item)
    return flat
  return [l]


def identity_mapper(k, v):
  """Mapper that passes each (key, value) pair through unchanged."""
  yield (k, v)

def identity_reducer(k, values):
  """Reducer that re-emits every value unchanged, paired with its key."""
  for value in values:
    yield (k, value)

def sum_reducer(k, values):
  """Reducer that emits a single pair: the key and the sum of its values."""
  total = sum(values)
  yield (k, total)

class MRHelper(object):
  """Common job configuration shared by map- and reduce-side workers.

  Holds the user mapper/reducer callables, the temp-file prefix, the
  worker counts, and the buffer thresholds that trigger flushes.
  """

  def __init__(self,
               mapper,
               reducer,
               tmp_prefix,
               num_mappers,
               num_reducers,
               map_buffer_size=1000,
               reduce_buffer_size=100e6):
    # Stash the job configuration on the instance for subclasses to use.
    self.mapper = mapper
    self.reducer = reducer
    self.tmp_prefix = tmp_prefix
    self.num_mappers = num_mappers
    self.num_reducers = num_reducers
    self.map_buffer_size = map_buffer_size
    self.reduce_buffer_size = reduce_buffer_size


class MapHelper(MRHelper):
  """Runs the map phase for a single input shard.

  Applies the user mapper to every record from ``input`` and streams the
  resulting (key, value) pairs to the reducer shard that owns each key,
  buffering up to ``map_buffer_size`` pairs between RPC flushes.
  """
  def __init__(self, index, input, reducers, **kw):
    MRHelper.__init__(self, **kw)

    self.input = input        # input shard; must expose reader()
    self.index = index        # this mapper's position within the job
    self.output_tmp = collections.defaultdict(list)  # reducer shard -> [(k, v)]
    self.buffer_size = 0      # pairs buffered since the last flush
    self.reducers = reducers  # proxies supporting invoke('write_map_output', ...)

  def output(self, k, v):
    """Buffer one mapped pair; flush once the buffer limit is exceeded."""
    shard = shard_for_key(k, self.num_reducers)
    self.output_tmp[shard].append((k, v))
    self.buffer_size += 1

    if self.buffer_size > self.map_buffer_size:
      self.flush()

  def flush(self, final=False):
    """Ship all buffered pairs to their reducers over RPC.

    :param final: tells each reducer this mapper has no more output.
    """
    logging.info('Flushing map %d', self.index)
    for shard in range(self.num_reducers):
      shard_output = self.output_tmp[shard]
      self.reducers[shard].invoke('write_map_output',
                                  self.index,
                                  json.dumps(shard_output),
                                  final)

    self.output_tmp.clear()
    self.buffer_size = 0
    logging.info('Flush finished.')

  def run(self):
    """Map every record of the input shard, then send a final flush."""
    logging.info('Reading from: %s', self.input)
    # Accept both old- and new-style classes; the original check against
    # types.ClassType alone silently missed new-style mapper classes.
    if isinstance(self.mapper, (types.ClassType, type)):
      # NOTE(review): self.mrinfo is never assigned anywhere in this
      # class -- a class-based mapper would raise AttributeError here.
      # Confirm against the callers before relying on this path.
      mapper = self.mapper(self.mrinfo, self.index, self.input)
    else:
      mapper = self.mapper

    for k, v in self.input.reader():
      # Per-record logging is demoted to DEBUG: at INFO it emitted two
      # lines per input record and swamped the job log.
      logging.debug('Reading %s', k)
      for mk, mv in mapper(k, v):
        # Bug fix: the original logged the *input* key k here, not the
        # mapped key mk.
        logging.debug('Writing %s', mk)
        self.output(mk, mv)
    self.flush(final=True)


class ReduceHelper(MRHelper):
  """Runs the reduce phase for a single output shard.

  Collects map output over RPC, spills sorted runs to temporary
  blocked_table files whenever the in-memory buffer exceeds
  ``reduce_buffer_size``, then merge-sorts the runs and feeds each
  (key, value) pair to the user reducer.
  """
  def __init__(self, index, output, **kw):
    MRHelper.__init__(self, **kw)

    self.index = index
    self.buffer_size = 0  # bytes of serialized map output currently buffered
    self.buffer = []      # in-memory [(k, v)] awaiting spill
    self.map_tmp = []     # temp files holding spilled sorted runs
    self.maps_finished = [0] * self.num_mappers  # 1 once mapper i sent final=True
    self.output = output

    self.serving = False
    self.thread = None

  def write_map_output(self, mapper, block, is_finished):
    """RPC entry point: accept one JSON-encoded block of pairs from a mapper.

    :param mapper: index of the sending mapper.
    :param block: JSON list of [key, value] pairs.
    :param is_finished: True when this is the mapper's final block.
    """
    logging.info('Reading from mapper %d %d', mapper, is_finished)
    if is_finished:
      self.maps_finished[mapper] = 1

    self.buffer_size += len(block)
    for k, v in json.loads(block):
      self.buffer.append((k, v))

    if self.buffer_size > self.reduce_buffer_size:
      self.flush()

  def flush(self):
    """Spill the buffered pairs to a new sorted on-disk run."""
    tf = tempfile.NamedTemporaryFile(suffix='reducer-tmp')
    bt = blocked_table.TableBuilder(tf.name)
    self.buffer.sort()
    for k, v in self.buffer:
      bt.add(k, v)
    del bt  # presumably finalizes the table via its destructor -- confirm

    # Bug fix: the original never cleared the buffer, so every spill
    # re-wrote all previously received pairs, duplicating them across
    # runs and therefore in the final merge.
    self.buffer = []
    self.buffer_size = 0

    self.map_tmp.append(tf)

  def start_server(self):
    """Spawn the background collector thread and return an RPC proxy."""
    self.thread = mycloud.thread.spawn(self._run)
    mycloud.thread.sleep(0)
    logging.info('Returning proxy to self')
    return mycloud.util.Proxy(self)

  def _run(self):
    # Read map outputs until all mappers have finished executing.
    while sum(self.maps_finished) != self.num_mappers:
      mycloud.thread.sleep(1)
    self.flush()

    logging.info('Finished reading map data, beginning merge.')

    inputs = [blocked_table.Table(tf.name).iteritems() for tf in self.map_tmp]
    out = self.output.writer()

    # Accept both old- and new-style reducer classes (the original
    # types.ClassType check missed new-style classes).
    if isinstance(self.reducer, (types.ClassType, type)):
      # NOTE(review): self.mrinfo is never assigned; a class-based
      # reducer would raise AttributeError here. Verify against callers.
      reducer = self.reducer(self.mrinfo, self.index, self.output)
    else:
      reducer = self.reducer

    logging.info('Reducing over %s temporary map inputs.', len(inputs))
    for k, v in mycloud.merge.Merger(inputs):
      for rk, rv in reducer(k, v):
        out.add(rk, rv)

    logging.info('Returning output: %s', self.output)

  def wait(self):
    """Block until the reduce thread completes; return the output shard."""
    self.thread.wait()
    return self.output


class MapReduce(object):
  """Coordinates a complete map/reduce job across a cluster.

  ``input`` and ``output`` are lists of shard descriptors: one mapper is
  started per input shard and one reducer per output shard.
  """
  def __init__(self, cluster, mapper, reducer, input, output):
    self.cluster = cluster
    self.mapper = mapper
    self.reducer = reducer
    self.input = input
    self.output = output

  def run(self):
    """Execute the job and return the list of reducer outputs.

    Re-raises any failure after logging it.
    """
    logging.info('Inputs: %s...', self.input[:10])
    logging.info('Outputs: %s...', self.output[:10])

    try:
      # One reducer per output shard; each serves map output over RPC.
      reducers = [ReduceHelper(index=i,
                               output=self.output[i],
                               mapper=self.mapper,
                               reducer=self.reducer,
                               num_mappers=len(self.input),
                               num_reducers=len(self.output),
                               tmp_prefix=self.cluster.fs_prefix + '/tmp/mr')
                  for i in range(len(self.output)) ]

      reduce_tasks = self.cluster.map(lambda r: r.start_server(), reducers)

      # One mapper per input shard, wired to every reducer proxy.
      mappers = [MapHelper(index=i,
                           input=self.input[i],
                           reducers=reduce_tasks,
                           mapper=self.mapper,
                           reducer=self.reducer,
                           num_mappers=len(self.input),
                           num_reducers=len(self.output),
                           tmp_prefix=self.cluster.fs_prefix + '/tmp/mr')
                 for i in range(len(self.input)) ]

      self.cluster.map(lambda m: m.run(), mappers)

      return [r.invoke('wait') for r in reduce_tasks]
    except:
      # Bug fix: failures were logged at INFO level, where they are easy
      # to miss; log at ERROR (still with the traceback) and re-raise.
      # The bare except is deliberate: log everything, swallow nothing.
      logging.error('MapReduce failed.', exc_info=1)
      raise