Wiki

Clone wiki

demeter-course / HadoopStreaming

Hadoop Streaming

### Mapper

#category_mapper.py
"""Hadoop Streaming mapper: turn each TSV row into a category<TAB>salary pair.

Reads tab-separated rows on stdin and writes one ``category<TAB>salary``
line per usable row on stdout.  Rows that do not have enough columns are
skipped instead of aborting the whole task.
"""
import sys

# Zero-based column positions of the two fields this job needs.
CATEGORY_FIELD = 8
SALARY_FIELD = 10


def map_line(line):
    """Return the ``category<TAB>salary`` record for one input row.

    Args:
        line: one raw line of the TSV input, with or without its newline.

    Returns:
        The output record (without a trailing newline), or ``None`` when
        the row has too few columns to contain both fields.
    """
    # Only the line terminator is removed.  A plain strip() would also eat
    # leading/trailing tabs, which shifts the column positions whenever the
    # first or last columns of a row are empty.
    fields = line.rstrip('\r\n').split('\t')
    if len(fields) <= max(CATEGORY_FIELD, SALARY_FIELD):
        return None
    # Trim the values themselves so the key is clean before it is sorted
    # and grouped downstream.
    category = fields[CATEGORY_FIELD].strip()
    salary = fields[SALARY_FIELD].strip()
    return '%s\t%s' % (category, salary)


def main(lines=None, out=None):
    """Map every row from *lines* (default stdin) onto *out* (default stdout)."""
    if lines is None:
        lines = sys.stdin
    if out is None:
        out = sys.stdout
    for line in lines:
        record = map_line(line)
        if record is not None:
            out.write(record + '\n')


if __name__ == '__main__':
    main()

### Reducer

#category_reducer.py
"""Hadoop Streaming reducer: average salary per category.

Reads ``category<TAB>salary`` lines on stdin and writes one
``category<TAB>average`` line per run of consecutive identical categories.
Only the previous category is remembered, so the input must already be
sorted (grouped) by category for the averages to be per-category.
"""

import sys


def parse_record(line):
    """Split one ``category<TAB>salary`` line into ``(category, salary)``.

    Returns:
        A ``(str, float)`` tuple, or ``None`` when the line has no tab or
        the salary is not a number (for example a header row).
    """
    # Strip only the newline so an empty category (leading tab) survives.
    key, sep, value = line.rstrip('\r\n').partition('\t')
    if not sep:
        return None
    try:
        return key.strip(), float(value)
    except ValueError:
        return None


def reduce_lines(lines):
    """Yield ``category<TAB>average`` for each run of equal categories.

    Args:
        lines: iterable of ``category<TAB>salary`` lines, grouped by category.

    Yields:
        One formatted record (no trailing newline) per group, in input order.
        Unparsable lines are ignored and never start or end a group.
    """
    current_cat = None
    current_sum = 0.0
    current_length = 0

    for line in lines:
        record = parse_record(line)
        if record is None:
            continue
        category, salary = record
        if category == current_cat:
            current_length += 1
            current_sum += salary
            continue
        # The category changed: flush the finished group.  Compare against
        # None explicitly so an empty-string category is still reported.
        if current_cat is not None:
            yield '%s\t%f' % (current_cat, current_sum / current_length)
        current_cat = category
        current_sum = salary
        current_length = 1

    # Flush the last group.  This depends only on whether any group was
    # started, not on what the final input line happened to contain.
    if current_cat is not None:
        yield '%s\t%f' % (current_cat, current_sum / current_length)


def main(lines=None, out=None):
    """Reduce *lines* (default stdin) and write the results to *out* (default stdout)."""
    if lines is None:
        lines = sys.stdin
    if out is None:
        out = sys.stdout
    for record in reduce_lines(lines):
        out.write(record + '\n')


if __name__ == '__main__':
    main()

### Running it on your command line

# Mapper only: prints one category<TAB>salary pair per input row.
cat file.tsv | python category_mapper.py
# Mapper piped straight into the reducer, with no sort step in between.
cat file.tsv | python category_mapper.py | python category_reducer.py

## Why is the last one (mapper | reducer, no sort) wrong?
# Same pipeline with `sort` between the stages, so that rows of the same
# category reach the reducer next to each other.
cat file.tsv | python category_mapper.py | sort | python category_reducer.py

Updated