MultipleOutputs in MapReduce

Given an input file of numbers, the job is expected to write even and odd numbers into separate output directories. For example, an input of 1 to 5 ends up as 2 and 4 in the even output and 1, 3 and 5 in the odd output. In Pig, the SPLIT operator performs the same operation. (Github)

Dependencies:

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.9.0</version>
        </dependency>
   </dependencies>

   <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories> 


Mapper :

 

package com.big.data.mapreduce.multioutputpath;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

/**
 * The mapper determines whether a number is even or odd and writes it to a different output directory.
 */
public class EvenOddNumberMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
    // As TextInputFormat is used, the key is the byte offset of the line
    // in the file; the line itself goes in the value.

    public static final String MULTI_OUTPUT_NAME = "textMultiOutputformat";
    // These are relative paths, resolved against the output path given in the driver
    // (they do not start with /). Using an absolute path (starting with /) causes
    // various issues: the directory will not be cleaned up if the job fails.
    public static final String EVEN_KEY_PATH = "evenkey/output/";
    public static final String ODD_KEY_PATH = "oddkey/output/";
    protected MultipleOutputs<IntWritable, NullWritable> multipleOutput;
    private IntWritable output;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);

        //Initialize multioutput
        multipleOutput = new MultipleOutputs<>(context);

        output = new IntWritable();

    }

    @Override
    public void map(LongWritable key, Text value,
                    Context context) throws IOException, InterruptedException {

        int number = Integer.parseInt(value.toString());
        output.set(number);
        if ((number % 2) == 0) {
            // "part" is the base file name appended to the path; MultipleOutputs
            // adds the task suffix to it (producing e.g. part-m-00000).
            multipleOutput.write(MULTI_OUTPUT_NAME, output,
                                 NullWritable.get(), EVEN_KEY_PATH + "part");
        } else {
            multipleOutput.write(MULTI_OUTPUT_NAME, output,
                                 NullWritable.get(), ODD_KEY_PATH + "part");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (null != multipleOutput) {
            multipleOutput.close();
        }
    }
}
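
With the relative base paths above and no reducers, a successful run leaves files like the following under the output path set in the driver (the exact part numbers depend on the number of map tasks; this is the layout exercised by the integration test below):

<output.path>/evenkey/output/part-m-00000
<output.path>/oddkey/output/part-m-00000
<output.path>/part-m-00000        (empty, created by the default output format)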

Key Takeaways:
1. Never use absolute path names in MultipleOutputs; otherwise you have to manage cleanup yourself when the job fails (see the sketch below).
2. Always call multipleOutput.close() in cleanup() to flush all the data.
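
To make the first point concrete, here is a minimal sketch, from inside the mapper's map() method, contrasting the two kinds of base paths (the absolute path is purely illustrative):

// Relative base path: resolved under the task attempt's work directory,
// so the files are committed only when the task succeeds and are cleaned
// up automatically when it fails.
multipleOutput.write(MULTI_OUTPUT_NAME, output, NullWritable.get(),
                     "evenkey/output/part");          // preferred

// Absolute base path (illustrative only): written outside the job's
// output directory, so files from failed or killed attempts survive
// and have to be cleaned up manually.
multipleOutput.write(MULTI_OUTPUT_NAME, output, NullWritable.get(),
                     "/data/evenkey/output/part");    // avoid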


Driver:


package com.big.data.mapreduce.multioutputpath;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MultiOutputPathDriver extends Configured implements Tool {

    // "extends Configured implements Tool" enables argument parsing via ToolRunner.
    // Arguments need to be passed as -Dkey=value.

    public static final String INPUT_PATH = "input.path";
    public static final String OUTPUT_PATH = "output.path";

    public static void main(String[] args) throws Exception {
        if (ToolRunner.run(new MultiOutputPathDriver(), args) != 0) {
            throw new IOException("Job has failed");
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // The arguments passed have been split into key/value pairs by ToolRunner.
        Configuration conf = getConf();
        Path inputPath = new Path(conf.get(INPUT_PATH));
        Path outputPath = new Path(conf.get(OUTPUT_PATH));
        Job job = Job.getInstance(conf, this.getClass().toString());

        FileInputFormat.setInputPaths(job, inputPath);

        // This is the base path for the subdirectories;
        // the relative paths are appended in the mapper.
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setJobName("MultiOutputPathDriver");
        job.setJarByClass(MultiOutputPathDriver.class);
        //Set InputFormatClass
        job.setInputFormatClass(TextInputFormat.class);

        MultipleOutputs.addNamedOutput(job, EvenOddNumberMapper.MULTI_OUTPUT_NAME,
                                       TextOutputFormat.class, IntWritable.class, NullWritable.class);
        // Enable MultipleOutputs counters, as the default output record
        // counters no longer report these numbers.
        MultipleOutputs.setCountersEnabled(job, true);
        //Set OutPutFormat class
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(EvenOddNumberMapper.class);

        // As no reducers are used, it is a map-only job.
        job.setNumReduceTasks(0);

        // Driver polls to find out if the job has completed or not.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
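
One refinement worth knowing about: since the mapper never writes to the default output, the driver above still produces empty part files at the root of the output directory. Hadoop's LazyOutputFormat creates the default output only on the first write; a minimal sketch of the change, replacing the job.setOutputFormatClass(TextOutputFormat.class) line:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Create the default part files lazily, i.e. only when a record is
// actually written to the main output, so no empty part-m-xxxxx files
// appear at the output root.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

On a cluster the driver is launched the usual ToolRunner way, with the input and output locations passed as -D arguments (the jar name and paths below are illustrative):

hadoop jar multioutputpath.jar com.big.data.mapreduce.multioutputpath.MultiOutputPathDriver -Dinput.path=/data/numbers -Doutput.path=/data/numbers-split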


Integration Test : Github

package com.big.data.mapreduce.multioutputpath;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;


public class MultiOutputPathTest {

    private static FileSystem fs;
    private static String baseDir;
    private static String outputDir;
    private static final String NEW_LINE_DELIMITER = "\n";

    @BeforeClass
    public static void startup() throws Exception {

        Configuration conf = new Configuration();
        //set the fs to file:/// which means the local fileSystem
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/fetch/" + UUID.randomUUID().toString();
        outputDir = baseDir + "/output/";

        File tempFile = new File(baseDir + "/input.txt");

        //Write the data into the local filesystem
        FileUtils.writeStringToFile(tempFile, "2", "UTF-8");
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMITER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, "3", "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMITER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFile, "4", "UTF-8", true);

    }

    @AfterClass
    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);
    }

    @Test
    public void testEvenOddOutput() throws Exception {

        // Any argument passed as -Dkey=value will be parsed by ToolRunner.
        String[] args = new String[]{
             "-D" + MultiOutputPathDriver.INPUT_PATH + "=" + baseDir,
             "-D" + MultiOutputPathDriver.OUTPUT_PATH + "=" + outputDir};
        MultiOutputPathDriver.main(args);

        // Read the data from the even output file
        File outputFile = new File(outputDir + EvenOddNumberMapper.EVEN_KEY_PATH + "part-m-00000");
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
        Set<Integer> integerSet = new HashSet<>();

        // One number per line in the output file
        Arrays.stream(fileToString.split(NEW_LINE_DELIMITER)).forEach(e -> {
            integerSet.add(Integer.parseInt(e));
        });

        // Only the two even numbers (2 and 4) should be present.
        Assert.assertEquals(2L, integerSet.size());
        Assert.assertTrue(integerSet.contains(2));
        Assert.assertTrue(integerSet.contains(4));
    }

}
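
The odd output can be verified the same way; a minimal sketch, assuming the same file layout as the even assertion above, which could be appended to the test:

// Verify the odd output: the only odd number in the test input is 3.
File oddFile = new File(outputDir + EvenOddNumberMapper.ODD_KEY_PATH + "part-m-00000");
String oddContent = FileUtils.readFileToString(oddFile, "UTF-8");
Assert.assertEquals("3", oddContent.trim());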