Ross writes ….
I did the classic map-reduce example: a word count of a flat text file, in this case James Joyce's Ulysses.
curl http://www.gutenberg.org/cache/epub/4300/pg4300.txt > ulysses.txt
hadoop dfs -copyFromLocal ~/ulysses.txt /tmp/ulysses.txt
hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount /tmp/ulysses.txt /tmp/results
hadoop dfs -getmerge /tmp/results ~/results.tsv
hadoop dfs -rmr /tmp/results
sort --numeric-sort --key=2 --reverse results.tsv | head --lines=5
the 13600
of 8127
and 6542
a 5842
to 4787
I use the Hadoop Streaming API with R (via RHadoop) and Python (via mrjob). I highly recommend these solutions because you can write map-reduce jobs in just a few lines of code.
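For example, a word count with mrjob takes only a handful of lines. This is just a sketch of the mrjob approach; the file and class names below are mine, not part of the examples that follow.

# wordcount_mrjob.py - a minimal mrjob word count (illustrative sketch)
from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # emit each word with a count of 1
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        # sum the per-word counts
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()

Run it locally with "python wordcount_mrjob.py ulysses.txt", or against the cluster with mrjob's -r hadoop runner.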
The next example comes from the IBM article linked on the “Building Hadoop” page.

Assumption: long sentences and long words mean complexity. As I said, it is just an example.

Read the IBM article for the benefits of this example; I just wanted to see how Python fronts the Hadoop cluster. First we download some books, rename the files, and note that one is compressed.
wget -U firefox http://www.gutenberg.org/cache/epub/76/pg76.txt
wget -U firefox http://www.gutenberg.org/cache/epub/3285/pg3285.txt
mv pg3285.txt DS.txt
mv pg76.txt HF.txt
gzip DS.txt
Then we stage them up in HDFS.
hadoop fs -put DS.txt.gz /tmp
hadoop fs -put HF.txt /tmp
And look at what's up there.
hadoop fs -ls /tmp
-rw-r--r--   3 hmeij07    supergroup    459378 2013-05-23 14:24 /tmp/DS.txt.gz
-rw-r--r--   3 hmeij07    supergroup    610155 2013-05-23 14:24 /tmp/HF.txt
drwxrwxrwt   - mapred     supergroup         0 2013-05-16 13:58 /tmp/hadoop-mapred
drwxr-xr-x   - qactweet1  supergroup         0 2013-05-17 12:52 /tmp/ngrams
-rw-r--r--   3 hmeij07    supergroup   1573150 2013-05-20 14:26 /tmp/ulysses.txt
Next we build two files in our local home directory: a mapper and a reducer.
vi mapper.py
# with the following contents ... (python: indentation matters!)

#!/usr/bin/env python
import sys

# read stdin
for linein in sys.stdin:
    # strip blanks
    linein = linein.strip()
    # split into words
    mywords = linein.split()
    # loop on mywords, output the length of each word
    for word in mywords:
        # the reducer just cares about the first column,
        # normally there is a key - value pair
        print '%s %s' % (len(word), 0)

vi statsreducer.awk
# with the following contents ...

awk '{delta = $1 - avg; avg += delta / NR; \
mean2 += delta * ($1 - avg); sum=$1+sum } \
END { print NR, sum/NR, sqrt(mean2 / NR); }'

# finally, make them executable
chmod u+x mapper.py
chmod u+x statsreducer.awk
Let us see what these programs do on the command line:
# basic word length and occurrence count
# mapper file
zcat DS.txt.gz | ./mapper.py
9 0
2 0
3 0
5 0
10 0
2 0
4 0
5 0
3 0
7 0

# reducer file, columns left to right:
# NR - the number of words in total
# sum/NR - the average word length
# sqrt(mean2/NR) - the standard deviation
zcat DS.txt.gz | ./mapper.py | sort | ./statsreducer.awk
10 5 2.68328
# consult IBM article on how to interpret this
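For readers who prefer Python over awk, here is a rough rendering of what statsreducer.awk computes (Welford's streaming mean and variance). This sketch is only a reading aid; it is not part of the pipeline above.

#!/usr/bin/env python
# rough Python equivalent of statsreducer.awk (not used by the jobs below)
import sys
import math

n = 0        # NR: number of input lines (words)
avg = 0.0    # running mean of the word lengths
m2 = 0.0     # running sum of squared deviations
total = 0.0  # plain sum of the word lengths

for line in sys.stdin:
    if not line.strip():
        continue
    x = float(line.split()[0])   # first column: word length emitted by mapper.py
    n += 1
    delta = x - avg
    avg += delta / n
    m2 += delta * (x - avg)
    total += x

# NR, average word length, (population) standard deviation
print("{0} {1} {2}".format(n, total / n, math.sqrt(m2 / n)))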
Next we run these commands in the Hadoop environment. The -output naming is unconventional: the *.txt output is actually a directory, not a file. Please note that everything prefixed by a tilde (~) is in our home directory, not on Hadoop's filesystem.
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
 -input /tmp/HF.txt -output /tmp/HFstats.txt \
 -file ~/mapper.py -file ~/statsreducer.awk \
 -mapper ~/mapper.py -reducer ~/statsreducer.awk

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
 -input /tmp/DS.txt.gz -output /tmp/DSstats.txt \
 -file ~/mapper.py -file ~/statsreducer.awk \
 -mapper ~/mapper.py -reducer ~/statsreducer.awk
Let's look at the output. One is wordier than the other.
hadoop fs -cat /tmp/HFstats.txt/part-00000
14 4 1.77281

hadoop fs -cat /tmp/DSstats.txt/part-00000
10 5 2.68328
Clean up.
hadoop fs -rmr /tmp/DFstats.txt/*
# for some reason -rmr does not imply remove recursively, bug?
hadoop fs -rmr /tmp/HFstats.txt
Deleted hdfs://qactweet1:54310/tmp/HFstats.txt
OK, so I get this. We should be able to do this with other languages and databases.
Here are my steps (with lots of Ross's help) for the rmr2 and rhdfs installation. Do this on all nodes. Start with these R packages:
R-core-3.0.0-2.el6.x86_64
R-java-devel-3.0.0-2.el6.x86_64
R-devel-3.0.0-2.el6.x86_64
R-core-devel-3.0.0-2.el6.x86_64
R-java-3.0.0-2.el6.x86_64
R-3.0.0-2.el6.x86_64
Make sure Java is installed properly (the same one you used for Hadoop itself) and set the environment in /etc/profile:
export JAVA_HOME="/usr/java/latest"
export PATH=/usr/java/latest/bin:$PATH
export HADOOP_HOME=/usr/lib/hadoop-0.20
export HADOOP_CMD=/usr/bin/hadoop
export HADOOP_STREAMING=/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar
I noticed that at some point OpenJDK gets reinstalled, so I managed these links by hand:
lrwxrwxrwx  1 root root 24 May 27 10:41 /usr/bin/jar -> /usr/java/latest/bin/jar
lrwxrwxrwx  1 root root 21 May 27 09:47 /usr/bin/jar-alt -> /etc/alternatives/jar
lrwxrwxrwx  1 root root 30 May 27 10:41 /usr/bin/jarsigner -> /usr/java/latest/bin/jarsigner
lrwxrwxrwx  1 root root 27 May 27 09:47 /usr/bin/jarsigner-alt -> /etc/alternatives/jarsigner
lrwxrwxrwx  1 root root 25 May 27 10:35 /usr/bin/java -> /usr/java/latest/bin/java
lrwxrwxrwx  1 root root 22 May 27 09:47 /usr/bin/java-alt -> /etc/alternatives/java
lrwxrwxrwx  1 root root 26 May 27 10:38 /usr/bin/javac -> /usr/java/latest/bin/javac
lrwxrwxrwx  1 root root 23 May 27 09:47 /usr/bin/javac-alt -> /etc/alternatives/javac
lrwxrwxrwx  1 root root 25 May 27 09:47 /usr/bin/javadoc -> /etc/alternatives/javadoc
lrwxrwxrwx  1 root root 26 May 28 09:37 /usr/bin/javah -> /usr/java/latest/bin/javah
lrwxrwxrwx  1 root root 23 May 27 09:47 /usr/bin/javah-alt -> /etc/alternatives/javah
lrwxrwxrwx  1 root root 26 May 27 10:39 /usr/bin/javap -> /usr/java/latest/bin/javap
lrwxrwxrwx  1 root root 23 May 27 09:47 /usr/bin/javap-alt -> /etc/alternatives/javap
lrwxrwxrwx  1 root root 27 May 27 10:40 /usr/bin/javaws -> /usr/java/latest/bin/javaws
lrwxrwxrwx. 1 root root 28 May 15 14:56 /usr/bin/javaws-alt -> /usr/java/default/bin/javaws
So if the commands which java and java -version return the proper information, reconfigure Java for R:
# at OS
R CMD javareconf

# in R
install.packages('rJava')
You could also set java in this file: $HADOOP_HOME/conf/hadoop-env.sh
When that is successful, add the dependencies.
See the following files for current lists of dependencies:
https://github.com/RevolutionAnalytics/rmr2/blob/master/pkg/DESCRIPTION
https://github.com/RevolutionAnalytics/rhdfs/blob/master/pkg/DESCRIPTION
Enter R and issue the command:
install.packages(c("Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2"))
If Rcpp is a problem, locate and install an older version of Rcpp from the CRAN archives (http://cran.r-project.org/src/contrib/Archive/Rcpp/):
# in R
install.packages("int64")

# at OS
wget http://cran.r-project.org/src/contrib/Archive/Rcpp/Rcpp_0.9.8.tar.gz
R CMD INSTALL Rcpp_0.9.8.tar.gz
Finally, the RHadoop packages themselves, at the OS level:
wget -O rmr-2.2.0.tar.gz http://goo.gl/bhCU6
wget -O rhdfs_1.0.5.tar.gz https://github.com/RevolutionAnalytics/rhdfs/blob/master/build/rhdfs_1.0.5.tar.gz?raw=true
R CMD INSTALL rmr-2.2.0.tar.gz
R CMD INSTALL rhdfs_1.0.5.tar.gz
Verify
Type 'q()' to quit R.

> library(rmr2)
Loading required package: Rcpp
Loading required package: RJSONIO
Loading required package: digest
Loading required package: functional
Loading required package: stringr
Loading required package: plyr
Loading required package: reshape2

> library(rhdfs)
Loading required package: rJava
HADOOP_CMD=/usr/bin/hadoop
Be sure to run hdfs.init()

> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-redhat-linux-gnu (64-bit)

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8
 [7] LC_PAPER=C                 LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
 [1] rhdfs_1.0.5    rJava_0.9-4    rmr2_2.2.0     reshape2_1.2.2 plyr_1.8
 [6] stringr_0.6.2  functional_0.4 digest_0.6.3   RJSONIO_1.0-3  Rcpp_0.10.3
Test
Tutorial documentation: https://github.com/RevolutionAnalytics/rmr2/blob/master/docs/tutorial.md
R script:
#!/usr/bin/Rscript

library(rmr2)
library(rhdfs)
hdfs.init()

small.ints = to.dfs(1:1000)
mapreduce(input = small.ints, map = function(k, v) cbind(v, v^2))
Then HBase, for rhbase:
http://hbase.apache.org/book/configuration.html
But first Thrift, the language interface to the HBase database:
yum install openssl098e
Download Thrift: http://thrift.apache.org/download/
yum install byacc -y
yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel

./configure
make
make install

export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/local/lib/pkgconfig/
pkg-config --cflags thrift
cp -p /usr/local/lib/libthrift-0.9.0.so /usr/lib/

HBASE_ROOT/bin/hbase thrift start &
lsof -i:9090
# that is the server; port 9095 is the monitor
Configure for distributed environment: http://hbase.apache.org/book/standalone_dist.html#standalone
install.packages('rJava')
install.packages("int64")
install.packages(c("Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2"))

wget http://cran.r-project.org/src/contrib/Archive/Rcpp/Rcpp_0.9.8.tar.gz
wget -O rmr-2.2.0.tar.gz http://goo.gl/bhCU6
wget -O rhdfs_1.0.5.tar.gz https://github.com/RevolutionAnalytics/rhdfs/blob/master/build/rhdfs_1.0.5.tar.gz?raw=true

R CMD INSTALL Rcpp_0.9.8.tar.gz
R CMD INSTALL rmr-2.2.0.tar.gz
R CMD INSTALL rhdfs_1.0.5.tar.gz
R CMD INSTALL rhbase_1.2.0.tar.gz

yum install openssl098e openssl openssl-devel flex boost ruby ruby-libs ruby-devel php php-libs php-devel \
  automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel

b2 install --prefix=/usr/local

thrift:
./configure --prefix=/usr/local --with-boost=/usr/local; make
make install
cp -p /usr/local/lib/libthrift-0.9.0.so /usr/lib/
cd /usr/lib; ln -s libthrift-0.9.0.so libthrift.so

SKIP (nasty, replaced with straight copy, could go to nodes)
http://www.cpan.org 'o conf commit'
cpan> install Hadoop::Streaming

whitetail only: unpack hbase, edit conf/hbase-site.xml, add to /etc/rc.local
also edit conf/regionservers
copy /usr/local/hbase-version-dir to nodes:/usr/local

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>example1,example2,example3</value>
  <description>The directory shared by RegionServers.
  </description>
</property>
<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/export/zookeeper</value>
  <description>Property from ZooKeeper's config zoo.cfg.
  The directory where the snapshot is stored.
  </description>
</property>
First we create a script for the map step; note that this only prints individual words back.
vi wc_mapper.pl
# make user executable and add contents below

#!/usr/bin/perl -w
# read in a file of text, print word by word
while(<>) {
    @line = split;
    foreach $word (@line) {
        print "$word\n";
    }
}
Next we create a script for the reduce step; note that using a hash avoids the sort step.
vi wc_reducer.pl
# make user executable and add contents below

#!/usr/bin/perl -w
# store words in hash and print key-value results
while (<>) {
    chomp;
    $seen{$_}++;
}
foreach $key (keys %seen) {
    print "$seen{$key} $key\n";
}
Next test this on the command line
perl wc_mapper.pl HF.txt | perl wc_reducer.pl | sort -rn | head
6050 and
4708 the
2935 a
2903 to
2475 I
1942 was
1733 of
1427 it
1372 he
1367 in
Load the text input file up to Hadoop's HDFS and submit the job
hadoop fs -put HF.txt /tmp
hadoop dfs -ls /tmp

# results
Found 2 items
-rw-r--r--   3 hmeij07 supergroup     459378 2013-05-23 14:24 /tmp/DS.txt.gz
-rw-r--r--   3 hmeij07 supergroup     610155 2013-05-23 14:24 /tmp/HF.txt

# submit, note that -mapper and -reducer options are paired with a -file option
# pointing to files in our home directory (not HDFS). these files will be copied
# to each datanode for execution, and then the results will be tabulated
hadoop jar \
 /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
 -input /tmp/HF.txt -output /tmp/HF.out \
 -file ~/wc_mapper.pl -mapper ~/wc_mapper.pl \
 -file ~/wc_reducer.pl -reducer ~/wc_reducer.pl

hadoop fs -ls /tmp/HF.out

# results
Found 3 items
-rw-r--r--   3 hmeij07 supergroup      0 2013-05-24 15:10 /tmp/HF.out/_SUCCESS
drwxrwxrwt   - hmeij07 supergroup      0 2013-05-24 15:10 /tmp/HF.out/_logs
-rw-r--r--   3 hmeij07 supergroup 161788 2013-05-24 15:10 /tmp/HF.out/part-00000

hadoop fs -cat /tmp/HF.out/part-00000 | sort -rn | head

# results
6050 and
4708 the
2935 a
2903 to
2475 I
1942 was
1733 of
1427 it
1372 he
1367 in

# clean up space
Adapted from http://autofei.wordpress.com/category/java/hadoop-code/
First, generate some test data; run this in the shell twice so that each key ends up with two values:
for i in `seq 1 1000000`
> do
> echo -e "$i,$RANDOM" >> v_data_large.txt
> done
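As an aside, the same test data could be generated from Python; this is just a sketch and an assumption on my part (the original uses the shell loop above):

# generate "key,value" test data; because we append twice, every key 1..1000000
# ends up with exactly two random values (0-32767, like $RANDOM)
import random

with open("v_data_large.txt", "a") as out:
    for _ in range(2):                      # "do this twice"
        for i in range(1, 1000001):
            out.write("%d,%d\n" % (i, random.randint(0, 32767)))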
Then we'll use the mapper:
#!/usr/bin/perl
# convert comma delimited to tab delimited
while($line=<STDIN>){
    @fields = split(/,/, $line);
    if ($fields[0] eq '#') { next;}
    if($fields[0] && $fields[1]){
        print "$fields[0]\t$fields[1]";
    }
}
And the reducer from the web site
#!/usr/bin/perl
$lastKey="";
$product=1;
$count=0;
while($line=<STDIN>){
    @fields=split(/\t/, $line);
    $key = $fields[0];
    $value = $fields[1];
    if($lastKey ne "" && $key ne $lastKey){
        if($count==2){
            print "$lastKey\t$product\n";
        }
        $product=$value;
        $lastKey=$key;
        $count=1;
    }
    else{
        $product=$product*$value;
        $lastKey=$key;
        $count++;
    }
}
# the last key
if($count==2){
    print "$lastKey\t$product\n";
}
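In plain terms, the reducer relies on the streaming sort phase to group lines by key, multiplies the values that share a key, and emits only keys seen exactly twice. A rough Python equivalent of the same logic, included just as a reading aid (the job below uses the Perl version above):

#!/usr/bin/env python
# sketch of the reducer logic in Python; expects tab-delimited "key<TAB>value"
# lines already sorted by key, as Hadoop streaming delivers them
import sys
from itertools import groupby

def parse(stream):
    for line in stream:
        if not line.strip():
            continue
        key, value = line.rstrip("\n").split("\t", 1)
        yield key, int(value)

for key, group in groupby(parse(sys.stdin), key=lambda kv: kv[0]):
    values = [v for _, v in group]
    # only keys that appear exactly twice produce output: the product of the two values
    if len(values) == 2:
        print("{0}\t{1}".format(key, values[0] * values[1]))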
And submit the job
hadoop jar \
 /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
 -input /tmp/v_data.txt -output /tmp/v.out \
 -file ~/v_mapper.pl -mapper ~/v_mapper.pl \
 -file ~/v_reducer.pl -reducer ~/v_reducer.pl
And that works.
yum install cpan

cpan> install Hadoop::Streaming
Installing /usr/local/share/perl5/Hadoop/Streaming.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Mapper.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Combiner.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Role/Iterator.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Role/Emitter.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer/Input.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer/Input/Iterator.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer/Input/ValuesIterator.pm
“Where Hadoop is concerned the latest release already runs on the platform, technically, but it's limited to a SAS-customized version of the open source software based on Apache Hadoop v1.0 (also known as version 0.20.20x). SAS says HPA will run on mainstream distributions of Hadoop from the likes of Cloudera, with an upcoming December release of HPA that will be based on Apache Hadoop v2.0 (also known as version 0.23).

Whether you're using SAS's current Hadoop software or plan to embrace the v2.0 release, HPA provides a graphical user interface that lets you tap HDFS, MapReduce, Pig, and Hive to apply SAS analyses to the vast data sets residing on Hadoop. MapReduce is the primary model for processing data on Hadoop. Pig is an open source Apache programming tool and language for writing MapReduce jobs. Hive is data warehousing infrastructure built on top of Hadoop that supports data summarization, query, and analysis. HPA also supports Pig and MapReduce code generation, visual editing and syntax checking. Finally, SAS Data Integration Studio data transformations and SAS DataFlux data quality routines have also been adapted to Hadoop.”