Word Count Map Reduce

Word Count in MapReduce. No more counting dollars — we will be counting stars!






Mapper :

package com.big.data.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // As TextInput Format has been used , 
    //the key is the offset of line in the file , The actual line goes in the value

    // Reuse the writables, to avoid GC.
    private final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
                    Mapper.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {
            //Writable.Set() replaces the previous 
            //content of the writable Object with the new content
            context.write(word, one);



package com.big.data.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable values, Context context) 
                                            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();

        context.write(key, new IntWritable(sum));

Key Takeaways:
1. Map reads key/value pairs and emits key/value pairs.
2. Because the input format is TextInputFormat, the map's KEYIN is the byte offset (a long) and VALUEIN is the line read.
3. To count the words, all occurrences of a given word across all the mappers need to be collected at a single reducer
to perform the counting.
4. Writables have been reused, both in the Mapper and the Reducer. By creating fewer objects we reduce GC pressure.


package com.big.data.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class WordCountDriver extends Configured implements Tool {

    //extends Configured implements Tool helps in argument parsing . 
    //Arguments need to passed as -Dkey=Value

    public static final String INPUT_PATH = "input.path";
    public static final String OUTPUT_PATH = "output.path";

    public static void main(String[] args) throws Exception {
            if(ToolRunner.run(new WordCountDriver(), args)!=0){
                throw new IOException("Job has failed");

    public int run(String[] args) throws Exception {

        //The arguments passed has been split into Key value by ToolRunner
        Configuration conf = getConf();
        Path inputPath = new Path(conf.get(INPUT_PATH));
        Path outputPath = new Path(conf.get(OUTPUT_PATH));
        Job job = new Job(conf, this.getClass().toString());

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        //Set InputFormatClass
        //Set OutPutFormat class


        // Driver polls to find out if the job has completed or not.
        return job.waitForCompletion(true) ? 0 : 1;

Working code can be found on GitHub.

Integration Test:

package com.big.data.mapreduce.wordcount;

import com.cloudera.org.joda.time.DateTime;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class WordCountDriverTest {

    private final Configuration conf = new Configuration();
    private static FileSystem fs;
    private static final DateTime NOW = DateTime.now();
    private static String baseDir;
    private static String outputDir;
    private static final String NEW_LINE_DELIMETER = "\n";

    public static void startup() throws Exception {

        Configuration conf = new Configuration();
        //set the fs to file:/// which means the local fileSystem
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();
        outputDir = baseDir + "/output";

        File tempFile = new File(baseDir + "/input.txt");
        String content = "My name is Maverick";

        //Write the data into the local filesystem
        FileUtils.writeStringToFile(tempFile, content, "UTF-8");
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, content, "UTF-8", true);

    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);

    public void WordCount() throws Exception {

        // Any argument passed with -DKey=Value will be parsed by ToolRunner
        String[] args = new String[]{
                     "-D" + WordCountDriver.INPUT_PATH + "=" + baseDir,
                     "-D" + WordCountDriver.OUTPUT_PATH + "=" + outputDir };

        //Read the data from the outputfile
        File outputFile = new File(outputDir + "/part-r-00000");
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
        Map<String, Integer> wordToCount = new HashMap<>();

        //4 lines in output file, with one word per line
        Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] wordCount = e.split("\t");
            wordToCount.put(wordCount[0], Integer.parseInt(wordCount[1]));

        //4 words .
        Assert.assertEquals(4L, wordToCount.size());
        Assert.assertEquals(2L, wordToCount.get("Maverick").longValue());