Map Side Joins In Map Reduce
Map Side Joins In Map Reduce (Replicated Joins)
Problem: Given a big file containing a Country-to-City mapping and a small file (small enough to fit in memory) containing a City-to-Airline mapping, the job is expected to produce a Country-to-Airline mapping. Because the small file fits into memory, it can be loaded by every mapper, which lets us perform the join on the map side without a reduce phase. (Github)
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.9.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh5.9.0</version> </dependency> </dependencies> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> [addToAppearHere]
Mapper :
package com.big.data.mapreduce.mapsidejoin;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * Map-side (replicated) join of {@code country,city} input records against an
 * in-memory {@code city -> airlines} lookup table.
 *
 * <p>The lookup table is filled in {@link #setup(Context)} from every file returned by
 * {@code context.getCacheFiles()}. For each input record whose city is present in the
 * table, one {@code (country, airline)} pair is written per airline of that city.
 * Records whose city is unknown produce no output (inner-join semantics).
 *
 * <p>The input key is a {@link LongWritable} and is not used; the record itself is
 * taken from the {@link Text} value.
 */
public class CountryToCityMapper extends Mapper<LongWritable, Text, Text, Text> {

    /**
     * Line separator constant. Kept because it is part of the public surface of this
     * class; the loader reads the cache file line by line and does not need it.
     */
    public static final String DELIMETER = "\n";

    /** Separator between the city and the airline inside a cache-file line. */
    private static final String CACHE_FIELD_SEPARATOR = ",";

    /** Counter group / name used to report input records that could not be parsed. */
    private static final String COUNTER_GROUP = "CountryToCityMapper";
    private static final String MALFORMED_RECORDS = "MALFORMED_INPUT_RECORDS";

    // Reused output holders, (re)created in setup().
    private Text country;
    private Text airline;

    // Per-mapper lookup table: a city can be served by several airlines.
    // Held per instance so that mapper instances never share or replace each other's table.
    private final Map<String, Set<String>> cityToAirline = new HashMap<>();

    /**
     * Loads a {@code city,airline} file into the in-memory lookup table.
     *
     * <p>The file is decoded as UTF-8. Every line is split on {@code ","}; the first
     * field is the city and the second the airline. Lines with fewer than two fields
     * (for example blank lines) are skipped. Entries are added to whatever the table
     * already contains, so several files can be loaded one after another.
     *
     * @param filePath path of the file to read; its string form is opened as a local file
     * @throws IOException if the file cannot be opened or read
     */
    public void filetoHashMap(Path filePath) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new FileInputStream(filePath.toString()), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] cityAndAirline = line.split(CACHE_FIELD_SEPARATOR);
                if (cityAndAirline.length < 2) {
                    // Blank or incomplete line: nothing to join on.
                    continue;
                }
                cityToAirline
                        .computeIfAbsent(cityAndAirline[0], city -> new HashSet<>())
                        .add(cityAndAirline[1]);
            }
        }
    }

    /**
     * Creates the reusable output objects and rebuilds the lookup table from all
     * cache files registered on the job.
     *
     * @param context task context supplying the cache file URIs
     * @throws IOException          if a cache file cannot be read
     * @throws InterruptedException propagated from the superclass
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        country = new Text();
        airline = new Text();
        cityToAirline.clear();

        URI[] cityToAirlineFiles = context.getCacheFiles();
        if (cityToAirlineFiles == null) {
            return;
        }
        for (URI cityToAirlineMappingFile : cityToAirlineFiles) {
            // getPath() yields the raw path without the scheme (e.g. without "file:").
            // NOTE(review): the path is opened as a local file; this matches the local
            // test setup - confirm how cache URIs resolve when running on a cluster.
            // Custom per-file logic could branch on cityToAirlineMappingFile.getPath() here.
            filetoHashMap(new Path(cityToAirlineMappingFile.getPath()));
        }
    }

    /**
     * Joins one {@code country,city} record against the lookup table.
     *
     * <p>The value is split on {@code CountryToAirlineDriver.DELIMTER}; field 0 is the
     * country and field 1 the city. A record with fewer than two fields is counted under
     * the {@code MALFORMED_INPUT_RECORDS} counter and skipped instead of failing the task.
     *
     * @param key     input key, unused
     * @param value   one input line of the form {@code country,city}
     * @param context sink for the {@code (country, airline)} pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(LongWritable key, Text value,
                    Context context) throws IOException, InterruptedException {
        String[] countryToCity = value.toString().split(CountryToAirlineDriver.DELIMTER);
        if (countryToCity.length < 2) {
            context.getCounter(COUNTER_GROUP, MALFORMED_RECORDS).increment(1);
            return;
        }

        Set<String> airlinesInCity = cityToAirline.get(countryToCity[1]);
        if (airlinesInCity == null) {
            // Unknown city: inner join emits nothing.
            return;
        }

        country.set(countryToCity[0]);
        for (String airlineInCity : airlinesInCity) {
            airline.set(airlineInCity);
            context.write(country, airline);
        }
    }
}
[addToAppearHere]
Driver:
package com.big.data.mapreduce.mapsidejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Driver for the map-side join: given {@code country -> city} input and a small
 * {@code city -> airline} file, produces {@code country -> airline} pairs.
 *
 * <p>The small file is registered as a cache file on the job and joined inside
 * {@link CountryToCityMapper}; the job is map-only (zero reducers).
 *
 * <p>Arguments are passed as {@code -Dkey=value} and parsed by {@link ToolRunner}:
 * <ul>
 *   <li>{@code -Dinput.path=...} - the big {@code country,city} input</li>
 *   <li>{@code -Dinput.path.distributed.file=...} - the small {@code city,airline} file</li>
 *   <li>{@code -Doutput.path=...} - the job output directory</li>
 * </ul>
 */
public class CountryToAirlineDriver extends Configured implements Tool {

    /** Configuration key of the main (big) input path. */
    public static final String INPUT_PATH = "input.path";
    /** Configuration key of the small file that is registered as a cache file. */
    public static final String INPUT_PATH_FOR_FILE_TO_BE_DISTRIBUTED = "input.path.distributed.file";
    /** Configuration key of the output directory. */
    public static final String OUTPUT_PATH = "output.path";
    /** Field separator of the {@code country,city} input records. */
    public static final String DELIMTER = ",";

    /**
     * Runs the job through {@link ToolRunner}.
     *
     * @param args command line arguments, expected in {@code -Dkey=value} form
     * @throws IOException if the job finishes with a non-zero status
     * @throws Exception   anything thrown while configuring or running the job
     */
    public static void main(String[] args) throws Exception {
        if (ToolRunner.run(new CountryToAirlineDriver(), args) != 0) {
            throw new IOException("Job has failed");
        }
    }

    /**
     * Configures and submits the map-only join job and waits for it to finish.
     *
     * @param args remaining arguments after ToolRunner parsing; not used
     * @return {@code 0} if the job succeeded, {@code 1} otherwise
     * @throws IllegalArgumentException if one of the three required paths is not configured
     * @throws Exception                anything thrown while configuring or running the job
     */
    @Override
    public int run(String[] args) throws Exception {
        // The -Dkey=value arguments have already been folded into the configuration.
        Configuration conf = getConf();
        Path inputPath = requiredPath(conf, INPUT_PATH);
        Path inputPathDistributed = requiredPath(conf, INPUT_PATH_FOR_FILE_TO_BE_DISTRIBUTED);
        Path outputPath = requiredPath(conf, OUTPUT_PATH);

        Job job = Job.getInstance(conf, "CountryToAirlineDriver");
        job.setJarByClass(CountryToAirlineDriver.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Register the small file exactly once; the mapper loads every cache file it is given.
        job.addCacheFile(inputPathDistributed.toUri());

        job.setMapperClass(CountryToCityMapper.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // No reducers: the mapper output is the job output.
        job.setNumReduceTasks(0);

        // Blocks (printing progress) until the job has completed.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Reads a mandatory path from the configuration.
     *
     * @param conf configuration to read from
     * @param key  configuration key holding the path
     * @return the configured path
     * @throws IllegalArgumentException if the key is absent or empty
     */
    private static Path requiredPath(Configuration conf, String key) {
        String value = conf.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument: -D" + key + "=<path>");
        }
        return new Path(value);
    }
}
[addToAppearHere]
Integration Test : Github
package com.big.data.mapreduce.mapsidejoin;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
 * Integration test that runs {@link CountryToAirlineDriver} against small files on
 * the local file system and checks the joined {@code country -> airline} output.
 */
public class CountryToAirlineDriverTest {

    private static final String NEW_LINE_DELIMETER = "\n";
    // TextOutputFormat output is parsed as key<TAB>value.
    private static final String OUTPUT_FIELD_SEPARATOR = "\t";

    private static FileSystem fs;
    private static String baseDir;
    private static String outputDir;
    private static String input;
    // Full path, including the file name, of the small file to be distributed.
    private static String distrubutedFilePath;
    private static Map<String, Set<String>> countryToAirline;

    /**
     * Creates a unique working directory and writes both join inputs into it.
     *
     * @throws Exception if the local file system cannot be obtained or written to
     */
    @BeforeClass
    public static void startup() throws Exception {
        Configuration conf = new Configuration();
        // file:/// selects the local file system.
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        fs = FileSystem.getLocal(conf);

        baseDir = "/tmp/mapreduce/mapsideJoin/" + UUID.randomUUID().toString() + "/";
        input = baseDir + "inputdir";
        distrubutedFilePath = baseDir + "distributed" + "/distributed.txt";
        // baseDir already ends with a slash.
        outputDir = baseDir + "output";

        // Left (big) input: country,city
        File tempFileleft = new File(input + "/input.txt");
        FileUtils.writeStringToFile(tempFileleft, "Germany,Berlin", "UTF-8");
        FileUtils.writeStringToFile(tempFileleft, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileleft, "India,Delhi", "UTF-8", true);

        // Right (small, distributed) input: city,airline
        File tempFileRight = new File(distrubutedFilePath);
        FileUtils.writeStringToFile(tempFileRight, "Berlin,Tegel", "UTF-8");
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "Berlin,Schonfield", "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "Delhi,IGI", "UTF-8", true);

        countryToAirline = new HashMap<>();
    }

    /**
     * Removes the working directory created by {@link #startup()}.
     *
     * @throws Exception if the directory cannot be deleted
     */
    @AfterClass
    public static void cleanup() throws Exception {
        fs.delete(new Path(baseDir), true);
    }

    /**
     * Parses a job output file of {@code country<TAB>airline} lines into
     * {@code countryToAirline}. Lines with fewer than two fields are ignored.
     *
     * @param filePath path of the output file to read
     * @throws IOException if the file cannot be read
     */
    void fileToHashMap(String filePath) throws IOException {
        String fileToString = FileUtils.readFileToString(new File(filePath), "UTF-8");
        for (String line : fileToString.split(NEW_LINE_DELIMETER)) {
            String[] countryAndAirline = line.split(OUTPUT_FIELD_SEPARATOR);
            if (countryAndAirline.length < 2) {
                continue;
            }
            countryToAirline
                    .computeIfAbsent(countryAndAirline[0], c -> new HashSet<>())
                    .add(countryAndAirline[1]);
        }
    }

    /**
     * Runs the job end to end and verifies every expected country/airline pair.
     *
     * @throws Exception if the job fails or its output cannot be read
     */
    @Test
    public void countryToAirlineTest() throws Exception {
        // Any argument passed as -Dkey=value is parsed by ToolRunner.
        String[] args = new String[]{
                "-D" + CountryToAirlineDriver.INPUT_PATH + "=" + input,
                "-D" + CountryToAirlineDriver.INPUT_PATH_FOR_FILE_TO_BE_DISTRIBUTED
                        + "=" + distrubutedFilePath,
                "-D" + CountryToAirlineDriver.OUTPUT_PATH + "=" + outputDir};

        CountryToAirlineDriver.main(args);
        fileToHashMap(outputDir + "/part-m-00000");

        // Expected pairs: Germany->Tegel, Germany->Schonfield, India->IGI.
        Assert.assertEquals(2L, countryToAirline.size());

        Assert.assertEquals(2L, countryToAirline.get("Germany").size());
        Assert.assertTrue(countryToAirline.get("Germany").contains("Tegel"));
        Assert.assertTrue(countryToAirline.get("Germany").contains("Schonfield"));

        Assert.assertEquals(1L, countryToAirline.get("India").size());
        Assert.assertTrue(countryToAirline.get("India").contains("IGI"));
    }
}