Map Side Joins In Map Reduce

Map Side Joins In Map Reduce (Replicated Joins)

Problem :   Given a big file having Country to City Mapping , and a small file (small enough to fit in memory) having City to Airlines mapping.  The job is expected to produce Country to Airline Mappings .   The fact that the small file can be fit into memory enables us to perform MapSide Joins. ( Github)

dependency

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.9.0<version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.9.0<version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.9.0<version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.9.0</version>
        </dependency>
   </dependencies>

   <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories> 

[addToAppearHere]

Mapper :

 

package com.big.data.mapreduce.mapsidejoin;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;


public class CountryToCityMapper extends Mapper<LongWritable, Text, Text, Text> {
    // As TextInput Format has been used , 
   // the key is the offset of line in the file , The actual line goes in the value

    private Text country;
    private Text airline;
    private static Map<String, Set<String>> cityToAirline;
    public static final String DELIMETER = "\n";

    // Load the city To Airline file in memory.
    public void filetoHashMap(Path filePath) throws IOException {

        try (BufferedReader bufferedReader = 
                        new BufferedReader(new FileReader(filePath.toString()))) {

            String line = null;

            while ((line = bufferedReader.readLine()) != null) {
                Arrays.stream(line.split(DELIMETER)).forEach(e -> {
                    String[] countryToAirlineArray = e.split(",");
                    Set<String> airline = null;
                    if (cityToAirline.get(countryToAirlineArray[0]) == null) {
                        airline = new HashSet<String>();
                        airline.add(countryToAirlineArray[1]);
                        cityToAirline.put(countryToAirlineArray[0], airline);
                    } else {
                        airline = cityToAirline.get(countryToAirlineArray[0]);
                        airline.add(countryToAirlineArray[1]);
                    }
                });
            }
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);

        country = new Text();
        airline = new Text();
        // city can be mapped to Multiple airline
        cityToAirline = new HashMap<>();

        // From distributed cache get city -> airline mapping. a city can have multiple airlines
        URI[] cityToAirlineFiles = context.getCacheFiles();
        // All files from cache are retrieved in the array
        //
        if (cityToAirlineFiles != null && cityToAirlineFiles.length > 0) {
            for (URI cityToAirlineMappingFile : cityToAirlineFiles) {
                //getPath() gets the raw filePath (without file:/ prefix)
                filetoHashMap(new Path(cityToAirlineMappingFile.getPath()));
                // for custom logic based on fileName add 
                //some code to cityToAirlineMappingFile.getPath().contains() .....
            }
        }

    }

    // Input country , city
    @Override
    public void map(LongWritable key, Text value,
                    Context context) throws IOException, InterruptedException {

        String[] countryToCity = value.toString().split(CountryToAirlineDriver.DELIMTER);

        String city = countryToCity[1];

        country.set(countryToCity[0]);

        if(cityToAirline.get(city)!= null && !cityToAirline.get(city).isEmpty() ){

           for(String airlinesInCity : cityToAirline.get(city) ){
               airline.set(airlinesInCity);
               context.write(country,airline);
           }

        }

    }

}
[addToAppearHere]

Driver:

 

package com.big.data.mapreduce.mapsidejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

  /**
    * country ->city , city ->airline task is to find country->airline .
    * In the process learn about join and Output Full outer 
    * join as output and hence understand how to implememt rightouter , leftouterjoins
 */
public class CountryToAirlineDriver extends Configured implements Tool {
    //extends Configured implements Tool helps in argument parsing .
   // Arguments need to passed as -Dkey=Value

    public static final String INPUT_PATH = "input.path";
    public static final String INPUT_PATH_FOR_FILE_TO_BE_DISTRIBUTED = "input.path.distributed.file";
    public static final String OUTPUT_PATH = "output.path";
    public static final String DELIMTER = ",";

    public static void main(String[] args) throws Exception {
        if (ToolRunner.run(new CountryToAirlineDriver(), args) != 0) {
            throw new IOException("Job has failed");
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        //The arguments passed has been split into Key value by ToolRunner
        Configuration conf = getConf();
        Path inputPat = new Path(conf.get(INPUT_PATH));
        Path inputPathDistributed = new Path(conf.get(INPUT_PATH_FOR_FILE_TO_BE_DISTRIBUTED));
        Path outputPath = new Path(conf.get(OUTPUT_PATH));
        Job job = new Job(conf, this.getClass().toString());

        FileInputFormat.setInputPaths(job,inputPat);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setJobName("CountryToAirlineDriver");
        job.setJarByClass(CountryToAirlineDriver.class);

        // add files to cache
        job.addCacheFile(inputPathDistributed.toUri());
        job.addCacheFile(inputPathDistributed.toUri());


        job.setMapperClass(CountryToCityMapper.class);

        //Set OutPutFormat class
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //As no reducers are used, its a map only task
       job.setNumReduceTasks(0);

        // Driver polls to find out if the job has completed or not.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

[addToAppearHere]

Integration Test : Github

 

package com.big.data.mapreduce.mapsidejoin;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;


public class CountryToAirlineDriverTest {

    private final Configuration conf = new Configuration();
    private static FileSystem fs;
    private static String baseDir;
    private static String outputDir;
    private static String input;
    // File Path along with fileName needs to be provided
    private static String distrubutedFilePath;
    private static final String NEW_LINE_DELIMETER = "\n";
    private static Map<String, Set<String>> countryToAirline;

    @BeforeClass
    public static void startup() throws Exception {

        Configuration conf = new Configuration();
        //set the fs to file:/// which means the local fileSystem
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/mapreduce/mapsideJoin/" + UUID.randomUUID().toString() + "/";

        input = baseDir + "inputdir";
        distrubutedFilePath = baseDir + "distributed" + "/distributed.txt";

        outputDir = baseDir + "/output/";

        //Write the data into the local filesystem  for Left input
        File tempFileleft = new File(input + "/input.txt");
        FileUtils.writeStringToFile(tempFileleft, "Germany,Berlin", "UTF-8");
        FileUtils.writeStringToFile(tempFileleft, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileleft, "India,Delhi", "UTF-8", true);

        //Write the data into the local filesystem  for right input
        File tempFileRight = new File(distrubutedFilePath );
        FileUtils.writeStringToFile(tempFileRight, "Berlin,Tegel", "UTF-8");
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "Berlin,Schonfield", "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "Delhi,IGI", "UTF-8", true);

        countryToAirline = new HashMap<>();
    }

    @AfterClass
    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);
    }

    void fileToHashMap(String filePath) throws IOException {

        //Read the data from the outputfile
        File outputFile = new File(filePath);
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");

        //4 lines in output file, with one word per line
        Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] countryToAirlineArray = e.split("\t");
            Set<String> airline = null;

            if (countryToAirline.get(countryToAirlineArray[0]) == null) {
                airline = new HashSet<String>();
                airline.add(countryToAirlineArray[1]);
                countryToAirline.put(countryToAirlineArray[0], airline);

            } else {
                airline = countryToAirline.get(countryToAirlineArray[0]);
                airline.add(countryToAirlineArray[1]);
            }
        });

    }

    @Test
    public void countryToAirlineTest() throws Exception {

        // Any argument passed with -DKey=Value will be parsed by ToolRunner
      String[] args = new String[]{
         "-D" + CountryToAirlineDriver.INPUT_PATH + "=" + input,
         "-D" + CountryToAirlineDriver.INPUT_PATH_FOR_FILE_TO_BE_DISTRIBUTED 
                                                            + "=" + distrubutedFilePath, 
         "-D" + CountryToAirlineDriver.OUTPUT_PATH + "=" + outputDir};

        // call the main function to run the job
        CountryToAirlineDriver.main(args);

        fileToHashMap(outputDir + "/part-m-00000");

        //4 words .
        Assert.assertEquals(2L, countryToAirline.size());
        Assert.assertEquals(2L, countryToAirline.get("Germany").size());
        Assert.assertTrue(countryToAirline.get("Germany").contains("Tegel"));
        Assert.assertTrue(countryToAirline.get("Germany").contains("Schonfield"));

    }

}