Wiki

Clone wiki

ENMA / modules / developers_R

R tutorial (Using Rhipe)

'Return home'

Create a basic Map Reduce job

Creating a Map Reduce job in R involves two steps:

  1. Create a new R script file which defines the Map Reduce job.
  2. Execute this R script from the module's tasks.py using the run_rscript() function.
run_rscript(cmd, r_params)
  • Definition: This function is defined in lib/utils.py and is used to execute R scripts from Python.
  • Arguments:
    • cmd: A tuple containing the shell command used to run an R script (normally 'Rscript') and the path of the R script.
    • r_params: A dictionary with the parameters the R script needs. The script receives it as JSON on its standard input (see the example below).

This is a simple example of a Map Reduce job defined in an R script using the Rhipe library.

#!/usr/bin/Rscript

# Load initial libraries and params
library(RJSONIO)

h <- basicJSONHandler()

# The r_params dictionary defined in Python arrives as JSON on standard input.
f <- file("stdin")
open(f)
# Join all lines first so JSON spread over several lines parses as one document.
params <- fromJSON(paste(readLines(f), collapse = "\n"), h)
# The parameters are fully read; release the stdin connection.
close(f)

library(Rhipe)
rhinit()

# Variable definitions (these values are taken from the params object).
# The *_column values are used to index the split fields of each input line.
# NOTE(review): `sep` is read here, but the mapper splits on a hard-coded tab;
# confirm which separator is intended.
sep <- params$sep
postalCode_column <- params$postalCode_column
consumption_column <- params$consumption
power_column <- params$power
companyId <- params$companyId

# Example of input file (imagine that columns are postalCode, consumption, power)
#    25110\t120.2\t2300
#    25006\t430.2\t6000
#    25110\t347.1\t4400
#    08080\t74.2\t2300

# Mapper definition
example_mapper <- expression({
  # Break every raw text value received by the mapper into its
  # tab-separated fields (one character vector per value).
  fields_per_line <- strsplit(unlist(map.values), "\t")

  # Emit one pair per line to the reducer:
  #   key   = the postal code field
  #   value = named character vector c(consumption = ..., power = ...)
  invisible(lapply(fields_per_line, function(fields) {
    rhcollect(
      fields[postalCode_column],
      c(consumption = fields[consumption_column], power = fields[power_column])
    )
  }))
})

# Reducer definition
#
# Rhipe evaluates `pre` once per key, then `reduce` once per chunk of that
# key's values (reduce.values may hold only part of them), then `post` once
# after the last chunk. Keeping running totals and emitting from `post`
# therefore produces exactly one output line per postal code, regardless of
# how many values the key has.
example_reducer <- expression(
  pre = {
    # Running totals for the current key (postal code).
    consumption_sum <- 0
    consumption_count <- 0L
    user_count <- 0L
  },
  reduce = {
    # Stack this chunk's value vectors (named "consumption"/"power", as
    # emitted by the mapper) into a character matrix, one row per value.
    chunk <- do.call("rbind", reduce.values)
    consumption <- as.numeric(chunk[, "consumption"])
    # Missing/unparseable consumptions are left out of the average (like
    # mean(..., na.rm = TRUE)) but every row still counts as a user.
    consumption_sum <- consumption_sum + sum(consumption, na.rm = TRUE)
    consumption_count <- consumption_count + sum(!is.na(consumption))
    user_count <- user_count + nrow(chunk)
    # The power field is present in each value but is not part of the output.
  },
  post = {
    # 0 / 0 yields NaN when no consumption parsed, as mean(numeric(0)) does.
    average_consumption <- consumption_sum / consumption_count
    # Emit the final results with a NULL key. With the job's text output
    # settings (comma field separator, key not written) each line is:
    #      <postalCode>~<companyId>, <average_consumption>, <number_of_users>
    rhcollect(
      NULL,
      c(paste(reduce.key, companyId, sep = "~"), average_consumption, user_count)
    )
  }
)

# Prepare the Map Reduce job.
# noeval = TRUE makes rhwatch() return the job definition rather than run it;
# the job is submitted separately with rhex() below.
# NOTE(review): read = FALSE presumably skips reading the results back into R;
# confirm against the Rhipe documentation.
example_mr <- rhwatch(
  map = example_mapper,
  reduce = example_reducer,
  input = rhfmt(params$paths$input, type = "text"),
  output = rhfmt(params$paths$result_dir, type = "text"),
  read = FALSE,
  noeval = TRUE,
  # Output lines: fields separated by "," and no key column.
  mapred = list(
    mapred.field.separator = ",",
    mapred.textoutputformat.usekey = FALSE
  )
)

# Submit the Map Reduce job and keep its handle for the status query.
job <- rhex(example_mr, async = TRUE)

# Report the job status. The bare toJSON() call below is a top-level visible
# value, so Rscript auto-prints it to standard output.
status <- rhstatus(job)
print("Printing rhstatus")
toJSON(status, collapse = "", .escapeEscapes = FALSE)

# Remove the temporary files created by the Map Reduce job.
rhclean()

Updated