HADOOP

Apache Hadoop is an open-source software framework for distributed storage and distributed processing of very large data sets on computer clusters built from commodity hardware.

Hadoop was originally designed for shared-nothing systems, that is, clusters in which each node has its own CPU, memory, and disk space. To run Hadoop jobs on HPC clusters that use a shared or parallel file system such as Panasas, you must use myhadoop, which allows you to submit a Hadoop job as a batch job to a scheduler.

Running Hadoop on the HPC Cluster

In the following, step 1 needs to be done only once, whereas steps 2, 3, and 4 should be wrapped in your job submit script.

Step 1: Install Apache hadoop-1.2.1 and myhadoop-0.30 in your home directory

First, copy the Hadoop and myhadoop tarballs to your home directory and unpack them:

$ cd ~
$ mkdir hadoop-stack
$ cp /panfs/storage.local/opt/hadoop/hadoop-1.2.1-bin.tar.gz ./hadoop-stack/
$ cp /panfs/storage.local/opt/hadoop/v0.30.tar.gz ./hadoop-stack/
$ cd hadoop-stack
$ tar xvfz hadoop-1.2.1-bin.tar.gz
$ tar xvfz v0.30.tar.gz

Next, apply the patch provided by myhadoop to a few Hadoop configuration files:

$ cd hadoop-1.2.1/conf
$ patch < ../../myhadoop-0.30/myhadoop-1.2.1.patch

This will produce the following output:

 patching file core-site.xml
 patching file hdfs-site.xml
 patching file mapred-site.xml

Add umask 0022 to the ~/.bashrc file in your home directory:

echo -e "\n#For hadoop\numask 0022" >> ~/.bashrc && source ~/.bashrc
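
The effect of umask 0022 can be checked with a little arithmetic. The following Python sketch is an illustration only and is not part of the Hadoop setup; the function name apply_umask is hypothetical. It shows that the umask clears the group-write and world-write bits from the permissions a program requests:

```python
def apply_umask(requested_mode, umask=0o022):
    """Return the permission bits that remain after the umask is applied."""
    return requested_mode & ~umask

# Programs typically request 0o666 for new files and 0o777 for new directories.
print(oct(apply_umask(0o666)))  # 0o644, i.e. -rw-r--r--
print(oct(apply_umask(0o777)))  # 0o755, i.e. drwxr-xr-x
```

These are the same permission patterns that appear in the HDFS listings of the examples further down this page.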

Note. A copy of hadoop-1.2.1 is currently installed in the following directory:

/panfs/storage.local/opt/hadoop

However, since Hadoop and myhadoop are quite easy to install, we suggest that you install your own copy in your home directory to avoid possible interference with other users.

Step 2: Start up a Hadoop cluster

The following code snippets should all become part of your submit script.  Refer to Example 1 below for a complete example.

Myhadoop bridges the gap between Hadoop and batch scheduling systems (e.g., our scheduler, Slurm). When you submit a job using a Slurm script, the scheduler allocates resources automatically. Myhadoop then configures the Hadoop Distributed File System (HDFS) dynamically according to the computing resources allocated to you. This means that a new HDFS is configured each time you submit a job. Your Hadoop file system is created and maintained in your Panasas volume, so any Hadoop work your job performs must respect your Panasas quota.
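
To make the idea of dynamic configuration concrete, here is a conceptual Python sketch. It is not myhadoop's code: the function plan_cluster, the host names, and the rule used here (first allocated node acts as master, every node acts as worker) are assumptions chosen purely for illustration.

```python
def plan_cluster(hostnames):
    """Turn a list of allocated host names into a master/worker layout.

    Assumption for illustration: the first node is the master and
    every allocated node (including the first) runs a worker.
    """
    unique = list(dict.fromkeys(hostnames))  # drop duplicates, keep order
    if not unique:
        raise ValueError("the allocation contains no nodes")
    return {"master": unique[0], "workers": unique}

# Hypothetical allocation: 2 nodes with 4 tasks each,
# as requested by "-N 2 --ntasks-per-node=4".
allocation = ["node01"] * 4 + ["node02"] * 4
print(plan_cluster(allocation))
# {'master': 'node01', 'workers': ['node01', 'node02']}
```

Because the set of nodes differs from job to job, a layout like this has to be recomputed, and the configuration files rewritten, every time a job starts.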

Define a few environment variables (do NOT change the names of these variables):

$ export JAVA_HOME=/usr
$ export HADOOP_HOME=$HOME/hadoop-stack/hadoop-1.2.1
$ export MY_HADOOP_HOME=$HOME/hadoop-stack/myhadoop-0.30
$ export PATH=$HADOOP_HOME/bin:$MY_HADOOP_HOME/bin:$PATH

Create a directory to store configuration files. Each time you start a Hadoop cluster, a set of per-job configuration files is generated:

$ mkdir -p $HOME/hadoop/config
$ export HADOOP_CONF_DIR=$HOME/hadoop/config/conf-$SLURM_JOBID

Also create a directory to store your HDFS (Hadoop Distributed File System) data:

$ mkdir -p $HOME/hadoop/hdfs
$ export MH_PERSIST_DIR=$HOME/hadoop/hdfs

MH_SCRATCH_DIR is the directory for node-local scratch space:

$ export MH_SCRATCH_DIR=/tmp

(This variable is required even though the HPC cluster has no node-local disk space.)

d. Run myhadoop-configure.sh (it must run in persistent mode; see the -p option)

$ myhadoop-configure.sh -s $MH_SCRATCH_DIR -p $MH_PERSIST_DIR

e. Start up the Hadoop cluster

$ $HADOOP_HOME/bin/start-all.sh

f. To check whether your Hadoop cluster is up, run the following Hadoop administration command:

$ hadoop dfsadmin -report

You will see a brief status report on the overall health of the HDFS cluster, as well as some per-server metrics.
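
If you want to check the report automatically rather than by eye, something like the following Python sketch could be used. It is only a sketch: the helper names are hypothetical, and the line format "Configured Capacity: <bytes> (...)" is an assumption about what the report looks like, so compare it with the real output on your cluster before relying on it.

```python
import re

def configured_capacity_bytes(report_text):
    """Return the first 'Configured Capacity' value in bytes, or None if absent."""
    match = re.search(r"^Configured Capacity:\s*(\d+)", report_text, re.MULTILINE)
    return int(match.group(1)) if match else None

def looks_healthy(report_text):
    """Treat a missing or zero configured capacity as a failed start-up."""
    capacity = configured_capacity_bytes(report_text)
    return capacity is not None and capacity > 0

# Illustrative report fragments (made up for this sketch, not real output).
good = "Configured Capacity: 105689374720 (98.43 GB)\nPresent Capacity: 1000 (1000 B)\n"
bad = "Configured Capacity: 0 (0 KB)\nPresent Capacity: 0 (0 KB)\n"
print(looks_healthy(good), looks_healthy(bad))  # True False
```

A report full of zeros usually means that no data nodes have registered, which is the situation the submit script in Example 1 warns about.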

Step 3: Run your Hadoop job

Refer to Example 1 below.

Step 4: Stop and clean up

 $ $HADOOP_HOME/bin/stop-all.sh
 $ myhadoop-cleanup.sh

Note. Since the Hadoop cluster is dynamically configured through the job submit script and myhadoop, it is not advisable to run Hadoop commands directly (outside your Hadoop job submit script).

Example 1: Running the word-counting example on the HPC cluster.

To run Hadoop on the HPC cluster, your Hadoop job must be submitted as a batch job. Below is a sample job submission script (hadoop_sub.sh) for the well-known word-counting example (i.e., for one or multiple books, count the number of times each word occurs).

   #!/bin/bash
   #####################################################################################
   #  name: hadoop_sub.sh
   #  A sample Slurm submit script that illustrates how to spin up a Hadoop cluster on the HPC cluster at FSU
   #####################################################################################

   #SBATCH -J "test_hadoop"
   #SBATCH -N 2
   #SBATCH --ntasks-per-node=4
   #SBATCH -p genacc_q
   #SBATCH --mail-type=ALL
   #SBATCH -t 00:30:00

   # define a few environmental variables needed:
   export JAVA_HOME=/usr
   export HADOOP_HOME=$HOME/hadoop-stack/hadoop-1.2.1
   export MY_HADOOP_HOME=$HOME/hadoop-stack/myhadoop-0.30
   export PATH=$HADOOP_HOME/bin:$MY_HADOOP_HOME/bin:$PATH

   if [ ! -d $HOME/hadoop ]; then
      mkdir $HOME/hadoop
      mkdir $HOME/hadoop/config
      mkdir $HOME/hadoop/hdfs
   fi

   if [ -d $HOME/hadoop/hdfs ]; then
      rm -rf $HOME/hadoop/hdfs/*
   fi

   # HADOOP_CONF_DIR is where your hadoop cluster's configuration will be located.
   export HADOOP_CONF_DIR=$HOME/hadoop/config/conf-$SLURM_JOBID
   # MH_SCRATCH_DIR is the directory for node-local scratch space
   export MH_SCRATCH_DIR=/tmp
   # We have to work in persistent mode on the HPC cluster at FSU
   export MH_PERSIST_DIR=$HOME/hadoop/hdfs

   echo "Running myhadoop-configure.sh under persistent mode:     "
   # both -s and -p options have to be specified for persistent mode
   myhadoop-configure.sh -s $MH_SCRATCH_DIR -p $MH_PERSIST_DIR

   echo "Starting all Hadoop Daemons:   "
   $HADOOP_HOME/bin/start-all.sh
   echo

   echo "check if hadoop is deployed (in trouble if you see a lot of zeros) "
   hadoop dfsadmin -report

   # need pg2701.txt, an E-book, for the word counting example
   if [ ! -f ./pg2701.txt ]; then
     cp /panfs/storage.local/opt/hadoop/Doc/pg2701.txt .
   fi
   # wordcount-output is the directory in your local file system storing the results
   if [  -d ./wordcount-output ]; then
     rm -rf ./wordcount-output
   fi
   if [  -d ./wordcount-output2 ]; then
     rm -rf ./wordcount-output2
   fi

   echo "Running a test word-counting example:   "
   # create a directory "data" in your HDFS file system
   hadoop dfs -mkdir data
   # copy the book "pg2701.txt" from your LOCAL file system to the HDFS file system
   hadoop dfs -put ./pg2701.txt data/
   # check if the book is copied successfully
   hadoop dfs -ls data
   # run the Hadoop word count example 
   # note: "data" and "wordcount-output" are the input/output directory in the HDFS system
   hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data wordcount-output
   hadoop dfs -ls wordcount-output
   # copy results from the HDFS file system to LOCAL file system
   hadoop dfs -get wordcount-output ./
   echo

   echo "Next, re-run the word count example but with 100 copies of pg2701.txt"
   echo " it will take a few minutes for the map-reduce to be finished "
   hadoop dfs -mkdir        data2
   for i in {1..100}
   do
     hadoop dfs -cp data/pg2701.txt    data2/pg2701_copy.$i.txt
   done
   hadoop dfs -dus data2
   hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data2  wordcount-output2
   hadoop dfs -get wordcount-output2 ./

   echo "results should be in the directory 'wordcount-output2' in your local file system  "
   echo "Please check/compare the results in two output directories."

   echo "Stopping all Hadoop daemons:" 
   $HADOOP_HOME/bin/stop-all.sh
   echo

   echo "cleaning up working directories:   "
   myhadoop-cleanup.sh
   echo

If the above example runs successfully, you will see three entries in the wordcount-output directory:

  $ ls wordcount-output/
   _logs           part-r-00000              _SUCCESS

The file part-r-00000 stores the results of the map/reduce job:

 $ head part-r-00000
   "'A             3
   "'Also          1
   "'Are           1
   "'Aye,          2
   "'Aye?          1
   "'Best          1
   "'Better        1
   "'Bout          1
   "'But           2
   "'Canallers!'   1
   ......

In the wordcount-output2 directory, each count is 100 times larger, as expected for 100 copies of the same book:

 $ head part-r-00000
   "'A             300
   "'Also          100
   "'Are           100
   "'Aye,          200
   "'Aye?          100
   "'Best          100
   "'Better        100
   "'Bout          100
   "'But           200
   "'Canallers!'   100
  ......
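
The relationship between the two outputs can be reproduced on a small scale without Hadoop. The following Python sketch is an illustration only; the sample sentence and the function name word_count are made up. It counts whitespace-separated tokens in a text and in 100 copies of it, and confirms that every count is multiplied by exactly 100:

```python
from collections import Counter

def word_count(text):
    """Count whitespace-separated tokens, as the word-count example does."""
    return Counter(text.split())

sample = "call me Ishmael call me"
once = word_count(sample)
# Join the copies with newlines so that words from adjacent copies do not merge.
hundred = word_count("\n".join([sample] * 100))

print(once["call"], hundred["call"])  # 2 200
print(all(hundred[word] == 100 * count for word, count in once.items()))  # True
```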

Note. In the above submit script, the Hadoop file system commands...

  hadoop dfs -mkdir
  hadoop dfs -ls

...are very similar to the Unix file system commands mkdir and ls, respectively (dfs is the hadoop subcommand for file system operations). Because HDFS cannot be accessed directly with traditional Unix commands, the Hadoop commands -put and -get are used to copy files between the local file system and HDFS:

  hadoop dfs -put  local_file   hadoop_file
  hadoop dfs -get  hadoop_file  local_file

Note. The source code of WordCount.java is provided here. There are several versions of the Java code for this word-counting example; do not worry about the details if you are new to Hadoop, Map/Reduce, or Java:

// WordCount.java
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }
  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}

Persistency of HDFS data on the HPC cluster

You can persist data in the HDFS between Hadoop restarts and job submissions.  To achieve this, you must run your jobs in persistent mode and use the $MH_PERSIST_DIR variable.

Example 2: Retrieve data from a previous Hadoop job

Suppose you have run Example 1 above successfully. Now, start up a new Hadoop cluster with the following script:

#!/bin/bash
#####################################################################################
#  name: hadoop_persistency_test.sh 
#  A sample slurm submit script testing persistency of Hadoop data.
######################################################################################

#SBATCH -J "test_hadoop_persistency"
#SBATCH -N 4
#SBATCH --ntasks-per-node=4
#SBATCH -p genacc_q
#SBATCH --mail-type=ALL
#SBATCH -t 01:00:00

export JAVA_HOME=/usr
export HADOOP_HOME=$HOME/hadoop-stack/hadoop-1.2.1
export MY_HADOOP_HOME=$HOME/hadoop-stack/myhadoop-0.30
export PATH=$HADOOP_HOME/bin:$MY_HADOOP_HOME/bin:$PATH

export HADOOP_CONF_DIR=$HOME/hadoop/config/conf-$SLURM_JOBID
export MH_SCRATCH_DIR=/tmp

# to use previous HDFS data, use the same $MH_PERSIST_DIR as the previous job
export MH_PERSIST_DIR=$HOME/hadoop/hdfs

myhadoop-configure.sh -s $MH_SCRATCH_DIR  -p $MH_PERSIST_DIR
$HADOOP_HOME/bin/start-all.sh

echo "Check if there is anything in my HDFS system:   "
echo "Should be something since you were/are running under persistent mode"
hadoop dfs -ls
hadoop dfs -ls data
echo

echo "Stopping all Hadoop daemons and cleaning  working directories:" 
$HADOOP_HOME/bin/stop-all.sh
myhadoop-cleanup.sh

You will see the content of your HDFS, similar to the following, in the Slurm output file (named slurm-JOBID.out by default, where JOBID is the Slurm job ID):

$ hadoop dfs -ls
Found 2 items
drwxr-xr-x   - $USER supergroup          0 2014-05-12 14:11 /user/$USER/data
drwxr-xr-x   - $USER supergroup          0 2014-05-12 14:12 /user/$USER/wordcount-output

$ hadoop dfs -ls data
Found 1 items
-rw-r--r--   3 $USER supergroup    1257275 2014-05-12 14:11 /user/$USER/data/pg2701.txt
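
The columns of such a listing are: permissions, replication factor (shown as "-" for directories), owner, group, size in bytes, modification date and time, and path. As a reading aid, the following Python sketch splits one listing line into named fields. The function parse_ls_line and the user name alice are hypothetical, and the column layout is taken from the sample listing above rather than from a format specification.

```python
def parse_ls_line(line):
    """Split one 'hadoop dfs -ls' line into named fields (layout assumed from the sample)."""
    perms, repl, owner, group, size, date, time, path = line.split(None, 7)
    return {
        "is_dir": perms.startswith("d"),
        "replication": None if repl == "-" else int(repl),
        "owner": owner,
        "group": group,
        "size": int(size),
        "modified": date + " " + time,
        "path": path,
    }

entry = parse_ls_line(
    "-rw-r--r--   3 alice supergroup    1257275 2014-05-12 14:11 /user/alice/data/pg2701.txt"
)
print(entry["replication"], entry["size"], entry["path"])
# 3 1257275 /user/alice/data/pg2701.txt
```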

Example 3: Interacting with HDFS programmatically on the HPC cluster

The following Java code, HDFSHelloWorld.java, creates a file hello.txt in the HDFS file system, writes a hello message to it, reads the message back, and prints it to standard output.

//HDFSHelloWorld.java
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

public class HDFSHelloWorld {
  
  public static final String theFilename = "hello.txt";
  public static final String message = "Hello, world!\n";
  
  public static void main (String [] args) throws IOException {
    
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path filenamePath = new Path(theFilename);
    
    try {
      if (fs.exists(filenamePath)) {
         // remove the file first (non-recursive delete)
         fs.delete(filenamePath, false);
      }
      
      FSDataOutputStream out = fs.create(filenamePath);

      out.writeUTF(message);
      out.close();
      FSDataInputStream in = fs.open(filenamePath);
      String     messageIn = in.readUTF();
      System.out.print(messageIn);
      in.close();
    }
    catch (IOException ioe) {
     System.err.println("IOException during operation: " + ioe.toString());
     System.exit(1);
    }
  }
}

To run the above Java code on the HPC cluster, follow these steps:

Compile HDFSHelloWorld.java:

javac -classpath /path/to/your/hadoop-1.2.1/hadoop-core-1.2.1.jar HDFSHelloWorld.java

Create a .jar file, say, hello.jar:

jar cfm hello.jar manifest.txt HDFSHelloWorld.class

...where manifest.txt contains the headers to add to META-INF/MANIFEST.MF:

$ cat manifest.txt
Main-Class: HDFSHelloWorld
Class-Path: $HOME/hadoop-stack/hadoop-1.2.1/hadoop-core-1.2.1.jar

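Run hello.jar within a Hadoop cluster on the HPC cluster. Again, Hadoop jobs must be run from within a job submit script (refer to the examples above). Note that environment variables such as $HOME are not expanded inside a manifest file, so write the full path out explicitly in the Class-Path header; when the jar is launched with hadoop jar, the Hadoop core classes are already on the classpath.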

   # configure and start up your Hadoop cluster
   # ......
   echo "Running java code hello.jar: "
   echo "will create hello.txt within hdfs,  write Hello, world! to it, "
   echo "and read it back from HDFS, and print it to stdout:   "
   hadoop jar hello.jar
   echo "check if you have the hello.txt created in the hdfs"
   hadoop dfs -ls
   # ......
   # stop your Hadoop cluster

You will see a file similar to the following upon successful execution of your code:

-rw-r--r-- 3 $USER supergroup 16 2014-05-07 11:32 /user/$USER/hello.txt
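
The size of 16 bytes in this listing follows from how writeUTF stores a string: a 2-byte big-endian length followed by the encoded characters. The message "Hello, world!\n" is 14 bytes long, so the file holds 2 + 14 = 16 bytes. The following Python sketch mimics that framing. The function names are hypothetical, and the sketch only covers plain ASCII text, for which Java's modified UTF-8 and standard UTF-8 produce the same bytes.

```python
import struct

def write_utf(text):
    """Mimic the writeUTF framing for ASCII text: 2-byte length, then the bytes."""
    payload = text.encode("utf-8")
    if len(payload) > 0xFFFF:
        raise ValueError("string too long for a 2-byte length prefix")
    return struct.pack(">H", len(payload)) + payload

def read_utf(data):
    """Read back a string framed by write_utf."""
    (length,) = struct.unpack(">H", data[:2])
    return data[2:2 + length].decode("utf-8")

framed = write_utf("Hello, world!\n")
print(len(framed))              # 16
print(repr(read_utf(framed)))   # 'Hello, world!\n'
```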

Example 4: Hadoop Word-Counting Using Python

Even though the Hadoop framework is written in Java, Map/Reduce programs can be developed in other languages such as Python or C++. This example demonstrates how to run the simple word-count example again, but with the mapper and reducer written in Python.

Here is the source code of mapper.py:

#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # split the line into words
    words = line.split()

    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

Here is the Python code for reducer.py:

#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word (skipped if there was no input)
if current_word:
    print('%s\t%s' % (current_word, current_count))

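To run this example, first copy mapper.py and reducer.py to the directory from which you submit the job (for example, your home directory) and make them executable (chmod +x mapper.py reducer.py). Next, create a submit script by replacing the first word-count command in the submit script hadoop_sub.sh from Example 1 above, i.e.,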

hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data wordcount-output

...with...

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
-file mapper.py -mapper mapper.py \
-file reducer.py -reducer reducer.py \
-input data -output wordcount-output

...and the second word-count command, i.e.,

hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data2 wordcount-output2

...with...

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
-file mapper.py -mapper mapper.py \
-file reducer.py -reducer reducer.py \
-input data2 -output wordcount-output2

This Python example will generate the same word counts as the Java example above.

Note. The idea behind this example is that the Hadoop Streaming API helps pass data between the Map and Reduce code via STDIN (standard input) and STDOUT (standard output).
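
The data flow described in this note can be imitated locally, without Hadoop. The following Python sketch is a simplified model, not the Streaming API itself: the function names are hypothetical, the mapper and reducer are rewritten as functions over lists of text lines instead of reading STDIN, and a plain sorted() call stands in for the sort by key that Hadoop performs between the two stages.

```python
def map_lines(lines):
    """Emit one tab-delimited 'word<TAB>1' record per word, like mapper.py."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)

def reduce_lines(sorted_records):
    """Sum the counts of consecutive records with the same word, like reducer.py."""
    current, total = None, 0
    for record in sorted_records:
        word, count = record.split("\t", 1)
        if word == current:
            total += int(count)
        else:
            if current is not None:
                yield "%s\t%d" % (current, total)
            current, total = word, int(count)
    if current is not None:
        yield "%s\t%d" % (current, total)

def simulate(lines):
    """Map, sort by key, then reduce."""
    return list(reduce_lines(sorted(map_lines(lines))))

print(simulate(["the whale the sea", "the ship"]))
# ['sea\t1', 'ship\t1', 'the\t3', 'whale\t1']
```

A local run like this is a convenient way to test mapper and reducer logic on a small input before submitting a batch job.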

Note. There is no intermediate combining stage before the reducing stage, so the code is a little slower than the word count example written in Java.
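
To see what a combining stage saves, consider the following Python sketch. It is an illustration only, with hypothetical function names; it is not how Hadoop implements combiners. It compares the number of records a mapper emits with the number left after the counts are summed per word on the mapper side, which is what the Java example arranges by registering its Reduce class as the combiner.

```python
from collections import Counter

def map_records(lines):
    """Emit one (word, 1) record per word, as the streaming mapper does."""
    return [(word, 1) for line in lines for word in line.split()]

def combine(records):
    """Sum the counts per word before the records leave the mapper."""
    totals = Counter()
    for word, count in records:
        totals[word] += count
    return sorted(totals.items())

records = map_records(["the whale the sea the"])
combined = combine(records)
print(len(records), len(combined))  # 5 3
print(combined)  # [('sea', 1), ('the', 3), ('whale', 1)]
```

With fewer records to sort and transfer, the reducers have less work to do, which is why the absence of a combiner makes the streaming version somewhat slower.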

Further Reading

  1. Webpage for myhadoop
  2. Webpage for Apache hadoop
  3. A nice introduction to Hadoop from Yahoo
  4. Example 4 (running a Hadoop job using Python) is from Michael G. Noll
  5. Apache Hadoop documentation for Hadoop_Streaming
  6. Hadoop Programming with Arbitrary Languages