MultipleInputs in Map Reduce

MultipleInputs in Map Reduce (Union Operator)

Problem :   The job is expected to read two Files having different schema . (Github)

dependency

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.9.0<version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.9.0<version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.9.0<version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.9.0</version>
        </dependency>
   </dependencies>

   <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories>

[addToAppearHere]

Left Mapper :

package com.big.data.mapreduce.multiinputpath;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class StudentToIdMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // As TextInput Format has been used ,
   //  the key is the offset of line in the file , The actual line goes in the value

    private Text outputName;
    private IntWritable outputId;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);

        outputName = new Text();
        outputId = new IntWritable();

    }

    // Input StudentName,RollId
    @Override
    public void map(LongWritable key, Text value,
                    Context context) throws IOException, InterruptedException {

        String[] studentId = value.toString().split(MultiInputPathDriver.DELIMTER);
        outputName.set(studentId[0]);
        outputId.set(Integer.parseInt(studentId[1]));
        context.write(outputName, outputId);
    }

}

[addToAppearHere]

Right Mapper :

 

package com.big.data.mapreduce.multiinputpath;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class IdtoStudentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // As TextInput Format has been used , 
    // the key is the offset of line in the file , The actual line goes in the value

    private Text outputName;
    private IntWritable outputId;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);

        outputName = new Text();
        outputId = new IntWritable();

    }

    // Input RollId,StudentName
    @Override
    public void map(LongWritable key, Text value,
                    Context context) throws IOException, InterruptedException {

        String[] studentId = value.toString().split(MultiInputPathDriver.DELIMTER);
        outputName.set(studentId[1]);
        outputId.set(Integer.parseInt(studentId[0]));
        context.write(outputName, outputId);
    }
}

Driver:

 

package com.big.data.mapreduce.multiinputpath;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MultiInputPathDriver extends Configured implements Tool {

    //extends Configured implements Tool helps in argument parsing . 
   // Arguments need to passed as -Dkey=Value

    public static final String INPUT_PATH_LEFT = "input.path.left";
    public static final String INPUT_PATH_RIGHT = "input.path.right";
    public static final String OUTPUT_PATH = "output.path";
    public static final String DELIMTER = ",";

    public static void main(String[] args) throws Exception {
        if (ToolRunner.run(new MultiInputPathDriver(), args) != 0) {
            throw new IOException("Job has failed");
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        //The arguments passed has been split into Key value by ToolRunner
        Configuration conf = getConf();
        Path inputPathLeft = new Path(conf.get(INPUT_PATH_LEFT));
        Path inputPathRight = new Path(conf.get(INPUT_PATH_RIGHT));
        Path outputPath = new Path(conf.get(OUTPUT_PATH));
        Job job = new Job(conf, this.getClass().toString());

        // For left path set StudentToIdMapper ,
       //  For right path set IdtoStudentMapper , 
       // as the schema are different hence different mapper
        MultipleInputs.addInputPath(job, inputPathLeft, 
                                        TextInputFormat.class, StudentToIdMapper.class);
        MultipleInputs.addInputPath(job, inputPathRight, 
                                        TextInputFormat.class, IdtoStudentMapper.class);

        //This is the base Path for the sub directories , the extra path will be added in the mapper .
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setJobName("MultiOutputPathDriver");
        job.setJarByClass(MultiInputPathDriver.class);

        //Set OutPutFormat class
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //As no reducers are used, its a map only task
        job.setNumReduceTasks(0);

        // Driver polls to find out if the job has completed or not.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
[addToAppearHere]

Working code can be found at Github

 

Integration Test :

 

package com.big.data.mapreduce.multiinputpath;

import com.cloudera.org.joda.time.DateTime;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


public class MultiInputPathTest {

    private final Configuration conf = new Configuration();
    private static FileSystem fs;
    private static final DateTime NOW = DateTime.now();
    private static String baseDir;
    private static String outputDir;
    private static String leftdir;
    private static String rightdir;
    private static final String NEW_LINE_DELIMETER = "\n";
    private static Map<String, Integer> studentToId;

    @BeforeClass
    public static void startup() throws Exception {

        Configuration conf = new Configuration();
        //set the fs to file:/// which means the local fileSystem
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/mapreduce/multiinputPath/" + UUID.randomUUID().toString() + "/";

        leftdir = baseDir + "left";
        rightdir = baseDir + "right";

        outputDir = baseDir + "/output/";

        //Write the data into the local filesystem  for Left input
        File tempFileleft = new File(leftdir + "/input.txt");
        FileUtils.writeStringToFile(tempFileleft, "Maverick,1", "UTF-8");
        FileUtils.writeStringToFile(tempFileleft, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileleft, "Ninja,2", "UTF-8", true);

        //Write the data into the local filesystem  for Left input
        File tempFileRight = new File(rightdir + "/input.txt");
        FileUtils.writeStringToFile(tempFileRight, "3,Zoom", "UTF-8");
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "4,Dracula", "UTF-8", true);

        studentToId = new HashMap<>();
    }

    @AfterClass
    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);
    }

    void fileToHashMap(String filePath) throws IOException {

        //Read the data from the outputfile
        File outputFile = new File(filePath);
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");

        //4 lines in output file, with one word per line
        Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] wordCount = e.split("\t");
            studentToId.put(wordCount[0], Integer.parseInt(wordCount[1]));
        });

    }

    @Test
    public void multiInputTest() throws Exception {

        // Any argument passed with -DKey=Value will be parsed by ToolRunner
        String[] args = new String[]{
          "-D" + MultiInputPathDriver.INPUT_PATH_LEFT + "=" + leftdir,
          "-D" + MultiInputPathDriver.INPUT_PATH_RIGHT + "=" + rightdir, 
          "-D" + MultiInputPathDriver.OUTPUT_PATH + "=" + outputDir};

        // call the main function to run the job
        MultiInputPathDriver.main(args);

        //Two mappers have been spawned because, 
        //Two input files are there and InputFormat is 
        //TextInputFormat (with CombinedTextInputFormat this will not be the case)
        fileToHashMap(outputDir + "/part-m-00000");
        fileToHashMap(outputDir + "/part-m-00001");

        //4 words .
        Assert.assertEquals(4L, studentToId.size());
        Assert.assertEquals(1L, studentToId.get("Maverick").longValue());

    }

}