Friday, May 25, 2012

Apache Hadoop Map Reduce - Advanced Example


Map Reduce Example

The latest, corrected version of this work is at http://blog.hampisoftware.com/?p=20

Solve the same problem using Apache Spark: http://blog.hampisoftware.com/?p=41


Use the links above.


DATED MATERIAL


Continuing with my experiments, I attempted the Patent Citation example from the book "Hadoop in Action" by Chuck Lam.


Data Set

Visit http://nber.org/patents/
Choose acite75_99.zip; unzipping it yields cite75_99.txt

Each line of this file pairs a citing patent number with the patent it cites.  In this map reduce example, we are going to attempt to find the reverse.

For a given patent, which patents cite it, and hence how many citations exist.
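
To make the goal concrete, here is a minimal plain-Java sketch of the same inversion done in memory, with no Hadoop involved. The class name InvertCitations and the sample rows are made up for illustration, and it assumes each row has the form "citing,cited".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class InvertCitations {

    // Groups citing patents under the patent they cite.
    // Each input line is assumed to look like "citing,cited".
    static Map<String, List<String>> invert(List<String> lines) {
        Map<String, List<String>> citedToCiting = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            if (parts.length < 2) continue; // skip blank or malformed lines
            citedToCiting.computeIfAbsent(parts[1], k -> new ArrayList<>()).add(parts[0]);
        }
        return citedToCiting;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows, not taken from the real data set.
        List<String> sample = Arrays.asList("100,1", "101,1", "102,2");
        invert(sample).forEach((cited, citing) ->
                System.out.println(cited + " is cited by " + citing
                        + " (" + citing.size() + " citations)"));
    }
}
```

The map phase of the M/R program plays the role of swapping each pair, and the reduce phase plays the role of the grouping done here by the map of lists.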

What do you need?

Download Hadoop v1.0.3 from http://hadoop.apache.org
I downloaded hadoop-1.0.3.tar.gz (60MB)

Map Reduce Program

Note: The first program I give here is busted.  Look for the commentary following it as to why it is busted.  Finally, I give out a working program.

==================
package mapreduce;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PatentCitation extends Configured implements Tool{

    public static class PatentCitationMapper extends Mapper<Text,Text,Text,Text> {
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, key);
        }
    }
   
    public static class PatentCitationReducer extends Reducer<Text,Text,Text,Text>{
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            Iterator<Text> iterator = values.iterator();
            while(iterator.hasNext()){
                if(csv.length() > 0 ) csv += ",";
                csv += iterator.next().toString();
            }
            context.write(key, new Text(csv));
        }
    }
   
    private  void deleteFilesInDirectory(File f) throws IOException {
        if (f.isDirectory()) {
            for (File c : f.listFiles())
                deleteFilesInDirectory(c);
        }
        if (!f.delete())
            throw new FileNotFoundException("Failed to delete file: " + f);
    }
   
    @Override
    public int run(String[] args) throws Exception {
        if(args.length == 0)
            throw new IllegalArgumentException("Please provide input and output paths");
       
        Path inputPath = new Path(args[0]);
        File outputDir = new File(args[1]);
        deleteFilesInDirectory(outputDir);
        Path outputPath = new Path(args[1]);

        Job job = new Job(getConf(), "Hadoop Patent Citation Example");
        job.setJarByClass(PatentCitation.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
       
        job.setMapperClass(PatentCitationMapper.class);
        job.setReducerClass(PatentCitationReducer.class);
       
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        return job.waitForCompletion(false) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new PatentCitation(), args));
    }
}


I created a directory called "input" and placed cite75_99.txt in it.  I also created an empty directory called "output".  These are the input and output directories for the M/R program.

Execution

First Attempt:

I executed the program as is. It choked because the temporary files it wrote under /tmp filled up my root filesystem.

Second Attempt:

Now I explicitly configured the Hadoop tmp directory so I could place the tmp files wherever I wanted.
-Dhadoop.tmp.dir=/home/anil/judcon12/tmp

Ok, now the program did not choke.  It just ran and ran for 2+ hours, so I killed it. It seems the data set is too large to finish in a short time.
The culprit was the reduce phase. It just did not finish.

Third Attempt:

Now I tried to set the number of reducers to zero so I could view the output of the map phase.

I tried the property -Dmapred.reduce.tasks=0.  It made no difference. (ToolRunner picks up -D options only from the program arguments, not from JVM options, which may be why.)

I then set the number of reducers directly on the Job object. That worked.

        job.setNumReduceTasks(0);

Ok, now the program ran only the map phase.

=======================
May 25, 2012 4:40:42 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
May 25, 2012 4:40:42 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
May 25, 2012 4:40:42 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
May 25, 2012 4:40:42 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
May 25, 2012 4:40:42 PM org.apache.hadoop.util.ProcessTree isSetsidSupported
INFO: setsid exited with exit code 0
May 25, 2012 4:40:42 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@420f9c40
May 25, 2012 4:40:44 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
May 25, 2012 4:40:44 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:44 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000000_0 is allowed to commit now
May 25, 2012 4:40:44 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000000_0' to /home/anil/judcon12/output
May 25, 2012 4:40:45 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:45 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
May 25, 2012 4:40:45 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@527736bd
May 25, 2012 4:40:46 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
May 25, 2012 4:40:46 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:46 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000001_0 is allowed to commit now
May 25, 2012 4:40:46 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000001_0' to /home/anil/judcon12/output
May 25, 2012 4:40:48 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:48 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000001_0' done.
May 25, 2012 4:40:48 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@5749b290
May 25, 2012 4:40:49 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
May 25, 2012 4:40:49 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:49 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000002_0 is allowed to commit now
May 25, 2012 4:40:49 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000002_0' to /home/anil/judcon12/output
May 25, 2012 4:40:51 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:51 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000002_0' done.
May 25, 2012 4:40:51 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2a8ceeea
May 25, 2012 4:40:52 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000003_0 is done. And is in the process of commiting
May 25, 2012 4:40:52 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:52 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000003_0 is allowed to commit now
May 25, 2012 4:40:52 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000003_0' to /home/anil/judcon12/output
May 25, 2012 4:40:54 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:54 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000003_0' done.
May 25, 2012 4:40:54 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@46238a47
May 25, 2012 4:40:55 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000004_0 is done. And is in the process of commiting
May 25, 2012 4:40:55 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:55 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000004_0 is allowed to commit now
May 25, 2012 4:40:55 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000004_0' to /home/anil/judcon12/output
May 25, 2012 4:40:57 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:57 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000004_0' done.
May 25, 2012 4:40:57 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@559113f8
May 25, 2012 4:40:58 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000005_0 is done. And is in the process of commiting
May 25, 2012 4:40:58 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:40:58 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000005_0 is allowed to commit now
May 25, 2012 4:40:58 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000005_0' to /home/anil/judcon12/output
May 25, 2012 4:41:00 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:41:00 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000005_0' done.
May 25, 2012 4:41:00 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@76a9b9c
May 25, 2012 4:41:01 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000006_0 is done. And is in the process of commiting
May 25, 2012 4:41:01 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:41:01 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000006_0 is allowed to commit now
May 25, 2012 4:41:01 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000006_0' to /home/anil/judcon12/output
May 25, 2012 4:41:03 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:41:03 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000006_0' done.
May 25, 2012 4:41:03 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@560c3014
May 25, 2012 4:41:04 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000007_0 is done. And is in the process of commiting
May 25, 2012 4:41:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:41:04 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_m_000007_0 is allowed to commit now
May 25, 2012 4:41:04 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_m_000007_0' to /home/anil/judcon12/output
May 25, 2012 4:41:06 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 4:41:06 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000007_0' done.
=======================

So the map phase takes between 1 and 2 minutes.  It is the reduce phase that does not end for me.  I will evaluate why that is the case. :)

Let us see what the output of the map phase looks like:

=============================
anil@sadbhav:~/judcon12/output$ ls
part-m-00000  part-m-00002  part-m-00004  part-m-00006  _SUCCESS
part-m-00001  part-m-00003  part-m-00005  part-m-00007
==============================

You can see the end result of the map phase in these files.

Now on to figuring out the reduce phase.

Let me look at the size of the input file:

===================
 anil@sadbhav:~/judcon12/input$ wc -l cite75_99.txt
16522439 cite75_99.txt
===================

OMG!  That is 16,522,439 entries, roughly 16.5 million.  Too much for a laptop to handle.


Let us try another experiment.  Let us take 20000 patent citations from this file and save them to 20000cite.txt
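
One way to cut such a subset is sketched below in plain Java. It simply copies the first 20000 lines, which is an assumption on my part about how to pick them; any 20000 lines would do for this experiment. The class name TakeFirstLines is made up for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TakeFirstLines {

    // Copies at most maxLines lines from source to target and returns the number copied.
    static int copyFirstLines(Path source, Path target, int maxLines) throws IOException {
        int copied = 0;
        try (BufferedReader in = Files.newBufferedReader(source, StandardCharsets.UTF_8);
             BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            String line;
            while (copied < maxLines && (line = in.readLine()) != null) {
                out.write(line);
                out.newLine();
                copied++;
            }
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        // File names as used in this post; run from inside the "input" directory.
        Path source = Paths.get("cite75_99.txt");
        Path target = Paths.get("20000cite.txt");
        if (!Files.exists(source)) {
            System.out.println("Input file not found: " + source);
            return;
        }
        System.out.println("Copied " + copyFirstLines(source, target, 20000) + " lines");
    }
}
```

Remember to move the full cite75_99.txt out of the input directory afterwards, otherwise the job will still read it along with the subset.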

When I run the program now, I see that the M/R execution took all of 10 seconds to finish.

Let us view the results of the M/R execution.

=============================
anil@sadbhav:~/judcon12/output$ ls
part-r-00000  _SUCCESS
============================

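When I look inside part-r-00000, I see one long line of csv patent citations.  Yeah, you are right.  My reducer is busted.  It is not working.  I need to fix it.....
That is the next step....  This exercise was a failure. But there was a lesson here.  If you mess up, you will wait. :)

The root cause: KeyValueTextInputFormat splits each line at the first tab by default, and the citation file is comma-separated.  So the whole line ends up as the key and the value is empty.  The mapper writes that empty value as the output key for every record, which means a single reduce call receives all the records and concatenates them into one line.  With 16 million records, that concatenation is most likely what never finished.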
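
The plain-Java sketch below shows why the first program collapses everything into one line. It imitates, without using Hadoop, how a key/value line reader splits a line at the first separator character: when the separator is missing, the whole line becomes the key and the value is empty. The class and method names are made up for illustration; the sample row is rebuilt from the first data line of the output shown later in this post.

```java
public class KeyValueSplitDemo {

    // Splits a line at the first occurrence of the separator.
    // With no separator present, the whole line is the key and the value is empty.
    static String[] splitKeyValue(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String line = "3861270,1000715"; // a "citing,cited" row

        // Tab separator (the default for KeyValueTextInputFormat): nothing to split on.
        String[] byTab = splitKeyValue(line, '\t');
        System.out.println("tab:   key=[" + byTab[0] + "] value=[" + byTab[1] + "]");

        // The busted mapper emits (value, key), so every record gets the same empty key.
        System.out.println("busted mapper output key=[" + byTab[1] + "]");

        // Splitting on the comma gives the two patent numbers.
        String[] byComma = splitKeyValue(line, ',');
        System.out.println("comma: key=[" + byComma[0] + "] value=[" + byComma[1] + "]");
    }
}
```

The working program below takes the simplest way out and splits the key on the comma inside the mapper. An alternative I have not tried here would be to tell the input format to split on a comma through its separator property; the property name differs between Hadoop versions, so check the documentation for yours.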




Ok,  here is the updated map reduce program that works:
======================
package mapreduce;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PatentCitation extends Configured implements Tool{

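    public static class PatentCitationMapper extends Mapper<Text,Text,Text,Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input is comma-separated, so KeyValueTextInputFormat (which splits
            // on tab by default) hands us the whole "citing,cited" line as the key.
            String[] citation = key.toString().split(",");
            if (citation.length < 2) return; // skip blank or malformed lines
            // Emit (cited, citing) so the reducer sees all citing patents per cited patent.
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }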

    public static class PatentCitationReducer extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join all citing patents for this cited patent into one comma-separated value.
            StringBuilder csv = new StringBuilder();
            for(Text val:values){
                if(csv.length() > 0 ) csv.append(",");
                csv.append(val.toString());
            }
            context.write(key, new Text(csv.toString()));
        }
    }

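    // Recursively deletes a file or directory on the local filesystem.
    private void deleteFilesInDirectory(File f) throws IOException {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children != null) {
                for (File c : children)
                    deleteFilesInDirectory(c);
            }
        }
        if (!f.delete())
            throw new FileNotFoundException("Failed to delete file: " + f);
    }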

    @Override
    public int run(String[] args) throws Exception {
        if(args.length < 2)
            throw new IllegalArgumentException("Please provide input and output paths");

        Path inputPath = new Path(args[0]);
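        File outputDir = new File(args[1]);
        // Hadoop refuses to run if the output directory already exists, so clear it first.
        if (outputDir.exists())
            deleteFilesInDirectory(outputDir);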
        Path outputPath = new Path(args[1]);

        Job job = new Job(getConf(), "Hadoop Patent Citation Example");
        job.setJarByClass(PatentCitation.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(PatentCitationMapper.class);
        job.setReducerClass(PatentCitationReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //job.setNumReduceTasks(10000);

        return job.waitForCompletion(false) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new PatentCitation(), args));
    }
}
=======================================


Running updated program

================================
May 25, 2012 6:14:26 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
May 25, 2012 6:14:26 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
May 25, 2012 6:14:26 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
May 25, 2012 6:14:26 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
May 25, 2012 6:14:27 PM org.apache.hadoop.util.ProcessTree isSetsidSupported
INFO: setsid exited with exit code 0
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@420f9c40
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
May 25, 2012 6:14:27 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@3a56860b
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 359270 bytes
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
May 25, 2012 6:14:30 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
May 25, 2012 6:14:30 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /home/anil/judcon12/output
May 25, 2012 6:14:33 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
May 25, 2012 6:14:33 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
=================================


Let us look at the output:
===============
anil@sadbhav:~/judcon12/output$ ls
part-r-00000  _SUCCESS
================

If you look inside the part-xxx file, you will see the results:

======================
 "CITED" "CITING"
1000715 3861270
1001069 3858600
1001170 3861317
1001597 3861811
1004288 3861154
1006393 3861066
1006952 3860293
.......
1429311 3861187
1429835 3860154
1429968 3860060
1430491 3859976
1431444 3861601
1431718 3859022
1432243 3861774
1432467 3862478
1433649 3861223,3861222
1433923 3861232
1434088 3862293
1435134 3861526
1435144 3858398
....
======================


Woot!
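
Since the original question was how many citations each patent has, here is a small plain-Java sketch that turns output lines like the ones above into counts. It assumes the key and the value are separated by a tab, which is the default for TextOutputFormat. The class name CitationCounts is made up for illustration; the sample lines are copied from the output above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CitationCounts {

    // Parses lines of the form "cited<TAB>citing1,citing2,..." and
    // returns the number of citing patents per cited patent.
    static Map<String, Integer> countCitations(List<String> outputLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : outputLines) {
            String[] kv = line.split("\t", 2);
            if (kv.length < 2 || kv[1].isEmpty()) continue; // skip malformed lines
            counts.put(kv[0], kv[1].split(",").length);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1432467\t3862478",
                "1433649\t3861223,3861222",
                "1433923\t3861232");
        countCitations(lines).forEach((cited, n) ->
                System.out.println(cited + " has " + n + " citation(s)"));
    }
}
```

In a real run the counting could just as well happen inside the reducer, by writing the number of values instead of the joined list.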


Additional Information


If you see the following error, it means you have disk space issues.

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/spill0.out


How do I set my own tmp directory for Hadoop?
-Dhadoop.tmp.dir=
or
-Dmapred.local.dir=    (for map/reduce)
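
Before pointing Hadoop at a new tmp directory, it can help to check how much room that location actually has. A minimal plain-Java sketch follows; the class name DiskSpaceCheck is made up for illustration, and the directory to check is whatever you pass on the command line (it falls back to the JVM's own temp directory).

```java
import java.io.File;

public class DiskSpaceCheck {

    // Returns the usable bytes on the partition holding the given directory.
    // File.getUsableSpace() returns 0 if the path does not name a partition.
    static long usableBytes(String directory) {
        return new File(directory).getUsableSpace();
    }

    public static void main(String[] args) {
        String dir = args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir");
        long mb = usableBytes(dir) / (1024L * 1024L);
        System.out.println(dir + " has about " + mb + " MB usable");
    }
}
```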


References

http://blog.hampisoftware.com