Use Hadoop Cluster

Word count, vanilla

Ross writes ….

I did the classic map-reduce example: a word count of a flat text file, in this case James Joyce's Ulysses.

  • First, I downloaded the data.
  • Second, I copied the data from my home folder to the Hadoop Distributed File System (HDFS).
  • Third, I ran the word count example bundled with the Cloudera Hadoop distribution.
  • Fourth, I copied and merged the results from the HDFS back to the local filesystem.
  • Finally, I inspected the output (reformatted below).
curl http://www.gutenberg.org/cache/epub/4300/pg4300.txt > ulysses.txt
hadoop dfs -copyFromLocal ~/ulysses.txt /tmp/ulysses.txt
hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount /tmp/ulysses.txt /tmp/results
hadoop dfs -getmerge /tmp/results ~/results.tsv
hadoop dfs -rmr /tmp/results

sort --numeric-sort --key=2 --reverse results.tsv | head --lines=5
the    13600
of      8127
and     6542
a       5842
to      4787
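
As a quick cross-check (my own addition, not part of the original run), the same top-five list can be reproduced locally with Python's collections.Counter. Like Hadoop's bundled WordCount, this splits on whitespace only, so punctuation stays attached to words; small differences in counts would point to tokenization, not to Hadoop.

#!/usr/bin/env python
# local word count of ulysses.txt, whitespace-split like the WordCount example
from collections import Counter

counts = Counter()
with open('ulysses.txt') as fh:
    for line in fh:
        counts.update(line.split())

# print the five most common words, tab separated
for word, n in counts.most_common(5):
    print("%s\t%d" % (word, n))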

I use the Hadoop Streaming API with R (via RHadoop) and Python (via mrjob). I highly recommend these solutions because you can write map-reduce jobs in just a few lines of code.
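
For the mrjob route, a word count really is only a few lines. The sketch below is mine (the class and file names are made up), and it assumes mrjob is installed and configured so its hadoop runner can find your Hadoop installation:

#!/usr/bin/env python
# wc_mrjob.py - minimal mrjob word count
from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # emit one (word, 1) pair per word on the input line
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        # sum the counts for each word
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()

Run it locally with python wc_mrjob.py ulysses.txt, or against the cluster with the -r hadoop runner and an hdfs:// input path.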

Python Wordcount

This example is based on the IBM article referenced on the “Building Hadoop” page.

Assumption: long sentences and long words indicate complexity. As noted, it is just an example.

The IBM article goes into the benefits of this example; I just wanted to see how Python front-ends the Hadoop cluster. First we download two books, rename the files, and compress one of them.

wget -U firefox http://www.gutenberg.org/cache/epub/76/pg76.txt                       
wget -U firefox http://www.gutenberg.org/cache/epub/3285/pg3285.txt                   
mv pg3285.txt DS.txt                                                                  
mv pg76.txt HF.txt                                                                    
gzip DS.txt 

Then we stage them up in HDFS.

hadoop fs -put  DS.txt.gz /tmp                                                        
hadoop fs -put  HF.txt /tmp                                                           

And look at what's up there.

hadoop fs -ls /tmp
-rw-r--r--   3 hmeij07   supergroup     459378 2013-05-23 14:24 /tmp/DS.txt.gz
-rw-r--r--   3 hmeij07   supergroup     610155 2013-05-23 14:24 /tmp/HF.txt  
drwxrwxrwt   - mapred    supergroup          0 2013-05-16 13:58 /tmp/hadoop-mapred
drwxr-xr-x   - qactweet1 supergroup          0 2013-05-17 12:52 /tmp/ngrams      
-rw-r--r--   3 hmeij07   supergroup    1573150 2013-05-20 14:26 /tmp/ulysses.txt  

Next we create two files in our local home directory: a mapper and a reducer.

vi mapper.py # with the following contents ... (python: indentation matters!)

#!/usr/bin/env python
import sys

# read stdin line by line
for linein in sys.stdin:
    # strip leading/trailing whitespace
    linein = linein.strip()
    # split the line into words
    mywords = linein.split()
    # loop on mywords, output the length of each word;
    # the reducer just cares about the first column,
    # normally there is a key - value pair
    for word in mywords:
        print '%s %s' % (len(word), 0)


vi statsreducer.awk  # with the following contents (a small shell wrapper around awk) ...

#!/bin/sh
# prints: word count (NR), mean word length (sum/NR), standard deviation
awk '{delta = $1 - avg; avg += delta / NR; \
mean2 += delta * ($1 - avg); sum=$1+sum } \
END { print NR, sum/NR, sqrt(mean2 / NR); }'


# finally, make them executable
chmod u+x mapper.py                                    
chmod u+x statsreducer.awk       

Let us see what these programs do on the command line:

# basic word length and occurrence count

# mapper file
zcat DS.txt.gz | ./mapper.py                
9 0                                                                
2 0                                                                
3 0                                                                
5 0                                                                
10 0                                                               
2 0                                                                
4 0                                                                
5 0                                                                
3 0                                                                
7 0 

# reducer file, columns left to right:
#  NR - the total number of words
#  sum/NR - the average word length
#  sqrt(mean2/NR) - the standard deviation
           
zcat DS.txt.gz | ./mapper.py | sort | ./statsreducer.awk
10 5 2.68328 

# consult the IBM article on how to interpret these numbers
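
As a cross-check (my own addition, not part of the original walk-through), the count, mean and standard deviation of word lengths can also be computed directly in plain Python; the script name wordstats.py is made up:

#!/usr/bin/env python
# wordstats.py - word count, mean word length, population standard deviation
# usage: zcat DS.txt.gz | python wordstats.py
import sys
import math

lengths = []
for line in sys.stdin:
    for word in line.split():
        lengths.append(len(word))

n = len(lengths)
mean = sum(lengths) / float(n)
# population variance, matching sqrt(mean2/NR) in statsreducer.awk
var = sum((l - mean) ** 2 for l in lengths) / n
print("%d %g %g" % (n, mean, math.sqrt(var)))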

Next we run these commands in the Hadoop environment. The -output naming is unconventional: the *.txt output is actually a directory, not a file. Note that everything prefixed with a tilde (~) lives in our home directory, not on Hadoop's filesystem.

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
-input /tmp/HF.txt -output /tmp/HFstats.txt \
-file ~/mapper.py -file ~/statsreducer.awk \
-mapper ~/mapper.py  -reducer ~/statsreducer.awk

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
-input /tmp/DS.txt.gz  -output /tmp/DSstats.txt \
-file ~/mapper.py -file ~/statsreducer.awk \
-mapper ~/mapper.py  -reducer ~/statsreducer.awk

Let's look at the output; one book uses longer words, on average, than the other.

hadoop fs -cat /tmp/HFstats.txt/part-00000
14 4 1.77281

 hadoop fs -cat /tmp/DSstats.txt/part-00000
10 5 2.68328

Clean up.

hadoop fs -rmr /tmp/DSstats.txt
# -rmr removes the output directory recursively; note the directory is
# DSstats.txt, not DFstats.txt

hadoop fs -rmr /tmp/HFstats.txt
Deleted hdfs://qactweet1:54310/tmp/HFstats.txt

OK, so I get this. We should be able to do the same with other languages and databases.

RHadoop

Here are my steps (with lots of Ross's help) to install rmr2 and rhdfs. Do this on all nodes.

  • Add the EPEL repository to your yum configuration, then
  • yum install R, which pulls in
R-core-3.0.0-2.el6.x86_64
R-java-devel-3.0.0-2.el6.x86_64
R-devel-3.0.0-2.el6.x86_64
R-core-devel-3.0.0-2.el6.x86_64
R-java-3.0.0-2.el6.x86_64
R-3.0.0-2.el6.x86_64

Make sure Java is installed properly (the same one you used for Hadoop itself) and set the environment variables in /etc/profile:

export JAVA_HOME="/usr/java/latest"
export PATH=/usr/java/latest/bin:$PATH

export HADOOP_HOME=/usr/lib/hadoop-0.20
export HADOOP_CMD=/usr/bin/hadoop
export HADOOP_STREAMING=/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar

I noticed that at some point OpenJDK gets reinstalled, so I manage these symlinks by hand:

lrwxrwxrwx  1 root root 24 May 27 10:41 /usr/bin/jar -> /usr/java/latest/bin/jar
lrwxrwxrwx  1 root root 21 May 27 09:47 /usr/bin/jar-alt -> /etc/alternatives/jar
lrwxrwxrwx  1 root root 30 May 27 10:41 /usr/bin/jarsigner -> /usr/java/latest/bin/jarsigner
lrwxrwxrwx  1 root root 27 May 27 09:47 /usr/bin/jarsigner-alt -> /etc/alternatives/jarsigner
lrwxrwxrwx  1 root root 25 May 27 10:35 /usr/bin/java -> /usr/java/latest/bin/java
lrwxrwxrwx  1 root root 22 May 27 09:47 /usr/bin/java-alt -> /etc/alternatives/java
lrwxrwxrwx  1 root root 26 May 27 10:38 /usr/bin/javac -> /usr/java/latest/bin/javac
lrwxrwxrwx  1 root root 23 May 27 09:47 /usr/bin/javac-alt -> /etc/alternatives/javac
lrwxrwxrwx  1 root root 25 May 27 09:47 /usr/bin/javadoc -> /etc/alternatives/javadoc
lrwxrwxrwx  1 root root 26 May 28 09:37 /usr/bin/javah -> /usr/java/latest/bin/javah
lrwxrwxrwx  1 root root 23 May 27 09:47 /usr/bin/javah-alt -> /etc/alternatives/javah
lrwxrwxrwx  1 root root 26 May 27 10:39 /usr/bin/javap -> /usr/java/latest/bin/javap
lrwxrwxrwx  1 root root 23 May 27 09:47 /usr/bin/javap-alt -> /etc/alternatives/javap
lrwxrwxrwx  1 root root 27 May 27 10:40 /usr/bin/javaws -> /usr/java/latest/bin/javaws
lrwxrwxrwx. 1 root root 28 May 15 14:56 /usr/bin/javaws-alt -> /usr/java/default/bin/javaws

Once the commands which java and java -version return the proper information, reconfigure Java for R. At the OS prompt:

# at OS
R CMD javareconf
# in R
install.packages('rJava')

You could also set JAVA_HOME in $HADOOP_HOME/conf/hadoop-env.sh.

When that is successful, add the dependencies.

See the following files for the current lists of dependencies:

https://github.com/RevolutionAnalytics/rmr2/blob/master/pkg/DESCRIPTION
https://github.com/RevolutionAnalytics/rhdfs/blob/master/pkg/DESCRIPTION

Enter R and issue the command:

install.packages(c("Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2"))

If Rcpp is a problem, locate and install an older version of Rcpp from the CRAN archives (http://cran.r-project.org/src/contrib/Archive/Rcpp/):

# in R
install.packages("int64")
# at OS
wget http://cran.r-project.org/src/contrib/Archive/Rcpp/Rcpp_0.9.8.tar.gz
R CMD INSTALL Rcpp_0.9.8.tar.gz

Finally, the RHadoop packages themselves, at the OS level:

wget -O rmr-2.2.0.tar.gz http://goo.gl/bhCU6
wget -O rhdfs_1.0.5.tar.gz https://github.com/RevolutionAnalytics/rhdfs/blob/master/build/rhdfs_1.0.5.tar.gz?raw=true

R CMD INSTALL rmr-2.2.0.tar.gz
R CMD INSTALL rhdfs_1.0.5.tar.gz

Verify

Type 'q()' to quit R.

> library(rmr2)
Loading required package: Rcpp
Loading required package: RJSONIO
Loading required package: digest
Loading required package: functional
Loading required package: stringr
Loading required package: plyr
Loading required package: reshape2
> library(rhdfs)
Loading required package: rJava

HADOOP_CMD=/usr/bin/hadoop

Be sure to run hdfs.init()
> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-redhat-linux-gnu (64-bit)

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8
 [7] LC_PAPER=C                 LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
 [1] rhdfs_1.0.5    rJava_0.9-4    rmr2_2.2.0     reshape2_1.2.2 plyr_1.8
 [6] stringr_0.6.2  functional_0.4 digest_0.6.3   RJSONIO_1.0-3  Rcpp_0.10.3

Test

Tutorial documentation: https://github.com/RevolutionAnalytics/rmr2/blob/master/docs/tutorial.md

R script:

#!/usr/bin/Rscript

library(rmr2)
library(rhdfs)
hdfs.init()

# write the integers 1..1000 into a temporary HDFS file
small.ints = to.dfs(1:1000)
# map-only job: emit each value alongside its square
# (wrap the call in from.dfs() to pull the result back for inspection)
mapreduce(input = small.ints, map = function(k, v) cbind(v, v^2))

Then HBase, for rhbase:

http://hbase.apache.org/book/configuration.html

But first Thrift, the cross-language interface to the HBase database:

yum install openssl098e

Download Thrift: http://thrift.apache.org/download/

yum install byacc -y
yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel

./configure
make
make install
export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/local/lib/pkgconfig/
pkg-config --cflags thrift
cp -p /usr/local/lib/libthrift-0.9.0.so /usr/lib/

HBASE_ROOT/bin/hbase thrift start &
lsof -i:9090   # port 9090 is the Thrift server; port 9095 is the monitor
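
Once the Thrift server answers on port 9090, any language with a Thrift client can talk to HBase. As a quick illustration (my own, not something this page uses), the third-party Python package happybase speaks this protocol; the table and column family names below are made up:

#!/usr/bin/env python
# talk to the HBase Thrift gateway started above
import happybase

connection = happybase.Connection('localhost', port=9090)
connection.create_table('testtable', {'cf': dict()})   # one column family 'cf'
table = connection.table('testtable')
table.put(b'row1', {b'cf:greeting': b'hello'})
print(table.row(b'row1'))                               # shows the stored cell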

Configure for distributed environment: http://hbase.apache.org/book/standalone_dist.html#standalone

  • used 3 zookeepers with quorum, see config example online
  • start with rolling_restart, the start & stop have a timing issue
  • /hbase owned by root:root
  • permissions reset on /hdfs, not sure why
  • also use /sanscratch/zookeepers
  • some more notes below

install.packages('rJava')
install.packages("int64")
install.packages(c("Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2"))

wget http://cran.r-project.org/src/contrib/Archive/Rcpp/Rcpp_0.9.8.tar.gz
wget -O rmr-2.2.0.tar.gz http://goo.gl/bhCU6
wget -O rhdfs_1.0.5.tar.gz https://github.com/RevolutionAnalytics/rhdfs/blob/master/build/rhdfs_1.0.5.tar.gz?raw=true

R CMD INSTALL Rcpp_0.9.8.tar.gz
R CMD INSTALL rmr-2.2.0.tar.gz
R CMD INSTALL rhdfs_1.0.5.tar.gz
R CMD INSTALL rhbase_1.2.0.tar.gz

yum install openssl098e openssl openssl-devel flex boost ruby ruby-libs ruby-devel php php-libs php-devel \
automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel

b2 install --prefix=/usr/local

thrift: ./configure --prefix=/usr/local --with-boost=/usr/local; make
make install

cp -p /usr/local/lib/libthrift-0.9.0.so /usr/lib/
cd /usr/lib; ln -s libthrift-0.9.0.so libthrift.so

SKIP (messy; replaced with a straight copy of the installed module files, which could be pushed to the nodes)
http://www.cpan.org
'o conf commit'
cpan> install Hadoop::Streaming 

whitetail only, unpack hbase, edit conf/hbase-site.xml, add to /etc/rc.local
also edit conf/regionservers
copy /usr/local/hbase-version-dir to nodes:/usr/local

  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>example1,example2,example3</value>
    <description>The directory shared by RegionServers.
    </description>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/export/zookeeper</value>
    <description>Property from ZooKeeper's config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
  </property>

Perl with Hadoop's native Streaming

First we create a script for the map step; note that this one simply prints the individual words back out.

vi wc_mapper.pl  # make it user-executable and add the contents below

#!/usr/bin/perl -w

# read in a file of text, print word by word

while(<>) {

        @line = split;
        foreach $word (@line) {
                print "$word\n";
        }
}

Next we create a script for the reduce step; note that using a hash means the reducer does not depend on sorted input.

vi wc_reducer.pl  # make it user-executable and add the contents below

#!/usr/bin/perl -w

# store words in hash and print key-value results

while (<>) {
        chomp;
        $seen{$_}++;
}

foreach $key (keys %seen) {
        print "$seen{$key} $key\n";
}

Next, test the pair on the command line:

perl wc_mapper.pl HF.txt | perl wc_reducer.pl | sort -rn | head

6050 and
4708 the
2935 a
2903 to
2475 I
1942 was
1733 of
1427 it
1372 he
1367 in

Load the text input file into HDFS and submit the job.

hadoop fs -put  HF.txt /tmp 
hadoop dfs -ls /tmp

# results
Found 2 items
-rw-r--r--   3 hmeij07   supergroup     459378 2013-05-23 14:24 /tmp/DS.txt.gz
-rw-r--r--   3 hmeij07   supergroup     610155 2013-05-23 14:24 /tmp/HF.txt

# submit; note that the -mapper and -reducer options are paired with -file options
# pointing to files in our home directory (not HDFS). these files will be copied
# to each datanode for execution, and then the results will be tabulated

hadoop jar \
/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
-input /tmp/HF.txt -output /tmp/HF.out \
-file ~/wc_mapper.pl -mapper ~/wc_mapper.pl \
-file ~/wc_reducer.pl -reducer ~/wc_reducer.pl

hadoop fs -ls /tmp/HF.out
# results
Found 3 items
-rw-r--r--   3 hmeij07 supergroup          0 2013-05-24 15:10 /tmp/HF.out/_SUCCESS
drwxrwxrwt   - hmeij07 supergroup          0 2013-05-24 15:10 /tmp/HF.out/_logs
-rw-r--r--   3 hmeij07 supergroup     161788 2013-05-24 15:10 /tmp/HF.out/part-00000


hadoop fs -cat /tmp/HF.out/part-00000 | sort -rn | head
# results
6050 and
4708 the
2935 a
2903 to
2475 I
1942 was
1733 of
1427 it
1372 he
1367 in

# clean up space

Perl with Hadoop's native Streaming #2

Adapted from http://autofei.wordpress.com/category/java/hadoop-code/

  • Create vectors X = [x1,x2, …] and Y = [y1,y2, …]
  • and compute the element-wise product Z = [x1*y1, x2*y2, …]

First, run this loop twice in the shell, so that each key ends up with two values (one for X, one for Y); then put the file into HDFS as /tmp/v_data.txt for the job below.

for i in `seq 1 1000000`
> do
> echo -e "$i,$RANDOM" >> v_data_large.txt
> done

Then we'll use this mapper:

#!/usr/bin/perl

# convert comma-delimited key,value lines to tab-delimited key\tvalue lines

while($line=<STDIN>){
        @fields = split(/,/, $line);
        # skip comment lines
        if ($fields[0] eq '#') { next;}
        if($fields[0] && $fields[1]){
                # $fields[1] still carries the trailing newline from the input
                print "$fields[0]\t$fields[1]";
        }
}

And the reducer from that website:

#!/usr/bin/perl

# multiply the two values seen for each key; assumes the input arrives
# sorted by key, which Hadoop's shuffle/sort guarantees

$lastKey="";
$product=1;
$count=0;

while($line=<STDIN>){
        @fields=split(/\t/, $line);
        $key = $fields[0];
        $value = $fields[1];
        if($lastKey ne "" && $key ne $lastKey){
                # new key: emit the previous key if it had exactly two values
                if($count==2){
                        print "$lastKey\t$product\n";
                }
                $product=$value;
                $lastKey=$key;
                $count=1;
        }
        else{
                $product=$product*$value;
                $lastKey=$key;
                $count++;
        }
}
# don't forget the last key
if($count==2){
        print "$lastKey\t$product\n";
}

And submit the job

 hadoop jar \
/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u6.jar \
-input /tmp/v_data.txt  -output /tmp/v.out \
-file ~/v_mapper.pl -mapper ~/v_mapper.pl \
-file ~/v_reducer.pl -reducer ~/v_reducer.pl 

And that works.

Perl Hadoop::Streaming

  • All nodes
yum install cpan
cpan> install Hadoop::Streaming 


Installing /usr/local/share/perl5/Hadoop/Streaming.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Mapper.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Combiner.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Role/Iterator.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Role/Emitter.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer/Input.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer/Input/Iterator.pm
Installing /usr/local/share/perl5/Hadoop/Streaming/Reducer/Input/ValuesIterator.pm
  • How to use this?

MySQL

  • to come

SAS

  • I think we'll wait for the next version; this one is too beta-ish

http://www.informationweek.com/software/business-intelligence/sas-gets-hip-to-hadoop-for-big-data/240009035?pgno=1

“Where Hadoop is concerned the latest release already runs on the platform, technically, but it's limited to a SAS-customized version of the open source software based on Apache Hadoop v1.0 (also known as version 0.20.20x). SAS says HPA will run on mainstream distributions of Hadoop from the likes of Cloudera, with an upcoming December release of HPA that will be based on Apache Hadoop v2.0 (also known as version 0.23).

Whether you're using SAS's current Hadoop software or plan to embrace the v2.0 release, HPA provides a graphical user interface that lets you tap HDFS, MapReduce, Pig, and Hive to apply SAS analyses to the vast data sets residing on Hadoop. MapReduce is the primary model for processing data on Hadoop. Pig is an open source Apache programming tool and language for writing MapReduce jobs. Hive is data warehousing infrastructure built on top of Hadoop that supports data summarization, query, and analysis. HPA also supports Pig and MapReduce code generation, visual editing and syntax checking. Finally SAS Data Integration Studio data transformations and SAS DataFlux data quality routines have also been adapted to Hadoop. ”

