R tutorial (Using Rhipe)
Create a basic Map Reduce job
Creating a Map Reduce job in R takes two steps:
- Create a new R script file that defines the Map Reduce job.
- Execute this R script by calling run_rscript() from the module's tasks.py.
run_rscript(cmd, r_params)
- Definition: This function is defined in lib/utils.py and is used to execute R scripts from Python.
- Arguments:
  - cmd: A tuple containing the command that runs an R script through the shell (normally 'Rscript') and the path to the R script.
  - r_params: A dictionary containing the parameters the R script needs. The script receives it as JSON on its standard input.
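This page does not show the body of run_rscript. The following minimal Python sketch illustrates what such a helper could look like. It assumes the helper starts the command in cmd and writes r_params to the script's standard input as JSON, which is how the example R script on this page reads its parameters. It is a hypothetical illustration, not the actual code in lib/utils.py.

```python
import json
import subprocess


def run_rscript(cmd, r_params):
    """Hypothetical sketch: run an R script and pass r_params to it as JSON on stdin.

    cmd      -- tuple such as ('Rscript', path_to_script); any argv-style tuple works
    r_params -- dict of parameters that the R script parses with fromJSON(readLines(...))
    Returns the script's standard output as text.
    """
    completed = subprocess.run(
        list(cmd),
        input=json.dumps(r_params),  # serialized on one line; the R side reads it from stdin
        capture_output=True,
        text=True,
        check=True,  # raise subprocess.CalledProcessError if the script exits with an error
    )
    return completed.stdout
```

A task could then call run_rscript(('Rscript', script_path), params), where script_path is a hypothetical path to the R file. With check=True, a failing R script raises an exception in Python instead of failing silently.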
This is a simple example of a Map Reduce job defined in an R script using the Rhipe library.
```r
#!/usr/bin/Rscript
# Load the initial libraries and parameters
library(RJSONIO)
h <- basicJSONHandler()
# params holds r_params, the dictionary defined in Python, which arrives as JSON on standard input
f <- file("stdin")
open(f)
params <- fromJSON(paste(readLines(f), collapse = "\n"), h)
close(f)

library(Rhipe)
rhinit()

# Variable definitions (these values are read from params)
sep <- params$sep
postalCode_column <- params$postalCode_column
consumption_column <- params$consumption
power_column <- params$power
companyId <- params$companyId

# Example of an input file (assume the columns are postalCode, consumption, power)
# 25110\t120.2\t2300
# 25006\t430.2\t6000
# 25110\t347.1\t4400
# 08080\t74.2\t2300

# Mapper definition
example_mapper <- expression({
  # Split every input line on tabs, turning the raw input into an R list
  # with one character vector per row
  rows <- strsplit(unlist(map.values), '\t')
  # Treat every row separately
  lapply(rows, function(row) {
    # Emit to the reducer (key: postalCode, value: vector of consumption and power)
    rhcollect(
      row[postalCode_column],
      c('consumption' = row[consumption_column], 'power' = row[power_column])
    )
  })
})

# Reducer definition
# Rhipe can run the reduce block several times for one key, each time with a batch
# of reduce.values, so totals are accumulated there and the result is emitted once in post
example_reducer <- expression(
  pre = {
    consumption_sum <- 0
    consumption_n <- 0
    power_sum <- 0
    power_n <- 0
    users <- 0
  },
  reduce = {
    # Join this batch of reduce values into a single matrix (called df), one row per user
    df <- do.call("rbind", reduce.values)
    consumption <- as.numeric(df[, 'consumption'])
    power <- as.numeric(df[, 'power'])
    # Accumulate the non-missing values, as mean(..., na.rm = TRUE) would
    consumption_sum <- consumption_sum + sum(consumption, na.rm = TRUE)
    consumption_n <- consumption_n + sum(!is.na(consumption))
    power_sum <- power_sum + sum(power, na.rm = TRUE)
    power_n <- power_n + sum(!is.na(power))
    users <- users + nrow(df)
  },
  post = {
    average_consumption <- consumption_sum / consumption_n
    average_power <- power_sum / power_n
    average_users <- users
    # Emit the final results. As this output is the final result of the module,
    # each line of the output text file has the format:
    # <postalCode>~<companyId>,<average_consumption>,<average_users>
    rhcollect(NULL, c(
      paste(reduce.key, companyId, sep = "~"),
      average_consumption,
      average_users
    ))
  }
)

# Prepare the Map Reduce job (noeval = TRUE returns the job definition without running it)
example_mr <- rhwatch(
  map = example_mapper,
  reduce = example_reducer,
  input = rhfmt(params$paths$input, type = 'text'),
  output = rhfmt(params$paths$result_dir, type = 'text'),
  read = FALSE,
  noeval = TRUE,
  mapred = list(
    mapred.field.separator = ",",
    mapred.textoutputformat.usekey = FALSE
  )
)

# Execute the Map Reduce job
job <- rhex(example_mr, async = TRUE)

# Parse the feedback and print the job status as JSON
status <- rhstatus(job)
print('Printing rhstatus')
toJSON(status, collapse = '', .escapeEscapes = FALSE)

# Clean the temporary files created by the Map Reduce job
rhclean()
```
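To see what the example job computes without a Hadoop cluster, the following Python sketch imitates its map and reduce logic on the sample rows from the script's comments. The example_params helper is hypothetical. Its keys mirror the params$... fields the script reads, and its column indices are 1-based because the R script uses them to index R vectors. The paths and companyId value are placeholders. The sketch assumes all values are numeric, so it does not reproduce NA handling or Hadoop's sorting of keys.

```python
from collections import defaultdict


def example_params(input_path, result_dir, company_id):
    """Hypothetical r_params for the example script; key names mirror the params$... it reads."""
    return {
        "sep": "\t",  # read by the R script, although its mapper splits on '\t' directly
        "postalCode_column": 1,  # 1-based, as R indexes vectors from 1
        "consumption": 2,
        "power": 3,
        "companyId": company_id,
        "paths": {"input": input_path, "result_dir": result_dir},
    }


def simulate_job(lines, params):
    """Single-process imitation of the example mapper and reducer (no Hadoop or Rhipe)."""
    # Map: group (consumption, power) pairs by postal code, like rhcollect in the mapper
    groups = defaultdict(list)
    for line in lines:
        row = line.split("\t")
        key = row[params["postalCode_column"] - 1]  # convert the 1-based index to 0-based
        consumption = float(row[params["consumption"] - 1])
        power = float(row[params["power"] - 1])  # parsed but, as in the R job, not written out
        groups[key].append((consumption, power))

    # Reduce: one line per postal code, <postalCode>~<companyId>,<average_consumption>,<users>
    output = []
    for key, values in groups.items():  # order of first appearance, not Hadoop's sort order
        average_consumption = sum(c for c, _ in values) / len(values)
        output.append(f"{key}~{params['companyId']},{average_consumption},{len(values)}")
    return output


if __name__ == "__main__":
    sample = ["25110\t120.2\t2300", "25006\t430.2\t6000", "25110\t347.1\t4400", "08080\t74.2\t2300"]
    for out_line in simulate_job(sample, example_params("/hypothetical/input", "/hypothetical/output", "C1")):
        print(out_line)
```

Postal codes are kept as strings, so a value such as 08080 keeps its leading zero, just as it does after strsplit in the R mapper.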