Source

mycloud / tests / test_mapreduce.py

Full commit
#!/usr/bin/env python

import logging
import mycloud
import sys
import unittest

class MapReduceTestCase(unittest.TestCase):
  """End-to-end check of mycloud.mapreduce.MapReduce on a local cluster."""

  def testSimpleMapper(self):
    """Run an identity-map / sum-reduce job and verify the first output.

    Ten SequenceFile inputs built from range(100) are fed through an
    identity mapper and a summing reducer into five MemoryFile outputs.
    The test then asserts that the first output yields, in order, the keys
    0..99 with value ``key * 10``.
    """
    # NOTE(review): presumably ('localhost', 4) means four workers on the
    # local machine -- confirm against mycloud.Cluster.
    cluster = mycloud.Cluster([('localhost', 4)])
    input_desc = [mycloud.resource.SequenceFile(range(100)) for _ in range(10)]
    output_desc = [mycloud.resource.MemoryFile() for _ in range(5)]

    def map_identity(k, v):
      """Emit the input record unchanged."""
      yield (k, v)

    def reduce_sum(k, values):
      """Emit the key together with the sum of all values seen for it."""
      logging.info('%s %s', k, values)
      yield (k, sum(values))

    mr = mycloud.mapreduce.MapReduce(cluster,
                                     map_identity,
                                     reduce_sum,
                                     input_desc,
                                     output_desc)
    result = mr.run()

    # Dump the first output for debugging before asserting on it.
    logging.info('Result %s %s', result[0], result[0].__class__)
    for k, v in result[0].reader():
      logging.info('Result: %s %s', k, v)

    # A fresh reader is requested here; the logging loop above already
    # iterated over the previous one.
    # NOTE(review): the expected value j * 10 presumably relies on each of
    # the ten inputs contributing the value j for key j -- confirm against
    # mycloud.resource.SequenceFile.
    oiter = result[0].reader()
    for j in range(100):
      # Builtin next() works on both Python 2.6+ and Python 3 iterators,
      # unlike the Python 2-only ``.next()`` method.
      k, v = next(oiter)
      self.assertEqual(k, j)
      self.assertEqual(v, j * 10)


if __name__ == "__main__":
  # Script entry point: route INFO-level (and above) log records to stderr,
  # then hand control to the unittest runner.
  log_format = '%(asctime)s %(funcName)s %(message)s'
  logging.basicConfig(level=logging.INFO,
                      format=log_format,
                      stream=sys.stderr)
  unittest.main()