R and Hadoop

Here is an exciting project for using R in a cloud computing environment (two of my favorite things). It is called RHIPE:

R and Hadoop Integrated Processing Environment v.0.38

The website source is http://ml.stat.purdue.edu/rhipe/

RHIPE (phonetic spelling: hree-pay') is a Java package that integrates the R environment with Hadoop, the open source implementation of Google's MapReduce. Using RHIPE it is possible to code map-reduce algorithms in R, e.g.

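# Map: split the input line into words and emit one (word, count) pair per distinct word
m <- function(key,val){
  words <- strsplit(val," +")[[1]]
  wc <- table(words)
  cln <- names(wc)
  names(wc)<-NULL; names(cln)<-NULL;
  # seq_along() also behaves correctly when a line yields no words
  return(lapply(seq_along(wc),function(r) list(key=cln[r],value=wc[[r]])))
}
# Reduce: add up all the counts emitted for one word
r <- function(key,value){
  value <- do.call("rbind",value)
  return(list(list(key=key,value=sum(value))))
}
# Submit the job, reusing the reducer as a combiner
rhmr(map=m,reduce=r,combiner=T,input.folder="X",output.folder="Y")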
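Before sending a job like this to a cluster, it can help to check the mapper and reducer logic on a couple of lines locally. What follows is only a sketch in plain R that does not touch RHIPE or Hadoop: run_local is a hypothetical helper of mine, and its grouping step merely imitates what the Hadoop shuffle does between the map and reduce stages.

```r
# Mapper and reducer following the same contract as the word count
# example: each returns a list of list(key=..., value=...) entries.
m <- function(key, val) {
  words <- strsplit(val, " +")[[1]]
  words <- words[nzchar(words)]          # drop empty tokens
  wc <- table(words)
  lapply(seq_along(wc),
         function(i) list(key = names(wc)[i], value = wc[[i]]))
}
r <- function(key, value) {
  list(list(key = key, value = sum(unlist(value))))
}

# Hypothetical local stand-in for the map -> shuffle -> reduce cycle.
run_local <- function(lines, map, reduce) {
  # Map: one call per line; the key is just a placeholder index here.
  mapped <- unlist(lapply(seq_along(lines),
                          function(i) map(i, lines[[i]])),
                   recursive = FALSE)
  # Shuffle: group the emitted values by key.
  keys <- vapply(mapped, function(kv) as.character(kv$key), character(1))
  grouped <- split(lapply(mapped, function(kv) kv$value), keys)
  # Reduce: one call per distinct key.
  unlist(lapply(names(grouped),
                function(k) reduce(k, grouped[[k]])),
         recursive = FALSE)
}

out <- run_local(c("a b a", "b c"), m, r)
counts <- setNames(vapply(out, function(kv) kv$value, numeric(1)),
                   vapply(out, function(kv) kv$key, character(1)))
print(counts)
```

For the two sample lines the counts come out as a = 2, b = 2 and c = 1. A real job differs in many ways (serialization, combiners, multiple reducers), so this only checks the R logic.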
rhlapply packages the user's request into an R vector object. This is serialized and sent to the RHIPE server. The RHIPE server picks apart the object, creating a job request that Hadoop can understand. Each element of the provided list is processed by the user's function during the Map stage of MapReduce. The results are returned and, if the output is to a file, they are serialized and written to a Hadoop sequence file; the values can be read back into R using the rhsq* functions.

2 rhlapply

rhlapply <- function( list.object,
                    func,
                    configure=expression(),
                    output.folder='',
                    shared.files=c(),
                    hadoop.mapreduce=list(),
                    verbose=T,
                    takeAll=T)
list.object
This can be either a list or a single scalar. In case of the former, the function given by func will be applied to each element of list.object. In case of a scalar, the function will be applied to the list 1:n, where n is the value of the scalar.
func
A function that takes one parameter: an element of the list.
configure
A configuration expression to run before func is executed. Executed once for every JVM. If you need variables or data frames, use rhsave or rhsave.image, use rhput to copy the file to the DFS, and then use shared.files.

config = expression({
              library(lattice)
              load("mydataset.Rdata")
})
output.folder
Any file created by the function is stored in output.folder, which is deleted first. If output.folder is not given, the files created will not be copied. For side-effect files to be copied, create them in tmp, e.g. pdf("tmp/x.pdf"); note there is no leading slash. The directory will contain a slew of part* files, as many as there are maps. These contain the binary key-value pairs.
shared.files
The function or the preload expression might require the presence of resource files, e.g. *.Rdata files. The user could copy them from the HDFS in the R code, or just load them from the local work directory were the files present there. This is the role of shared.files. It is a vector of paths to files on the HDFS, each of which will be copied to the work directory where the R code is run. For example, given c('/tmp/x.Rdata','/foo.tgz'), the first file can be loaded via load("x.Rdata"). For those familiar with Hadoop terminology, this is implemented via DistributedCache.
hadoop.mapreduce
A list of Hadoop-specific options, e.g.

list(mapreduce.map.tasks=10,mapreduce.reduce.tasks=3)
takeAll
If takeAll is true, the value returned is a list in which each entry is the return value of the function. The entries are not in order, so element 1 of the returned list is not necessarily the result of applying func to the first element of list.object.
verbose
If TRUE, the user will see the job progress in the R console. If FALSE, the web URL of the jobtracker will be displayed. Cancelling the command with CTRL-C will not cancel the job; use rhkill for that.
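The argument semantics described above can be imitated locally to get a feel for them. This is a sketch only: local_lapply is a hypothetical stand-in written in base R, not a RHIPE function. It mirrors two documented behaviours, the expansion of a scalar list.object into 1:n and a configure expression that runs once before func is applied. Unlike the real thing, it returns results in order.

```r
# Hypothetical local imitation of the rhlapply argument handling.
local_lapply <- function(list.object, func, configure = expression()) {
  # A single scalar n is treated as the list 1:n.
  if (!is.list(list.object) && is.numeric(list.object) &&
      length(list.object) == 1) {
    list.object <- as.list(seq_len(list.object))
  }
  # RHIPE runs configure once per JVM. Here it runs once, in a fresh
  # environment that func is then evaluated in.
  env <- new.env()
  eval(configure, envir = env)
  environment(func) <- env
  lapply(list.object, func)
}

res <- local_lapply(3,
                    function(x) x * scale,
                    configure = expression({ scale <- 10 }))
print(unlist(res))
```

The variable scale exists only because the configure expression created it, which is the same reason one would load a dataset or a library there.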
Mapreduce in R.

rhmr <- function(map,reduce,input.folder,configure=list(map=expression(),reduce=expression()),
                 close=list(map=expression(),reduce=expression()),
                 output.folder='',combiner=F,step=F,
                 shared.files=c(),inputformat="TextInputFormat",
                 outputformat="TextOutputFormat",
                 hadoop.mapreduce=list(),verbose=T,libjars=c())

Execute map-reduce algorithms from within R. A discussion of the parameters follows.

input.folder
A folder on the DFS containing the files to process. Can be a vector.
output.folder
A folder on the DFS where the output will go.
inputformat
Either TextInputFormat or SequenceFileInputFormat. Use the former for text files and the latter for sequence files created from within R or as outputs from RHIPE (e.g. rhlapply or rhmr). Note that not just any sequence file can be used; it must have been created via a RHIPE function. Custom input formats are also possible. Download the source and look at code/java/RXTextInputFormat.java
outputformat
Either of TextOutputFormat or SequenceFileOutputFormat. In case of the former, the return value from the mapper or reducer is converted to character and written to disk. The following code is used to convert to character.

paste(key,sep='',collapse=field_separator)

Custom output formats are also possible. Download the source and look at code/java/RXTextOutputFormat.java

If custom formats implement their own writables, these must subclass RXWritable or use one of the writables present in RHIPE.
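To see what the character conversion quoted above does to a vector, here is a small base R sketch. The helper name to_text is my own, and field_separator is set by hand to a single space, the default listed for rhipejob.textoutput.fieldsep.

```r
# Same paste() call as in the documentation, wrapped in a helper.
field_separator <- " "
to_text <- function(x) paste(x, sep = "", collapse = field_separator)

print(to_text(c(1, 2)))       # "1 2"
print(to_text(c("x", "b")))   # "x b"
```

So a vector-valued key or value ends up as one string with its elements joined by the field separator.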

shared.files
Same as in rhlapply; see the documentation there.
verbose
If T, the job progress is displayed. If F, the job URL is displayed.

At any time in the configure, close, map or reduce function/expression, the variable mapred.task.is.map will be equal to "true" if it is a map task and "false" otherwise (both strings). Also, mapred.iswhat is mapper, reducer, or combiner in the respective environments.

configure
A list with either one element (an expression) or two elements, map and reduce, both of which must be expressions. These expressions are called in their respective environments, i.e. the map expression is called during the map configure and similarly for the reduce expression. The reduce expression is also called for the combiner configure method. If the list has only one element, the expression is used for both the map and the reduce.
close
Same as configure .
map
A function that takes two values, key and value. It should return a list of lists. Each list entry must contain two elements, key and value, e.g.

...
ret <- list()
ret[[1]] <-  list(key=c(1,2), value=c('x','b'))
return(ret)

If either key or value is missing, the output is not collected; e.g. return NULL to skip this record. If the input format is TextInputFormat, the value is the entire line and the key is probably useless to the user (it is a number indicating bytes into the file). If the input format is SequenceFileInputFormat, the key and value are taken from the sequence file.
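As an illustration of that return contract, here is a sketch of a mapper for text input that skips malformed lines by returning NULL. The "id,amount" line layout is made up for the example, and valid_emit is a hypothetical checker of mine, not something RHIPE provides.

```r
# Mapper for TextInputFormat-style input: val is one line of text and
# key (the byte offset) is ignored. The "id,amount" layout is invented.
parse_map <- function(key, val) {
  fields <- strsplit(val, ",", fixed = TRUE)[[1]]
  amount <- suppressWarnings(as.numeric(fields[2]))
  if (length(fields) != 2 || is.na(amount)) return(NULL)  # skip record
  list(list(key = fields[1], value = amount))
}

# Hypothetical checker for the "list of lists with key and value" rule.
valid_emit <- function(ret) {
  is.list(ret) && all(vapply(ret, function(kv) {
    is.list(kv) && all(c("key", "value") %in% names(kv))
  }, logical(1)))
}

good <- parse_map(0, "u42,19.5")
bad  <- parse_map(9, "not a record")
print(valid_emit(good))
print(is.null(bad))
```

Running a checker like this over a few sample lines is a cheap way to catch a mapper that forgets the outer list or misnames an element.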

reduce
Not needed if mapred.reduce.tasks is 0. Takes a key and a list of values (all values emitted from the maps that share the same map output key). If step is TRUE, the value is a single value rather than a list. Must return a list of lists, each element of which must have two elements, key and value. RHIPE collects all the values for a key and sends them to the function. If NULL is returned, or the return value does not conform to the above, nothing is collected by the Hadoop collector.
step
If step is TRUE, the reduce function is called for every value corresponding to a key, that is, once for every value.

  • The variable red.status is equal to 1 on the first call.
  • red.status is equal to 0 for every subsequent call, including the last value.
  • The reducer function is called one last time with red.status equal to -1. The value is NULL.

Anything returned at any of these stages is written to disk. The close function is called once every value for a given key has been processed, but returning anything has no effect. To assign to the global environment use the <<- operator.
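The calling sequence for step mode can be acted out locally. In this sketch, run_step is a hypothetical driver of mine that sets red.status to 1, 0, and -1 in the order described, and step_reduce is an example reducer that keeps a running total with <<- and emits it only on the final call. How RHIPE itself exposes red.status to the function is not shown here; I simply use a top-level variable.

```r
total <- 0        # running state, updated with <<-
red.status <- 0   # set by the driver before every call

# Reducer written for step = TRUE: it sees one value per call.
step_reduce <- function(key, value) {
  if (red.status == 1) total <<- 0        # first value for this key
  if (red.status == -1) {                 # final call, value is NULL
    return(list(list(key = key, value = total)))
  }
  total <<- total + value
  NULL                                    # nothing to emit yet
}

# Hypothetical local driver imitating the documented calling sequence.
run_step <- function(key, values, reduce) {
  out <- list()
  for (i in seq_along(values)) {
    red.status <<- if (i == 1) 1 else 0
    out <- c(out, reduce(key, values[[i]]))
  }
  red.status <<- -1
  c(out, reduce(key, NULL))
}

res <- run_step("a", list(1, 2, 3), step_reduce)
print(res[[1]]$value)
```

The point of step mode is visible here: the reducer never holds all the values for a key in memory at once, only the running total.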
combiner
T or F, to use the reducer as a combiner. Using a combiner makes computation more efficient. If combiner is T, the reduce function will be called as a combiner (0 or more times; it may never be called during the combine stage even if combiner is T). The value of mapred.task.is.map is 'true' or 'false' (both strings) if the combiner is being executed as part of the map stage or reduce stage respectively.

Whether knowledge of this is useful or not is something I'm not sure of. However, if combiner is T, keep in mind that your reduce function must be able to handle inputs sent from the map as well as inputs sent from the reduce function (itself).
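That last requirement is easy to get wrong, so here is a small base R sketch of it. The helper combine_then_reduce is hypothetical: it reduces two partitions separately (as a combiner might) and then feeds the partial results back into the same reduce function. A summing reducer survives this; an averaging reducer does not.

```r
sum_reduce  <- function(key, value) {
  list(list(key = key, value = sum(unlist(value))))
}
mean_reduce <- function(key, value) {
  list(list(key = key, value = mean(unlist(value))))
}

# Reduce everything in one go.
direct <- function(reduce, key, values) reduce(key, values)[[1]]$value

# Hypothetical imitation of a combine stage followed by a reduce stage.
combine_then_reduce <- function(reduce, key, values, split_at) {
  part1 <- reduce(key, values[seq_len(split_at)])
  part2 <- reduce(key, values[-seq_len(split_at)])
  partials <- lapply(c(part1, part2), function(kv) kv$value)
  reduce(key, partials)[[1]]$value
}

values <- list(1, 2, 3, 4, 5)
print(direct(sum_reduce, "k", values))                  # 15
print(combine_then_reduce(sum_reduce, "k", values, 2))  # 15
print(direct(mean_reduce, "k", values))                 # 3
print(combine_then_reduce(mean_reduce, "k", values, 2)) # 2.75
```

The averaging reducer gives 2.75 instead of 3 once a combine stage is involved, so a reducer like that should not be used with combiner=T unless it carries counts along with the partial means.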

libjars
If specifying a custom input/output format, the user might need to specify jar files here.
hadoop.mapreduce
Set RHIPE and Hadoop options via this.

1.1 RHIPE Options for mapreduce

Each option is listed with its default in parentheses, followed by an explanation.

rhipejob.rport (default: 8888)
The port on which Rserve runs; it should be the same across all machines.
rhipejob.charsxp.short (default: 0)
If 1, RHIPE optimizes serialization for character vectors. This reduces the length of the serialization.
rhipejob.getmapbatches (default: 100)
If the reducer/mapper emits several key-value pairs, how many to get from Rserve at a time. A higher number reduces the number of network reads (the network reads are to localhost).
rhipejob.outfmt.is.text (default: 1 if TextInputFormat)
Must be 1 if the output is textual.
rhipejob.textoutput.fieldsep (default: ' ')
The field separator for any text-based output format.
rhipejob.textinput.comment (default: '#')
In the TextInputFormat, lines beginning with this are skipped.
rhipejob.combinerspill (default: 100,000)
The combiner is run after collecting at most this many items.
rhipejob.tor.batch (default: 200,000)
Number of values for the same key to collate before sending to the reducer. If you have dollops of memory, set this larger. However, set it too large and you hit Java's heap space limit.
rhipejob.max.count.reduce (default: Java's INT_MAX, about 2 billion)
The total number of values for a given key to be collected. Note that the values are not ordered by any variable.
rhipejob.inputformat.keyclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the fully qualified Java name of the keyclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. When using a custom InputFormat, implement RXWritable and implement the methods.
rhipejob.inputformat.valueclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the fully qualified Java name of the valueclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. When using a custom InputFormat, implement RXWritable and implement the methods.
mapred.input.format.class
As above, the default is either org.saptarshiguha.rhipe.hadoop.RXTextInputFormat or org.apache.hadoop.mapred.SequenceFileInputFormat. Specify yours here.
rhipejob.outputformat.keyclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the fully qualified Java name of the keyclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. Also, the keyclass must implement RXWritable and
rhipejob.outputformat.valueclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the fully qualified Java name of the valueclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. Also, the valueclass must implement RXWritable.
mapred.output.format.class
As above, the default is either org.saptarshiguha.rhipe.hadoop.RXTextOutputFormat or org.apache.hadoop.mapred.SequenceFileOutputFormat. Specify yours here, and provide libjars if required.
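Since these options are passed through the hadoop.mapreduce argument, a settings list might be put together along these lines. This is only a sketch: the option names come from the list above, but the particular values, and whether RHIPE wants them as numbers or as strings, are my assumptions.

```r
# Option names as documented above; the values are illustrative only.
opts <- list(
  rhipejob.textoutput.fieldsep = ",",    # comma-separated text output
  rhipejob.textinput.comment   = "#",    # skip lines starting with '#'
  rhipejob.combinerspill       = 50000   # run the combiner more often
)
str(opts)

# The list would then be handed to rhmr or rhlapply, e.g.
# rhmr(map=m, reduce=r, input.folder="X", output.folder="Y",
#      hadoop.mapreduce=opts)
```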

Citation: http://ml.stat.purdue.edu/rhipe/index.html

Great, exciting news for the world of remote computing.

