MultipleOutputs in MapReduce
Given an input file of numbers, the job is expected to write the even and odd numbers into separate output files. For example, for an input containing 2, 3, and 4, the even output holds 2 and 4 while the odd output holds 3. In Pig, the SPLIT operator performs the same operation.
Dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0-cdh5.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.9.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0-cdh5.9.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
Mapper:
package com.big.data.mapreduce.multioutputpath;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
/**
* The mapper determines whether each number is even or odd and writes it to a different output directory accordingly.
*/
public class EvenOddNumberMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
// As TextInputFormat is used, the key is the byte offset of the line in the file;
// the actual line content arrives in the value.
public static final String MULTI_OUTPUT_NAME = "textMultiOutputformat";
// These are relative paths, resolved against the output path set in the driver
// (the path does not start with /). Using an absolute path (starting with /)
// will result in various issues (the directory will not be cleaned up if the job fails).
public static final String EVEN_KEY_PATH = "evenkey/output/";
public static final String ODD_KEY_PATH = "oddkey/output/";
protected MultipleOutputs<IntWritable, NullWritable> multipleOutput;
private IntWritable output;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
// Initialize MultipleOutputs
multipleOutput = new MultipleOutputs<>(context);
output = new IntWritable();
}
@Override
public void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
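// Each input line is expected to contain a single integer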
int number = Integer.parseInt(value.toString());
output.set(number);
if ((number % 2) == 0) {
// "part" is the file-name prefix that must be appended to the path given to MultipleOutputs
multipleOutput.write(MULTI_OUTPUT_NAME, output,
NullWritable.get(), EVEN_KEY_PATH + "part");
} else {
multipleOutput.write(MULTI_OUTPUT_NAME, output,
NullWritable.get(), ODD_KEY_PATH + "part");
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
if (null != multipleOutput) {
multipleOutput.close();
}
}
}
Key Takeaways:
1. Never use absolute path names with MultipleOutputs (you would have to manage cleanup yourself if the job fails); the relative paths produce the layout sketched below.
2. Always call multipleOutput.close() in cleanup() to flush all buffered data.
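With the relative paths above, a successful run produces a layout like the following under the job output directory (assuming a single map task; the top-level part file belongs to the default output, which this job never writes to, so it stays empty):

<output.path>/
├── evenkey/output/part-m-00000 (even numbers)
├── oddkey/output/part-m-00000 (odd numbers)
├── part-m-00000 (empty default output)
└── _SUCCESS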
Driver:
package com.big.data.mapreduce.multioutputpath;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class MultiOutputPathDriver extends Configured implements Tool {
// Extending Configured and implementing Tool enables generic argument parsing;
// arguments need to be passed as -Dkey=value.
public static final String INPUT_PATH = "input.path";
public static final String OUTPUT_PATH = "output.path";
public static void main(String[] args) throws Exception {
if (ToolRunner.run(new MultiOutputPathDriver(), args) != 0) {
throw new IOException("Job has failed");
}
}
@Override
public int run(String[] args) throws Exception {
// The arguments passed have already been split into key/value pairs by ToolRunner.
Configuration conf = getConf();
Path inputPath = new Path(conf.get(INPUT_PATH));
Path outputPath = new Path(conf.get(OUTPUT_PATH));
Job job = Job.getInstance(conf, this.getClass().toString());
FileInputFormat.setInputPaths(job, inputPath);
// This is the base path for the subdirectories;
// the relative paths are appended to it in the mapper.
FileOutputFormat.setOutputPath(job, outputPath);
job.setJobName("MultiOutputPathDriver");
job.setJarByClass(MultiOutputPathDriver.class);
//Set InputFormatClass
job.setInputFormatClass(TextInputFormat.class);
MultipleOutputs.addNamedOutput(job, EvenOddNumberMapper.MULTI_OUTPUT_NAME,
TextOutputFormat.class, IntWritable.class, NullWritable.class);
// Enable counters for the named outputs, as the default output
// record counters will no longer report these numbers.
MultipleOutputs.setCountersEnabled(job, true);
// Set OutputFormat class
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(EvenOddNumberMapper.class);
// As no reducers are used, this is a map-only job.
job.setNumReduceTasks(0);
// Driver polls to find out if the job has completed or not.
return job.waitForCompletion(true) ? 0 : 1;
}
}
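Because the mapper writes only through MultipleOutputs and never calls context.write(), the default output format still creates an empty part-m-* file under the base output path. A common companion to MultipleOutputs is LazyOutputFormat, which creates the default output file only when a record is actually written to it. A minimal sketch of the change in the driver (an addition, not part of the code above):

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
// Use instead of job.setOutputFormatClass(TextOutputFormat.class);
// the default part file is then created only on the first context.write().
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

On a cluster, the job can be launched as: hadoop jar <jar> com.big.data.mapreduce.multioutputpath.MultiOutputPathDriver -Dinput.path=<input> -Doutput.path=<output>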
Integration Test:
package com.big.data.mapreduce.multioutputpath;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
public class MultiOutputPathTest {
private static FileSystem fs;
private static String baseDir;
private static String outputDir;
private static final String NEW_LINE_DELIMITER = "\n";
@BeforeClass
public static void startup() throws Exception {
Configuration conf = new Configuration();
//set the fs to file:/// which means the local fileSystem
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
fs = FileSystem.getLocal(conf);
baseDir = "/tmp/fetch/" + UUID.randomUUID().toString();
outputDir = baseDir + "/output/";
File tempFile = new File(baseDir + "/input.txt");
//Write the data into the local filesystem
FileUtils.writeStringToFile(tempFile, "2", "UTF-8");
FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMITER, "UTF-8", true);
FileUtils.writeStringToFile(tempFile, "3", "UTF-8", true);
FileUtils.writeStringToFile(tempFile, NEW_LINE_DELIMITER, "UTF-8", true);
FileUtils.writeStringToFile(tempFile, "4", "UTF-8", true);
}
@AfterClass
public static void cleanup() throws Exception {
//Delete the local filesystem folder after the Job is done
fs.delete(new Path(baseDir), true);
}
@Test
public void multipleOutputPath() throws Exception {
// Any argument passed as -Dkey=value is parsed by ToolRunner
String[] args = new String[]{
"-D" + MultiOutputPathDriver.INPUT_PATH + "=" + baseDir,
"-D" + MultiOutputPathDriver.OUTPUT_PATH + "=" + outputDir};
MultiOutputPathDriver.main(args);
//Read the data from the even output file
File outputFile = new File(outputDir + EvenOddNumberMapper.EVEN_KEY_PATH + "/part-m-00000");
String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
Set<Integer> integerSet = new HashSet<>();
//The even output file has one number per line
Arrays.stream(fileToString.split(NEW_LINE_DELIMITER)).forEach(e -> {
integerSet.add(Integer.parseInt(e));
});
//Two even numbers expected: 2 and 4
Assert.assertEquals(2L, integerSet.size());
Assert.assertTrue(integerSet.contains(2));
Assert.assertTrue(integerSet.contains(4));
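// A matching check for the odd output; with this input the odd file
// contains only the number 3 (a sketch following the same pattern as above):
File oddOutputFile = new File(outputDir + EvenOddNumberMapper.ODD_KEY_PATH + "/part-m-00000");
String oddFileToString = FileUtils.readFileToString(oddOutputFile, "UTF-8");
Assert.assertEquals("3", oddFileToString.trim());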
}
}