Here is an exciting project for using R in a cloud computing environment (two of my favorite things). It is called RHIPE:
R and Hadoop Integrated Processing Environment v.0.38

The website source is http://ml.stat.purdue.edu/rhipe/
RHIPE (phonetic spelling: hree-pay) is a Java package that integrates the R environment with Hadoop, the open source implementation of Google's MapReduce. Using RHIPE it is possible to code map-reduce algorithms in R, e.g.:
# map: for one line of text, emit a (word, count) pair per distinct word
m <- function(key, val){
  words <- strsplit(val, " +")[[1]]
  wc <- table(words)
  cln <- names(wc)
  names(wc) <- NULL; names(cln) <- NULL
  return(sapply(1:length(wc), function(r) list(key=cln[r], value=wc[[r]]), simplify=F))
}
# reduce: sum the counts emitted for a given word
r <- function(key, value){
  value <- do.call("rbind", value)
  return(list(list(key=key, value=sum(value))))
}
rhmr(map=m, reduce=r, combiner=T, input.folder="X", output.folder="Y")
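Since m and r are ordinary R functions, a quick way to check the logic is to call them interactively with arguments of the shape RHIPE would pass, before submitting anything to Hadoop (the sample line and counts below are made up):

m(NULL, "the cat sat on the mat")
# emits one (word, count) pair per distinct word, e.g. list(key="the", value=2)
r("the", list(2, 1, 3))
# sums the counts emitted for "the" by different map tasks: list(list(key="the", value=6))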
rhlapply packages the user's request into an R vector object. This is serialized and sent to the RHIPE server. The RHIPE server picks apart the object, creating a job request that Hadoop can understand. Each element of the provided list is processed by the user's function during the Map stage of MapReduce. The results are returned and, if the output goes to a file, these results are serialized and written to a Hadoop sequence file; the values can be read back into R using the rhsq* functions. A short usage sketch follows the parameter list below.
2 rhlapply
rhlapply <- function(list.object,
                     func,
                     configure=expression(),
                     output.folder='',
                     shared.files=c(),
                     hadoop.mapreduce=list(),
                     verbose=T,
                     takeAll=T)
- list.object
- This can be either a list or a single scalar. In the former case, the function given by func is applied to each element of list.object. In the case of a scalar, the function is applied to the list 1:n, where n is the value of the scalar.
- func
- A function that takes one parameter: an element of the list.
- configure
- A configuration expression to run before func is executed; it is executed once for every JVM. If you need variables or data frames, save them with rhsave or rhsave.image, use rhput to copy the file to the DFS, and then use shared.files, e.g.
config = expression({
  library(lattice)
  load("mydataset.Rdata")
})
- output.folder
- Any file created by the function is stored in output.folder, which is deleted first. If not given, the files created will not be copied. For side-effect files to be copied, create them in tmp, e.g. pdf("tmp/x.pdf") (note: no leading slash). The directory will contain a slew of part* files, as many as there are maps. These contain the binary key-value pairs.
- shared.files
- The function or the configure expression might require the presence of resource files, e.g. *.Rdata files. The user could copy them from the HDFS in the R code, or simply load them from the local working directory were the files present there. This is the role of shared.files. It is a vector of paths to files on the HDFS; each of these is copied to the working directory where the R code is run. For example, given c('/tmp/x.Rdata','/foo.tgz'), the first file can be loaded via load("x.Rdata"). For those familiar with Hadoop terminology, this is implemented via the DistributedCache.
- hadoop.mapreduce
- A list of Hadoop-specific options, e.g. list(mapreduce.map.tasks=10, mapreduce.reduce.tasks=3)
- takeAll
- If takeAll is true, the value returned is a list, each entry of which is a return value of the function. The entries are not in order, so element 1 of the returned list is not necessarily the result of func(list.object[[1]]).
- verbose
- If TRUE, the user will see the job progress in the R console. If FALSE, the web URL of the jobtracker is displayed. Cancelling the command with CTRL-C will not cancel the job; use rhkill for that.
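Putting these parameters together, here is a minimal sketch of a parallel lapply over 500 simulation indices. The data set, folder paths, and the exact argument form of rhput are illustrative assumptions, not taken from the RHIPE documentation:

# Hypothetical workflow: ship a data frame 'mydata' to every node via
# shared.files, then run 500 independent simulations on the cluster.
save(mydata, file="mydataset.Rdata")              # save the object locally
rhput("mydataset.Rdata", "/tmp/mydataset.Rdata")  # copy it to the DFS (argument order assumed)
sim <- function(i){
  set.seed(i)
  # 'mydata' is available because the configure expression load()ed it
  mean(sample(mydata$x, 100, replace=TRUE))
}
results <- rhlapply(500, sim,
                    configure    = expression({ load("mydataset.Rdata") }),
                    shared.files = c("/tmp/mydataset.Rdata"),
                    takeAll      = T)
# 'results' is a list of 500 return values, in no particular order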
Mapreduce in R.
rhmr <- function(map, reduce, input.folder,
                 configure=list(map=expression(), reduce=expression()),
                 close=list(map=expression(), reduce=expression()),
                 output.folder='', combiner=F, step=F,
                 shared.files=c(), inputformat="TextInputFormat",
                 outputformat="TextOutputFormat",
                 hadoop.mapreduce=list(), verbose=T, libjars=c())
Execute map-reduce algorithms from within R. A discussion of the parameters follows.
- input.folder
- A folder on the DFS containing the files to process. Can be a vector.
- output.folder
- A folder on the DFS where the output will go.
- inputformat
- Either TextInputFormat or SequenceFileInputFormat. Use the former for text files and the latter for sequence files created from within R or as outputs from RHIPE (e.g. rhlapply or rhmr). Note that one cannot use just any sequence file; they must have been created via a RHIPE function. Custom input formats are also possible: download the source and look at code/java/RXTextInputFormat.java.
- outputformat
- Either TextOutputFormat or SequenceFileOutputFormat. In the case of the former, the return value from the mapper or reducer is converted to character and written to disk; the following code is used for the conversion: paste(key, sep='', collapse=field_separator). Custom output formats are also possible: download the source and look at code/java/RXTextOutputFormat.java. If a custom format implements its own writables, it must subclass RXWritable or use one of the writables present in RHIPE.
- shared.files
- Same as in rhlapply; see that entry for documentation.
- verbose
- If TRUE, the job progress is displayed. If FALSE, the job URL is displayed.
At any time in the configure, close, map, or reduce function/expression, the variable mapred.task.is.map will be equal to "true" if it is a map task and "false" otherwise (both strings). Also, mapred.iswhat is "mapper", "reducer", or "combiner" in the respective environments.
- configure
- A list with either one element (an expression) or two elements, map and reduce, both of which must be expressions. These expressions are called in their respective environments, i.e. the map expression is called during the map configure and similarly for the reduce expression. The reduce expression is also called for the combiner configure method. If there is only one list element, the expression is used for both the map and the reduce.
- close
- Same as configure.
- map
- A function that takes two values, key and value. It should return a list of lists; each list entry must contain two elements, key and value, e.g.
...
ret <- list()
ret[[1]] <- list(key=c(1,2), value=c('x','b'))
return(ret)
If either key or value is missing, the output is not collected; e.g. return NULL to skip this record. If the input format is TextInputFormat, the value is the entire line and the key is probably useless to the user (it is a number indicating bytes into the file). If the input format is SequenceFileInputFormat, the key and value are taken from the sequence file.
- reduce
- Not needed if mapred.reduce.tasks is 0. Takes a key and a list of values (all values emitted from the maps that share the same map output key); if step is TRUE, the second argument is a single value rather than a list. Must return a list of lists, each element of which must have two elements, key and value. RHIPE collects all the values and sends them to the function. If NULL is returned, or the return value does not conform to the above, nothing is collected by the Hadoop collector.
- step
- If step is TRUE, then the reduce function is called for every value corresponding to a key, that is, once for every value (see the sketch after this parameter list).
- The variable red.status is equal to 1 on the first call.
- red.status is equal to 0 for every subsequent call, including the last value.
- The reducer function is called one last time with red.status equal to -1; the value is then NULL. Anything returned at any of these stages is written to disk. The close function is called once every value for a given key has been processed, but returning anything from it has no effect. To assign to the global environment, use the <<- operator.
- combiner
- TRUE or FALSE, to use the reducer as a combiner. Using a combiner makes computation more efficient. If combiner is TRUE, the reduce function will be called as a combiner (zero or more times; it may never be called during the combine stage even if combiner is TRUE). The value of mapred.task.is.map is 'true' or 'false' (both strings) depending on whether the combiner is being executed as part of the map stage or the reduce stage respectively. Whether knowledge of this is useful or not is something I'm not sure of. However, if combiner is TRUE, keep in mind that your reduce function must be able to handle inputs sent from the map or inputs sent from the reduce function (itself).
- libjars
- If specifying a custom input/output format, the user might need to specify jar files here.
- hadoop.mapreduce
- set RHIPE and Hadoop options via this.
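As referenced in the step entry above, here is a rough sketch of what a streaming (step=TRUE) job might look like, keeping a running total per key with red.status. The input line format, folder names, and the aggregation itself are illustrative assumptions:

# mapper: assume text lines of the form "groupID amount"
mapper <- function(key, value){
  f <- strsplit(value, " +")[[1]]
  list(list(key=f[1], value=as.numeric(f[2])))
}
# reducer: called once per value; red.status is 1 on the first value,
# 0 on subsequent values, and -1 on the final call (value is then NULL)
reducer <- function(key, value){
  if (red.status == 1) total <<- 0        # first value for this key
  if (red.status != -1) {
    total <<- total + value               # accumulate, emit nothing yet
    return(NULL)
  }
  list(list(key=key, value=total))        # final call: emit the total
}
rhmr(map=mapper, reduce=reducer, step=T,
     configure=list(map=expression({}),                  # per-map-JVM setup (nothing here)
                    reduce=expression({ total <- 0 })),  # per-reduce-JVM setup
     input.folder="/tmp/in", output.folder="/tmp/out",
     inputformat="TextInputFormat", outputformat="SequenceFileOutputFormat")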
1.1 RHIPE Options for mapreduce
| Option | Default | Explanation |
| --- | --- | --- |
| rhipejob.rport | 8888 | The port on which Rserve runs; should be the same across all machines |
| rhipejob.charsxp.short | 0 | If 1, RHIPE optimizes serialization for character vectors. This reduces the length of the serialization |
| rhipejob.getmapbatches | 100 | If the reducer/mapper emits several key-value pairs, how many to get from Rserve at a time. A higher number reduces the number of network reads (the network reads are to localhost) |
| rhipejob.outfmt.is.text | 1 if TextInputFormat | Must be 1 if the output is textual |
| rhipejob.textoutput.fieldsep | ' ' | The field separator for any text-based output format |
| rhipejob.textinput.comment | '#' | In the TextInputFormat, lines beginning with this are skipped |
| rhipejob.combinerspill | 100,000 | The combiner is run after collecting at most this many items |
| rhipejob.tor.batch | 200,000 | Number of values for the same key to collate before sending to the reducer; if you have dollops of memory, set this larger. However, too large and you hit Java's heap space limit |
| rhipejob.max.count.reduce | Java's INT_MAX (about 2 billion) | The total number of values for a given key to be collected; note the values are not ordered by any variable |
| rhipejob.inputformat.keyclass | Chosen depending on TextInputFormat or SequenceFileInputFormat | Provide the fully qualified Java class name of the key class, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. When using a custom InputFormat, the class must implement RXWritable and its methods |
| rhipejob.inputformat.valueclass | Chosen depending on TextInputFormat or SequenceFileInputFormat | Provide the fully qualified Java class name of the value class, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. When using a custom InputFormat, the class must implement RXWritable and its methods |
| mapred.input.format.class | As above; the default is either org.saptarshiguha.rhipe.hadoop.RXTextInputFormat or org.apache.hadoop.mapred.SequenceFileInputFormat | Specify your own input format class here |
| rhipejob.outputformat.keyclass | Chosen depending on TextInputFormat or SequenceFileInputFormat | Provide the fully qualified Java class name of the key class, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText; the key class must also implement RXWritable |
| rhipejob.outputformat.valueclass | Chosen depending on TextInputFormat or SequenceFileInputFormat | Provide the fully qualified Java class name of the value class, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText; the value class must also implement RXWritable |
| mapred.output.format.class | As above; the default is either org.saptarshiguha.rhipe.hadoop.RXTextOutputFormat or org.apache.hadoop.mapred.SequenceFileInputFormat | Specify your own output format class here; provide libjars if required |
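Several of these options can be set per job through the hadoop.mapreduce argument of rhmr. A small sketch, reusing the word-count functions from the top of the post with placeholder folder names:

rhmr(map=m, reduce=r, combiner=T,
     input.folder="/tmp/text-in", output.folder="/tmp/text-out",
     outputformat="TextOutputFormat",
     hadoop.mapreduce=list(
       rhipejob.textoutput.fieldsep = ",",  # comma-separated text output
       rhipejob.textinput.comment   = "%",  # skip input lines starting with '%'
       mapred.reduce.tasks          = 3))   # run three reduce tasks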
Great, exciting news for the world of remote computing.