Map Side Join in Spark
Problem : Given a country-to-city mapping in a huge text file on HDFS, and a small file (small enough to fit in memory) containing a city-to-airline mapping, the job is expected to perform a map-side join and write country-to-airline pairs to HDFS. (Github)
Solution : Broadcast the small dataset and, inside the map operation, look up each record of the big dataset against it.
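For example, with the sample data used in the integration test below :

Big file (country,city) :
Germany,Berlin
India,Delhi

Small file (city,airline) :
Berlin,Tegel
Berlin,Schonfield
Delhi,IGI

Expected output (country,airline) :
(Germany,Tegel)
(Germany,Schonfield)
(India,IGI)

Maven Dependencies :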
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.big.data</groupId>
<artifactId>avro-schema</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.10</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.5.0-cdh5.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.5.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
Spark :
package com.big.data.spark;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Big file input : country -> city
* Small file input (fits in memory) : city -> airline
* Outcome : find country -> airline and save it to disk
*/
public class MapSideJoin extends Configured implements Tool, Closeable {
public static final String BIG_FILE_INPUT_PATH = "spark.big.file.input.path";
public static final String SMALL_FILE_INPUT_PATH = "spark.small.file.input.path";
public static final String OUTPUT_PATH = "spark.output.path";
public static final String IS_RUN_LOCALLY = "spark.is.run.local";
public static final String DEFAULT_FS = "spark.default.fs";
public static final String NUM_PARTITIONS = "spark.num.partitions";
private static final String NEW_LINE_DELIMETER = "\n";
private SQLContext sqlContext;
private JavaSparkContext javaSparkContext;
protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
final String defaultFs,
final Class<T> tClass) {
final SparkConf sparkConf = new SparkConf()
//Set Spark conf here;
//once the Spark context is created, the Hadoop configuration for InputFormats can be set on it
.setAppName(tClass.getSimpleName())
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
if (isRunLocal) {
sparkConf.setMaster("local[*]");
}
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
if (defaultFs != null) {
sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
}
return sparkContext;
}
/**
* Reads the small file from the local filesystem/HDFS and populates the city-to-airline map. 1 city => n airlines.
*
* @param inputPath  path of the small file (the filename is included in the path)
* @param fileSystem filesystem to read the small file from
* @return map of city to the set of airlines operating from it
* @throws IOException
*/
protected Map<String, Set<String>> formCityToAirlineHashMap(String inputPath, FileSystem fileSystem) throws IOException {
Map<String, Set<String>> cityToAirline = new HashMap<>();
//Read the data from the small input file; note the path is explicitly resolved against the local filesystem (file:///)
try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(
fileSystem.open(new Path("file:///" + inputPath))))) {
String line = reader.readLine();
while (line != null) {
String[] cityToAirlineArray = line.split(",");
Set<String> airline = null;
if (cityToAirline.get(cityToAirlineArray[0]) == null) {
airline = new HashSet<String>();
airline.add(cityToAirlineArray[1]);
cityToAirline.put(cityToAirlineArray[0], airline);
} else {
airline = cityToAirline.get(cityToAirlineArray[0]);
airline.add(cityToAirlineArray[1]);
}
line = reader.readLine();
}
}
return cityToAirline;
}
@Override
public int run(String[] args) throws Exception {
//The arguments passed have been split into key/value pairs by ToolRunner
Configuration conf = getConf();
String countryToCityPath = conf.get(BIG_FILE_INPUT_PATH);
// The filename is also included in the path
String cityToAirlinesFilePath = conf.get(SMALL_FILE_INPUT_PATH);
String outputPath = conf.get(OUTPUT_PATH);
//Get the Spark context. This is the central context,
//which can be wrapped in any other context (e.g. SQLContext)
javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
conf.get(DEFAULT_FS), this.getClass());
Map<String, Set<String>> cityToAirline =
formCityToAirlineHashMap(cityToAirlinesFilePath, FileSystem.get(conf));
Broadcast<Map<String, Set<String>>> airlinesBroadcast =
javaSparkContext.broadcast(cityToAirline);
// No input path has been read and no job has been started yet.
//To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
// To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(countryToCityPath);
stringJavaRDD.map(new Lookup(airlinesBroadcast))
.flatMap(airlineList -> airlineList)
// How many partitions to split the output into
.repartition(conf.getInt(NUM_PARTITIONS, 1))
.saveAsTextFile(outputPath);
return 0;
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(javaSparkContext);
}
public static class Lookup implements Function<String, List<Tuple2<String, String>>> {
private final Broadcast<Map<String, Set<String>>> airlinesBroadcast;
public Lookup(Broadcast<Map<String, Set<String>>> airlinesBroadcast) {
this.airlinesBroadcast = airlinesBroadcast;
}
@Override
public List<Tuple2<String, String>> call(String v1) throws Exception {
List<Tuple2<String, String>> outputList = new ArrayList<>();
String[] countryToCityArray = v1.split(",");
Set<String> airlines = airlinesBroadcast.getValue()
.get(countryToCityArray[1]);
if (airlines != null) {
airlines.forEach(e -> outputList.add(new Tuple2<>(countryToCityArray[0], e)));
}
// returning a list (possibly empty); flatMap flattens it before the output is stored
return outputList;
}
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new MapSideJoin(), args);
}
}
Key Takeaways :
1. The small dataset is broadcast to all the executors.
2. In the map transformation, the broadcast variable is used to look up the airlines for each city.
3. This is the same concept as the distributed cache in MapReduce.
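For comparison, the same join can also be written with the DataFrame API, where the broadcast() hint asks Spark to replicate the small side to every executor and perform the broadcast (map-side) join itself. A minimal sketch, reusing the paths and Spark context from run() above; the column names and CSV options are illustrative, not part of the original job :
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import static org.apache.spark.sql.functions.broadcast;

SQLContext sqlContext = new SQLContext(javaSparkContext.sc());
// Big dataset : country,city
DataFrame countryToCity = sqlContext.read()
.format("com.databricks.spark.csv")
.load(countryToCityPath)
.toDF("country", "city");
// Small dataset : city,airline
DataFrame cityToAirline = sqlContext.read()
.format("com.databricks.spark.csv")
.load(cityToAirlinesFilePath)
.toDF("city", "airline");
// broadcast() marks the small DataFrame so the join is executed as a broadcast hash join
countryToCity.join(broadcast(cityToAirline), "city")
.select("country", "airline")
.write()
.format("com.databricks.spark.csv")
.save(outputPath);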
Integration Test : (Github)
package com.big.data.spark;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class MapSideJoinTest {
private static Configuration conf;
private static FileSystem fs;
private static String baseDir;
private static String outputDir;
private static String input;
// File Path along with fileName needs to be provided
private static String distrubutedFilePath;
private static final String NEW_LINE_DELIMETER = "\n";
private static Map<String, Set<String>> countryToAirline;
@BeforeClass
public static void startup() throws Exception {
conf = new Configuration();
//set the fs to file:/// which means the local fileSystem
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
fs = FileSystem.getLocal(conf);
baseDir = "/tmp/spark/mapsideJoin/" + UUID.randomUUID().toString() + "/";
input = baseDir + "inputdir";
distrubutedFilePath = baseDir + "distributed" + "/distributed.txt";
outputDir = baseDir + "/output";
//Write the data into the local filesystem for the left (big) input
File tempFileleft = new File(input + "/input.txt");
FileUtils.writeStringToFile(tempFileleft, "Germany,Berlin", "UTF-8");
FileUtils.writeStringToFile(tempFileleft, NEW_LINE_DELIMETER, "UTF-8", true);
FileUtils.writeStringToFile(tempFileleft, "India,Delhi", "UTF-8", true);
//Write the data into the local filesystem for the right (small) input
File tempFileRight = new File(distrubutedFilePath);
FileUtils.writeStringToFile(tempFileRight, "Berlin,Tegel", "UTF-8");
FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
FileUtils.writeStringToFile(tempFileRight, "Berlin,Schonfield", "UTF-8", true);
FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
FileUtils.writeStringToFile(tempFileRight, "Delhi,IGI", "UTF-8", true);
countryToAirline = new HashMap<>();
}
@AfterClass
public static void cleanup() throws Exception {
//Delete the local filesystem folder after the Job is done
fs.delete(new Path(baseDir), true);
}
void fileToHashMap(String filePath) throws IOException {
//Read the data from the outputfile
File outputFile = new File(filePath);
String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");
//Each output line looks like (Country,Airline); parse it into the countryToAirline map
Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
String[] countryToAirlineArray = e.substring(e.indexOf("(") + 1, e.indexOf(")")).split(",");
Set<String> airline = null;
if (countryToAirline.get(countryToAirlineArray[0]) == null) {
airline = new HashSet<String>();
airline.add(countryToAirlineArray[1]);
countryToAirline.put(countryToAirlineArray[0], airline);
} else {
airline = countryToAirline.get(countryToAirlineArray[0]);
airline.add(countryToAirlineArray[1]);
}
});
}
@Test
public void countryToAirlineTest() throws Exception {
// Any argument passed with -DKey=Value will be parsed by ToolRunner
String[] args = new String[]{
"-D" + MapSideJoin.BIG_FILE_INPUT_PATH + "=" + input,
"-D"+ MapSideJoin.SMALL_FILE_INPUT_PATH+"="+distrubutedFilePath,
"-D" + MapSideJoin.OUTPUT_PATH + "=" + outputDir,
"-D" + MapSideJoin.IS_RUN_LOCALLY + "=true",
"-D" + MapSideJoin.DEFAULT_FS + "=file:///",
"-D" + MapSideJoin.NUM_PARTITIONS + "=1"
};
MapSideJoin sparkHandler = new MapSideJoin();
conf.set(MapSideJoin.BIG_FILE_INPUT_PATH, input);
conf.set(MapSideJoin.SMALL_FILE_INPUT_PATH, distrubutedFilePath);
conf.set(MapSideJoin.OUTPUT_PATH, outputDir);
conf.set(MapSideJoin.IS_RUN_LOCALLY, "true");
conf.set(MapSideJoin.DEFAULT_FS, "file:///");
conf.set(MapSideJoin.NUM_PARTITIONS, "1");
sparkHandler.setConf(conf);
sparkHandler.run(args);
fileToHashMap(outputDir + "/part-00000");
//2 countries expected; Germany maps to 2 airlines
Assert.assertEquals(2L, countryToAirline.size());
Assert.assertEquals(2L, countryToAirline.get("Germany").size());
Assert.assertTrue(countryToAirline.get("Germany").contains("Tegel"));
Assert.assertTrue(countryToAirline.get("Germany").contains("Schonfield"));
}
}