Q&A with David Smith, Revolution Analytics.

Here’s a group of questions and answers that David Smith of Revolution Analytics was kind enough to answer following the launch of RevoScaleR, the company’s new big-data package for R that works alongside Hadoop.

Ajay- How does RevoScaleR work from a technical viewpoint in terms of Hadoop integration?

David- The point isn’t that there’s a deep technical integration between Revolution R and Hadoop; rather, we see them as complementary (not competing) technologies. Hadoop is amazing at reliably (if slowly) processing huge volumes of distributed data; the RevoScaleR package complements Hadoop by providing statistical algorithms to analyze the data processed by Hadoop. The analogy I use is to compare a freight train with a race car: use Hadoop to slog through a distributed data set and use Map/Reduce to output an aggregated, rectangular data file; then use RevoScaleR to perform statistical analysis on the processed data (and use the speed of RevoScaleR to iterate through many model options to find the best one).

Ajay- How is it different from MapReduce and Rhipe, the existing R-Hadoop packages?
David- They’re complementary. In fact, we’ll be publishing a white paper soon by Saptarshi Guha, author of the Rhipe R/Hadoop integration, showing how he uses Hadoop to process vast volumes of packet-level VOIP data to identify call time/duration from the packets, and then do a regression on the table of calls using RevoScaleR. There’s a little more detail in this blog post: http://blog.revolutionanalytics.com/2010/08/announcing-big-data-for-revolution-r.html
Ajay- Is it going to be proprietary, free or licensable (open source)?
David- RevoScaleR is a proprietary package, available to paid subscribers (or free to academics) with Revolution R Enterprise. (If you haven’t seen it, you might be interested in this Q&A I did with Matt Shotwell: http://biostatmatt.com/archives/533 )
Ajay- Are there any existing client case studies for terabyte-level analysis using R?
David- The VOIP example above gets close, but most of the case studies we’ve seen in beta testing have been in the 10’s to 100’s of Gb range. We’ve tested RevoScaleR on larger data sets internally, but we’re eager to hear about real-life use cases in the terabyte range.
Ajay- How can I use RevoScaleR on my dual-core Intel Windows laptop for, say, 5 GB of data?
David- One of the great things about RevoScaleR is that it’s designed to work on commodity hardware like a dual-core laptop. You won’t be constrained by the limited RAM available, and the parallel processing algorithms will make use of all cores available to speed up the analysis even further. There’s an example in this white paper (http://info.revolutionanalytics.com/bigdata.html) of doing linear regression on 13 GB of data on a simple dual-core laptop in less than 5 seconds.
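For readers who want a feel for that workflow, here is a minimal sketch assuming Revolution R Enterprise with the RevoScaleR package is installed. The file names, column names and model formula are hypothetical, and the calls shown (rxTextToXdf to convert text data to the XDF format, rxLinMod for regression) reflect RevoScaleR's documented interface as I understand it; treat this as an illustration, not the white paper's exact code.

library(RevoScaleR)  # ships with Revolution R Enterprise (assumes the package is installed)

# Convert a large CSV into the XDF binary format in chunks, so the full data
# set never needs to fit in the laptop's RAM (file names are hypothetical).
rxTextToXdf(inFile = "flights.csv", outFile = "flights.xdf",
            rowsPerRead = 500000)

# Fit a linear model directly against the XDF file; RevoScaleR streams the
# data block by block and uses all available cores.
fit <- rxLinMod(ArrDelay ~ DayOfWeek + Distance, data = "flights.xdf")
summary(fit)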
AJ- Thanks to David Smith for the fast response, and congratulations to him, Saptarshi Guha, Dr. Norman Nie and the rest of the team at Revolution Analytics on the new product launch.

Big Data and R: New Product Release by Revolution Analytics

Press release from the team at Revolution Analytics, this time claiming to enable terabyte-level analytics with R. Interesting stuff, but the technical details are still awaited.

Revolution Analytics Brings Big Data Analysis to R

The world’s most powerful statistics language can now tackle terabyte-class data sets using Revolution R Enterprise at a fraction of the cost of legacy analytics products


JSM 2010 – VANCOUVER (August 3, 2010) — Revolution Analytics today introduced ‘Big Data’ analysis to its Revolution R Enterprise software, taking the popular R statistics language to unprecedented new levels of capacity and performance for analyzing very large data sets. For the first time, R users will be able to process, visualize and model terabyte-class data sets in a fraction of the time of legacy products—without employing expensive or specialized hardware.

The new version of Revolution R Enterprise introduces an add-on package called RevoScaleR that provides a new framework for fast and efficient multi-core processing of large data sets. It includes:

  • The XDF file format, a new binary ‘Big Data’ file format with an interface to the R language that provides high-speed access to arbitrary rows, blocks and columns of data.
  • A collection of widely-used statistical algorithms optimized for Big Data, including high-performance implementations of Summary Statistics, Linear Regression, Binomial Logistic Regression and Crosstabs, with more to be added in the near future.
  • Data Reading & Transformation tools that allow users to interactively explore and prepare large data sets for analysis.
  • Extensibility: expert R users can develop and extend their own statistical algorithms to take advantage of Revolution R Enterprise’s new speed and scalability capabilities.

“The R language’s inherent power and extensibility have driven its explosive adoption as the modern system for predictive analytics,” said Norman H. Nie, president and CEO of Revolution Analytics. “We believe that this new Big Data scalability will help R transition from an amazing research and prototyping tool to a production-ready platform for enterprise applications such as quantitative finance and risk management, social media, bioinformatics and telecommunications data analysis.”

Sage Bionetworks is the nonprofit force behind the open-source collaborative effort, Sage Commons, a place where data and disease models can be shared by scientists to better understand disease biology. David Henderson, Director of Scientific Computing at Sage, commented: “At Sage Bionetworks, we need to analyze genomic databases hundreds of gigabytes in size with R. We’re looking forward to using the high-speed data-analysis features of RevoScaleR to dramatically reduce the time it takes us to process these data sets.”

Take Hadoop and Other Big Data Sources to the Next Level

Revolution R Enterprise fits well within the modern ‘Big Data’ architecture by leveraging popular sources such as Hadoop, NoSQL or key value databases, relational databases and data warehouses. These products can be used to store, regularize and do basic manipulation on very large datasets—while Revolution R Enterprise now provides advanced analytics at unparalleled speed and scale: producing speed on speed.

“Together, Hadoop and R can store and analyze massive, complex data,” said Saptarshi Guha, developer of the popular RHIPE R package that integrates the Hadoop framework with R in an automatically distributed computing environment. “Employing the new capabilities of Revolution R Enterprise, we will be able to go even further and compute Big Data regressions and more.”

Platforms and Availability

The new RevoScaleR package will be delivered as part of Revolution R Enterprise 4.0, which will be available for 32- and 64-bit Microsoft Windows in the next 30 days. Support for Red Hat Enterprise Linux (RHEL 5) is planned for later this year.

On its website (http://www.revolutionanalytics.com/bigdata), Revolution Analytics has published performance and scalability benchmarks for Revolution R Enterprise analyzing a 13.2 gigabyte data set of commercial airline information containing more than 123 million rows and 29 columns.

Additionally, the company will showcase its new Big Data solution in a free webinar on August 25 at 9:00 a.m. Pacific.

Additional Resources

•      Big Data Benchmark whitepaper

•      The Revolution Analytics Roadmap whitepaper

•      Revolutions Blog

•      Download free academic copy of Revolution R Enterprise

•      Visit Inside-R.org for the most comprehensive set of information on R

•      Spread the word: Add a “Download R!” badge on your website

•      Follow @RevolutionR on Twitter

About Revolution Analytics

Revolution Analytics (http://www.revolutionanalytics.com) is the leading commercial provider of software and support for the popular open source R statistics language. Its Revolution R products help make predictive analytics accessible to every type of user and budget. The company is headquartered in Palo Alto, Calif. and backed by North Bridge Venture Partners and Intel Capital.

Media Contact

Chantal Yang
Page One PR, for Revolution Analytics
Tel: +1 415-875-7494

Email:  revolution@pageonepr.com

R and Hadoop

Here is an exciting project for using R in a cloud-computing environment (two of my favorite things). It is called RHIPE.

R and Hadoop Integrated Processing Environment v.0.38


The website source is http://ml.stat.purdue.edu/rhipe/

RHIPE (phonetic spelling: hree-pay) is a Java package that integrates the R environment with Hadoop, the open-source implementation of Google’s MapReduce. Using RHIPE it is possible to code map-reduce algorithms in R, e.g.:

# Map function: split each input line into words and emit one (word, count)
# pair per distinct word in the line.
m <- function(key, val){
  words <- strsplit(val, " +")[[1]]
  wc <- table(words)
  cln <- names(wc)
  names(wc) <- NULL; names(cln) <- NULL
  return(sapply(1:length(wc), function(r) list(key = cln[r], value = wc[[r]]), simplify = FALSE))
}

# Reduce function: sum the counts emitted for each word.
r <- function(key, value){
  value <- do.call("rbind", value)
  return(list(list(key = key, value = sum(value))))
}

# Build and run the word-count job; the reducer is also used as the combiner.
rhmr(map = m, reduce = r, combiner = T, input.folder = "X", output.folder = "Y")
rhlapply packages the user's request into an R vector object. This is serialized and sent to the RHIPE server. The RHIPE server picks apart the object, creating a job request that Hadoop can understand. Each element of the provided list is processed by the user's function during the Map stage of mapreduce. The results are returned and, if the output is to a file, these results are serialized and written to a Hadoop Sequence file; the values can be read back into R using the rhsq* functions.

rhlapply

rhlapply <- function( list.object,
                    func,
                    configure=expression(),
                    output.folder='',
                    shared.files=c(),
                    hadoop.mapreduce=list(),
                    verbose=T,
                    takeAll=T)
list.object
This can either be a list or a single scalar. In the case of the former, the function given by func will be applied to each element of list.object. In the case of a scalar, the function will be applied to the list 1:n, where n is the value of the scalar.
func
A function that takes one parameter: an element of the list.
configure
A configuration expression to run before func is executed; it is executed once for every JVM. If you need variables or data frames, save them with rhsave or rhsave.image, use rhput to copy the file to the DFS, and then use shared.files.

config = expression({
              library(lattice)
              load("mydataset.Rdata")
})
output.folder
Any file that is created by the function is stored in output.folder, which is deleted first. If not given, the files created will not be copied. For side-effect files to be copied, create them in tmp, e.g. pdf("tmp/x.pdf") (note: no leading slash). The directory will contain a slew of part* files, as many as there are maps. These contain the binary key-value pairs.
shared.files
The function or the preload expression might require the presence of resource files, e.g. *.Rdata files. The user could copy them from the HDFS in the R code, or simply load them from the local working directory where the files are present; the latter is the role of shared.files. It is a vector of paths to files on the HDFS; each of these will be copied to the working directory where the R code is run, e.g. c('/tmp/x.Rdata','/foo.tgz'), after which the first file can be loaded via load("x.Rdata"). For those familiar with Hadoop terminology, this is implemented via DistributedCache.
hadoop.mapreduce
A list of Hadoop-specific options, e.g.

list(mapreduce.map.tasks=10,mapreduce.reduce.tasks=3)
takeAll
If takeAll is TRUE, the value returned is a list, each entry of which is the return value of the function. The entries are not in order, so element 1 of the returned list is not necessarily the result of func applied to the first element of list.object.
verbose
If TRUE, the user will see the job progress in the R console. If FALSE, the web URL of the jobtracker will be displayed. Cancelling the command with CTRL-C will not cancel the job; use rhkill for that.
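As a hedged illustration of how these parameters fit together (the HDFS path and the simulation function below are hypothetical), a call might look like this:

# Hypothetical example: apply a small simulation function across the cluster.
# Passing a scalar as list.object means func is applied to the list 1:100.
results <- rhlapply(list.object = 100,
                    func = function(i) mean(rnorm(1000, mean = i)),
                    output.folder = "/tmp/sim-out",
                    takeAll = TRUE)
# With takeAll=TRUE, results is a list of the 100 return values, not in order.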
Mapreduce in R.

rhmr <- function(map, reduce, input.folder,
                 configure = list(map = expression(), reduce = expression()),
                 close = list(map = expression(), reduce = expression()),
                 output.folder = '', combiner = F, step = F,
                 shared.files = c(), inputformat = "TextInputFormat",
                 outputformat = "TextOutputFormat",
                 hadoop.mapreduce = list(), verbose = T, libjars = c())

Execute map-reduce algorithms from within R. A discussion of the parameters follows.

input.folder
A folder on the DFS containing the files to process. Can be a vector.
output.folder
A folder on the DFS where output will go to.
inputformat
Either TextInputFormat or SequenceFileInputFormat. Use the former for text files and the latter for sequence files created from within R or as outputs from RHIPE (e.g. rhlapply or rhmr). Note that one cannot use arbitrary sequence files; they must have been created via a RHIPE function. Custom input formats are also possible; download the source and look at code/java/RXTextInputFormat.java.
outputformat
Either TextOutputFormat or SequenceFileOutputFormat. In the case of the former, the return value from the mapper or reducer is converted to character and written to disk. The following code is used to convert to character:

paste(key,sep='',collapse=field_separator)

Custom output formats are also possible. Download the source and look at code/java/RXTextOutputFormat.java

If custom formats implement their own writables, they must subclass RXWritable or use one of the writables present in RHIPE.

shared.files
Same as in rhlapply; see that for documentation.
verbose
If TRUE, the job progress is displayed. If FALSE, the job URL is displayed.

At any time in the configure, close, map or reduce function/expression, the variable mapred.task.is.map will be equal to "true" if it is a map task and "false" otherwise (both strings). Also, mapred.iswhat is mapper, reducer, or combiner in their respective environments.

configure
A list with either one element (an expression) or two elements, map and reduce, both of which must be expressions. These expressions are called in their respective environments, i.e. the map expression is called during the map configure and similarly for the reduce expression. The reduce expression is also called for the combiner configure method. If only one list element is given, the expression is used for both the map and the reduce.
close
Same as configure.
map
A function that takes two values, key and value. It should return a list of lists; each list entry must contain two elements, key and value, e.g.:

...
ret <- list()
ret[[1]] <-  list(key=c(1,2), value=c('x','b'))
return(ret)

If either key or value is missing, the output is not collected; e.g. return NULL to skip this record. If the input format is TextInputFormat, the value is the entire line and the key is probably useless to the user (it is a number indicating the byte offset into the file). If the input format is SequenceFileInputFormat, the key and value are taken from the sequence file.

reduce
Not needed if mapred.reduce.tasks is 0. Takes a key and a list of values (all values emitted from the maps that share the same map output key). If step is TRUE, the second argument is a single value rather than a list. Must return a list of lists, each element of which must have two elements, key and value. Hadoop collects all the values for a given key and sends them to this function. If NULL is returned, or the return value does not conform to the above, nothing is passed to the Hadoop collector.
step
If step is TRUE, then the reduce function is called for every value corresponding to a key, that is, once for every value (see the sketch after the list below).

  • The variable red.status is equal to 1 on the first call.
  • red.status is equal to 0 for every subsequent call, including the last value.
  • The reducer function is called one last time with red.status equal to -1; the value is then NULL. Anything returned at any of these stages is written to disk. The close function is called once every value for a given key has been processed, but returning anything from it has no effect. To assign to the global environment, use the <<- operator.
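Here is a minimal sketch of a step-wise reducer that uses red.status as described above to sum the values for each key one at a time; the summation task itself is hypothetical, chosen only to show the mechanics.

# Hedged sketch of a step-wise reducer (step=TRUE): values arrive one at a time
# and red.status signals where we are in the stream for the current key.
r <- function(key, value){
  if (red.status == 1) {
    total <<- 0                      # first value for this key: initialize the accumulator
  }
  if (red.status >= 0) {
    total <<- total + value          # red.status 1 or 0: an actual value was passed in
  }
  if (red.status == -1) {
    # final call for this key: value is NULL, emit the accumulated sum
    return(list(list(key = key, value = total)))
  }
  NULL                               # intermediate calls emit nothing
}

Such a reducer would be passed to rhmr with step = T.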
combiner
T or F: whether to use the reducer as a combiner. Using a combiner makes computation more efficient. If combiner is TRUE, the reduce function will be called as a combiner (zero or more times; it may never be called during the combine stage even if combiner is T). The value of mapred.task.is.map is 'true' or 'false' (both strings) if the combiner is being executed as part of the map stage or reduce stage respectively.

Whether knowledge of this is useful or not is something I'm not sure of. However, if combiner is T, keep in mind that your reduce function must be able to handle inputs sent from the map as well as inputs sent from the reduce function (itself).

libjars
If specifying a custom input/output format, the user might need to specify jar files here.
hadoop.mapreduce
Set RHIPE and Hadoop options via this list.

RHIPE Options for mapreduce

rhipejob.rport (default: 8888)
The port on which Rserve runs; it should be the same across all machines.

rhipejob.charsxp.short (default: 0)
If 1, RHIPE optimizes serialization for character vectors. This reduces the length of the serialization.

rhipejob.getmapbatches (default: 100)
If the reducer/mapper emits several key-value pairs, how many to get from Rserve at a time. A higher number reduces the number of network reads (the network reads are to localhost).

rhipejob.outfmt.is.text (default: 1 if TextInputFormat)
Must be 1 if the output is textual.

rhipejob.textoutput.fieldsep (default: ' ')
The field separator for any text-based output format.

rhipejob.textinput.comment (default: '#')
In the TextInputFormat, lines beginning with this are skipped.

rhipejob.combinerspill (default: 100,000)
The combiner is run after collecting at most this many items.

rhipejob.tor.batch (default: 200,000)
Number of values for the same key to collate before sending to the reducer. If you have dollops of memory, set this larger; however, if it is too large you will hit Java's heap space limit.

rhipejob.max.count.reduce (default: Java's INT_MAX, about 2 billion)
The total number of values for a given key to be collected; note that the values are not ordered by any variable.

rhipejob.inputformat.keyclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the full Java URL to the keyclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. When using a custom InputFormat, the keyclass must implement RXWritable and its methods.

rhipejob.inputformat.valueclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the full Java URL to the valueclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText. When using a custom InputFormat, the valueclass must implement RXWritable and its methods.

mapred.input.format.class (default: org.saptarshiguha.rhipe.hadoop.RXTextInputFormat or org.apache.hadoop.mapred.SequenceFileInputFormat, as above)
Specify your own input format class here.

rhipejob.outputformat.keyclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the full Java URL to the keyclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText; the keyclass must implement RXWritable.

rhipejob.outputformat.valueclass (default: chosen depending on TextInputFormat or SequenceFileInputFormat)
Provide the full Java URL to the valueclass, e.g. org.saptarshiguha.rhipe.hadoop.RXWritableText; the valueclass must implement RXWritable.

mapred.output.format.class (default: org.saptarshiguha.rhipe.hadoop.RXTextOutputFormat or org.apache.hadoop.mapred.SequenceFileInputFormat, as above)
Specify your own output format class here; provide libjars if required.
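As a hedged illustration (the job parameters and option values here are hypothetical), these options are passed to a job through the hadoop.mapreduce argument of rhmr:

# Hypothetical example: set RHIPE and Hadoop options from the list above via
# hadoop.mapreduce (m and r are the map and reduce functions defined earlier).
rhmr(map = m, reduce = r,
     input.folder = "X", output.folder = "Y",
     inputformat = "TextInputFormat",
     hadoop.mapreduce = list(
       rhipejob.textinput.comment = "#",      # skip input lines beginning with '#'
       rhipejob.tor.batch         = 200000,   # values per key collated before the reducer
       mapreduce.map.tasks        = 10        # plain Hadoop options can be set here too
     ))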

Citation: http://ml.stat.purdue.edu/rhipe/index.html

Great, exciting news for the world of remote computing.
