Wednesday, February 15, 2012

Map Reduce Programming with Hadoop

I understand the Map Reduce paradigm, but it has taken me some time to grasp the finer details of Hadoop Map Reduce programming.

I liked the following blog post on M/R programming on Hadoop:
Thinking Map Reduce with Hadoop.  This blog post gives a good description of what M/R programming with Hadoop involves.


Word Count Program

I copied the classic word count program from Hadoop examples, to try out M/R.  Nothing great about that.

I have two versions of the program - one in the Older Hadoop Programming Style and the other in the Newer Hadoop Programming Style.  If you are starting fresh, always go with the Newer Hadoop Programming Style.


Older Hadoop Programming Style

================
package mapreduce;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Processing {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>{

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
   
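    // Recursively delete a local file or directory; do nothing if it does not exist yet.
    private static void delete(File f) throws IOException {
        if (!f.exists()) {
            return;
        }
        if (f.isDirectory()) {
            for (File c : f.listFiles()) {
                delete(c);
            }
        }
        if (!f.delete()) {
            throw new FileNotFoundException("Failed to delete file: " + f);
        }
    }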



    public static void main(String[] args) throws IOException{
        JobConf jobConf = new JobConf(Processing.class);
        jobConf.setJobName("TestingHadoop");

        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);

        jobConf.setMapperClass(Map.class);
        jobConf.setCombinerClass(Reduce.class);
        jobConf.setReducerClass(Reduce.class);
       
        //Ensure that the output directory does not exist already
        File outputDir = new File(args[1]);
        delete(outputDir);

        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

        JobClient.runJob(jobConf);
    }
}
================

Now the program arguments are as follows:
/home/anil/hadoopinput /home/anil/output

Basically, I have a directory in my home directory called hadoopinput, which contains the input files.  The program will create an output directory called "output".
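The program clears the output directory first because Hadoop refuses to start a job whose output path already exists. Below is a minimal sketch of that cleanup using java.nio instead of java.io.File. The class name OutputDirCleaner and the method name deleteIfExists are made up for illustration, and Path here is java.nio.file.Path, not Hadoop's Path. It assumes the output lives on the local filesystem, as in this post; on HDFS you would go through Hadoop's FileSystem API instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OutputDirCleaner {

    // Delete a local directory tree; silently do nothing if it is not there.
    public static void deleteIfExists(Path dir) throws IOException {
        if (Files.notExists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for ~/output: a temporary directory holding one result file.
        Path dir = Files.createTempDirectory("output");
        Files.write(dir.resolve("part-00000"), "Hadoop\t4\n".getBytes());

        deleteIfExists(dir);
        System.out.println("exists after delete: " + Files.exists(dir));

        // Calling it again on a missing directory is harmless.
        deleteIfExists(dir);
    }
}
```

The early return matters on the very first run, when there is no output directory to delete yet.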

I have 2 files in the hadoopinput directory, namely a.txt and b.txt

a.txt  contains:
Java is a cool language .
Hadoop is written in Java
HDFS is part of Hadoop

b.txt contains:
Are there Hadoop users ?
Do you like Hadoop ?
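
To make it easier to predict what the job should produce for these two files, here is a small plain-Java sketch of the same map and reduce logic without any Hadoop classes. The class name WordCountSketch is invented for illustration. It tokenizes on whitespace with StringTokenizer like the mapper above, and a TreeMap stands in for Hadoop's sorted grouping of keys, which is an approximation that holds for this ASCII input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountSketch {

    // The five lines of a.txt and b.txt from this post.
    public static final List<String> SAMPLE_LINES = Arrays.asList(
            "Java is a cool language .",
            "Hadoop is written in Java",
            "HDFS is part of Hadoop",
            "Are there Hadoop users ?",
            "Do you like Hadoop ?");

    // "Map" step: split one line into whitespace-separated tokens.
    public static List<String> map(String line) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            tokens.add(tokenizer.nextToken());
        }
        return tokens;
    }

    // Grouping and "reduce" step: add 1 to the running sum of each token.
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : map(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e : count(SAMPLE_LINES).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Note that "." and "?" are counted as words because they are separated from their neighbours by spaces.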

Now the program trace for running this class is:
==========================================
Feb 15, 2012 9:38:35 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Feb 15, 2012 9:38:35 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Feb 15, 2012 9:38:35 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Feb 15, 2012 9:38:35 PM org.apache.hadoop.mapred.FileInputFormat listStatus
INFO: Total input paths to process : 2
Feb 15, 2012 9:38:35 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
Feb 15, 2012 9:38:35 PM org.apache.hadoop.util.ProcessTree isSetsidSupported
INFO: setsid exited with exit code 0
Feb 15, 2012 9:38:35 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@3794d372
Feb 15, 2012 9:38:35 PM org.apache.hadoop.mapred.MapTask runOldMapper
INFO: numReduceTasks: 1
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
Feb 15, 2012 9:38:36 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 0% reduce 0%
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: file:/home/anil/hadoopinput/a.txt:0+75
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@73da669c
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.MapTask runOldMapper
INFO: numReduceTasks: 1
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
Feb 15, 2012 9:38:38 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
Feb 15, 2012 9:38:39 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
Feb 15, 2012 9:38:39 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
Feb 15, 2012 9:38:39 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 0%
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: file:/home/anil/hadoopinput/b.txt:0+46
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000001_0' done.
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1d329642
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 2 sorted segments
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 2 segments left of total size: 218 bytes
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
Feb 15, 2012 9:38:41 PM org.apache.hadoop.mapred.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to file:/home/anil/output
Feb 15, 2012 9:38:44 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
Feb 15, 2012 9:38:44 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 100%
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 21
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:   File Input Format Counters
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Read=121
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:   File Output Format Counters
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Written=137
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:   FileSystemCounters
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_READ=1642
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_WRITTEN=96728
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:   Map-Reduce Framework
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output materialized bytes=226
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Map input records=5
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce shuffle bytes=0
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Spilled Records=40
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output bytes=225
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Total committed heap usage (bytes)=869203968
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     CPU time spent (ms)=0
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Map input bytes=121
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     SPLIT_RAW_BYTES=172
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine input records=26
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input records=20
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input groups=19
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine output records=20
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Physical memory (bytes) snapshot=0
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce output records=19
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Virtual memory (bytes) snapshot=0
Feb 15, 2012 9:38:45 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output records=26
=========================
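
The counters at the end of the trace show what the combiner did: 26 map output records went in, 20 records reached the reducer, and the reducer saw 19 groups. The sketch below reproduces those three numbers in plain Java. It assumes each input file is handled by its own map task and that the combiner runs exactly once per task, which matches this local run but is not something Hadoop guarantees in general. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class CombinerCountersSketch {

    public static final List<String> A_TXT = Arrays.asList(
            "Java is a cool language .",
            "Hadoop is written in Java",
            "HDFS is part of Hadoop");

    public static final List<String> B_TXT = Arrays.asList(
            "Are there Hadoop users ?",
            "Do you like Hadoop ?");

    // One map task followed by one combiner pass: tokenize, then pre-sum locally.
    public static Map<String, Integer> mapAndCombine(List<String> lines) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                partial.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        return partial;
    }

    // Number of (word, 1) pairs the mapper emitted before combining.
    public static int totalOccurrences(Map<String, Integer> partial) {
        int total = 0;
        for (int n : partial.values()) {
            total += n;
        }
        return total;
    }

    // Reduce step: merge the partial sums coming from every map task.
    public static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> merged = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> a = mapAndCombine(A_TXT);
        Map<String, Integer> b = mapAndCombine(B_TXT);
        System.out.println("Map output records     = " + (totalOccurrences(a) + totalOccurrences(b)));
        System.out.println("Combine output records = " + (a.size() + b.size()));
        System.out.println("Reduce input groups    = " + reduce(Arrays.asList(a, b)).size());
    }
}
```

The difference between 20 and 19 comes from "Hadoop", the only word that appears in both files, so it arrives at the reducer as two partial sums.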

Now I go to the directory called "output" in my home directory:

~/output$ ls
part-00000  _SUCCESS

Let me peek inside the part-00000 file:
~/output$ vi part-00000

================
.       1
?       2
Are     1
Do      1
HDFS    1
Hadoop  4
Java    2
a       1
cool    1
in      1
is      3
language        1
like    1
of      1
part    1
there   1
users   1
written 1
you     1
===================



Newer Hadoop Programming Style

The program is

=============
package mapreduce;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Processing extends Configured implements Tool {

    public static class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,IntWritable> {

        private final static IntWritable counter = new IntWritable(1);
        private Text word = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, counter);
            }
        }
    }

    /*public static class Combiner extends org.apache.hadoop.mapreduce.Reducer<Text,IntWritable,Text,IntWritable>{

        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            Set<IntWritable> uniques = new HashSet<IntWritable>();
            for (IntWritable value : values) {
                if (uniques.add(value)) {
                    context.write(key, value);
                }
            }
        }
    }*/

    public static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            Iterator<IntWritable> iter = values.iterator();
            int sum = 0;
            while (iter.hasNext()) {
                sum += iter.next().get();
            }

            context.write(key, new IntWritable(sum));
        }
    }


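    // Recursively delete a local file or directory; do nothing if it does not exist yet.
    private void delete(File f) throws IOException {
        if (!f.exists()) {
            return;
        }
        if (f.isDirectory()) {
            for (File c : f.listFiles()) {
                delete(c);
            }
        }
        if (!f.delete()) {
            throw new FileNotFoundException("Failed to delete file: " + f);
        }
    }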



    @Override
    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        File outputDir = new File(args[1]);
        delete(outputDir);
        Path outputPath = new Path(args[1]);


        Job job = new Job(getConf(), "TestingHadoop");

        job.setJarByClass(Processing.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapperClass(Mapper.class);
        //job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reducer.class);

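        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Also declare the types the reducer writes, so the job's output types match the code.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);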

        return job.waitForCompletion(false) ? 0 : -1;
    }


    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Processing(), args));
    }
}
=============

Note:  I have commented out the Combiner class because we do not use a combiner in this example.
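
It is worth seeing why the commented-out Combiner should stay commented out for word count. The plain-Java sketch below compares it with a summing combiner (what the older-style program gets by reusing its Reduce class) for the word "Hadoop", which occurs twice in each input file. It is a simplification: plain Integer values stand in for IntWritable, the combiner is assumed to run once per map task, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CombinerChoiceSketch {

    // Values the mapper emits for "Hadoop" in each file: one 1 per occurrence.
    public static final List<Integer> HADOOP_IN_A = Arrays.asList(1, 1);
    public static final List<Integer> HADOOP_IN_B = Arrays.asList(1, 1);

    // Mirrors the commented-out Combiner: forward each distinct value only once.
    public static List<Integer> dedupCombine(List<Integer> values) {
        Set<Integer> uniques = new HashSet<>();
        List<Integer> out = new ArrayList<>();
        for (Integer value : values) {
            if (uniques.add(value)) {
                out.add(value);
            }
        }
        return out;
    }

    // A summing combiner: forward one partial sum.
    public static List<Integer> sumCombine(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return Collections.singletonList(sum);
    }

    // Reducer: add up whatever the two map tasks sent for this key.
    public static int reduce(List<Integer> fromTaskA, List<Integer> fromTaskB) {
        int sum = 0;
        for (int value : fromTaskA) {
            sum += value;
        }
        for (int value : fromTaskB) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        int withDedup = reduce(dedupCombine(HADOOP_IN_A), dedupCombine(HADOOP_IN_B));
        int withSum = reduce(sumCombine(HADOOP_IN_A), sumCombine(HADOOP_IN_B));
        System.out.println("Hadoop with dedup combiner:   " + withDedup); // 2, occurrences are lost
        System.out.println("Hadoop with summing combiner: " + withSum);   // 4, same as no combiner
    }
}
```

A combiner is only safe when running it zero, one, or several times leaves the final result unchanged. Summing has that property; dropping duplicate values does not.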

The program arguments are the same as shown in the "Older Hadoop Programming Style" section.

Log of the running program.
===================
Feb 15, 2012 10:42:25 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 2
Feb 15, 2012 10:42:25 PM org.apache.hadoop.util.ProcessTree isSetsidSupported
INFO: setsid exited with exit code 0
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@5b976011
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
Feb 15, 2012 10:42:25 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@388530b8
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
Feb 15, 2012 10:42:28 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000001_0' done.
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@39e4853f
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 2 sorted segments
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 2 segments left of total size: 281 bytes
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
Feb 15, 2012 10:42:31 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /home/anil/output
Feb 15, 2012 10:42:34 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
Feb 15, 2012 10:42:34 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
===================


Now let us look at the output directory.

~/output$ ls
part-r-00000  _SUCCESS

~/output$ vi part-r-00000


=================
.       1
?       2
Are     1
Do      1
HDFS    1
Hadoop  4
Java    2
a       1
cool    1
in      1
is      3
language        1
like    1
of      1
part    1
there   1
users   1
written 1
you     1
=================


Magical, isn't it?
