Map Side Joins in Spark

Problem: Given a country-to-city mapping in a huge text file on HDFS, and a small file (small enough to fit in memory) mapping city to airlines, the job must perform a map-side join and write a country-to-airlines mapping to HDFS. For example, the record Germany,Berlin in the big file joined with Berlin,Tegel in the small file yields the output pair (Germany,Tegel). (Github)

Solution: Broadcast the small dataset to every executor, then look each record of the large dataset up against it inside a map transformation.
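
The essence of the approach, as a minimal sketch (a fragment of the complete job listed below, so imports and setup are omitted; in the full job the airline set is additionally flattened into individual (country, airline) tuples):

// Broadcast the small city -> airlines map once; every executor gets a read-only copy.
Broadcast<Map<String, Set<String>>> airlinesBroadcast =
        javaSparkContext.broadcast(cityToAirline);

// For every "country,city" line of the big dataset, look the city up in the broadcast
// value inside the map transformation; the join itself needs no shuffle of the big dataset.
JavaRDD<Tuple2<String, Set<String>>> countryToAirlines =
        javaSparkContext.textFile(countryToCityPath)
                        .map(line -> {
                            String[] parts = line.split(",");
                            return new Tuple2<String, Set<String>>(parts[0],
                                    airlinesBroadcast.getValue().get(parts[1]));
                        });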

Dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
  </dependencies>
  
  <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories>


Spark:

 

package com.big.data.spark;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Big input file: country -> city, one "country,city" record per line.
 * Small file (fits in memory): city -> airline.
 * The job joins the two on city and saves the resulting country -> airline pairs to disk.
 */
public class MapSideJoin extends Configured implements Tool, Closeable {

    public static final String BIG_FILE_INPUT_PATH = "spark.big.file.input.path";
    public static final String SMALL_FILE_INPUT_PATH = "spark.small.file.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";
    private static final String NEW_LINE_DELIMETER = "\n";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                // Set Spark configuration here; once the context is created,
                // Hadoop configuration for InputFormats can be set on it
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    /**
     * Reads the small file from local disk/HDFS and populates the city-to-airline map.
     * One city can map to n airlines.
     *
     * @param inputPath  path to the small file (the file name is included in the path)
     * @param fileSystem filesystem to read the file from
     * @return map of city to the set of airlines operating there
     * @throws IOException
     */

    protected Map<String, Set<String>> formCityToAirlineHashMap(String inputPath,
                                                                 FileSystem fileSystem)
            throws IOException {

        Map<String, Set<String>> cityToAirline = new HashMap<>();
        // Read the small city-to-airline file line by line
        try (BufferedReader reader = 
             new BufferedReader(
                new InputStreamReader(
                    fileSystem.open(new Path("file:///" + inputPath))))) {

            String line = reader.readLine();
            while (line != null) {

                String[] cityToAirlineArray = line.split(",");
                Set<String> airline = null;

                if (cityToAirline.get(cityToAirlineArray[0]) == null) {
                    airline = new HashSet<String>();
                    airline.add(cityToAirlineArray[1]);
                    cityToAirline.put(cityToAirlineArray[0], airline);

                } else {
                    airline = cityToAirline.get(cityToAirlineArray[0]);
                    airline.add(cityToAirlineArray[1]);
                }

                line = reader.readLine();
            }

        }

        return cityToAirline;
    }

    @Override
    public int run(String[] args) throws Exception {

        // The -D arguments passed on the command line have already been parsed into key/value pairs by ToolRunner
        Configuration conf = getConf();
        String countryToCityPath = conf.get(BIG_FILE_INPUT_PATH);

        // The filename is also included in the path
        String cityToAirlinesFilePath = conf.get(SMALL_FILE_INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);


        // Get the Spark context. This is the central context,
        // which any other context (e.g. SQLContext) wraps around
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE), 
                                                 conf.get(DEFAULT_FS), this.getClass());

        Map<String, Set<String>> cityToAirline =
                 formCityToAirlineHashMap(cityToAirlinesFilePath, FileSystem.get(conf));

        Broadcast<Map<String, Set<String>>> airlinesBroadcast = 
                             javaSparkContext.broadcast(cityToAirline);

        // No input has been read and no job has started yet.
        //To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
        // To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD

        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(countryToCityPath);
        stringJavaRDD.map(new Lookup(airlinesBroadcast))
                     // Lookup returns a list of (country, airline) tuples per input line; flatten it
                     .flatMap(e -> (ArrayList<Tuple2<String, String>>) e)
                     // How many partitions to split the output into
                     .repartition(conf.getInt(NUM_PARTITIONS, 1))
                     .saveAsTextFile(outputPath);
        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }



    public static class Lookup implements Function<String, Object> {

        private final Broadcast<Map<String, Set<String>>> airlinesBroadcast;

        public Lookup(Broadcast<Map<String, Set<String>>> airlinesBroadcast) {
            this.airlinesBroadcast = airlinesBroadcast;
        }

        @Override
        public Object call(String v1) throws Exception {
            ArrayList<Tuple2<String, String>> outputList = new ArrayList<>();

            String[] countryToCityArray = v1.split(",");
            Set<String> airlines = airlinesBroadcast.getValue()
                                                    .get(countryToCityArray[1]);

            if (airlines != null) {
                airlines.forEach(e -> outputList.add(new Tuple2<>(countryToCityArray[0], e)));
            }
            // Returning a list (possibly empty) per input line; the caller flattens it.
            // An empty list, rather than null, keeps the subsequent flatMap from failing.
            return outputList;
        }
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new MapSideJoin(), args);
    }

}

 

Key Takeaways:
1. The small dataset is broadcast to all the executors.
2. In the map transformation, the broadcast value is used to look up each record of the big dataset.
3. This is the same concept as the distributed cache in MapReduce.
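
For reference, a job built this way could be submitted roughly as follows; the jar name and the input/output paths are illustrative, while the -D settings are the configuration keys defined in the job and are parsed into the Hadoop Configuration by ToolRunner:

spark-submit \
    --master yarn \
    --class com.big.data.spark.MapSideJoin \
    map-side-join.jar \
    -Dspark.big.file.input.path=/data/countryToCity \
    -Dspark.small.file.input.path=/local/path/cityToAirline.txt \
    -Dspark.output.path=/data/countryToAirline \
    -Dspark.num.partitions=2

Note that, as written, formCityToAirlineHashMap prefixes the small-file path with file:///, so the city-to-airline file is read from the driver's local filesystem.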


Integration Test: (Github)

 

package com.big.data.spark;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;


public class MapSideJoinTest {
    private static Configuration conf;
    private static FileSystem fs;
    private static String baseDir;
    private static String outputDir;
    private static String input;
    // File Path along with fileName needs to be provided
    private static String distrubutedFilePath;
    private static final String NEW_LINE_DELIMETER = "\n";
    private static Map<String, Set<String>> countryToAirline;

    @BeforeClass
    public static void startup() throws Exception {

        conf = new Configuration();
        //set the fs to file:/// which means the local fileSystem
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        fs = FileSystem.getLocal(conf);
        baseDir = "/tmp/spark/mapsideJoin/" + UUID.randomUUID().toString() + "/";



        input = baseDir + "inputdir";
        distrubutedFilePath = baseDir + "distributed" + "/distributed.txt";

        outputDir = baseDir + "/output";

        //Write the data into the local filesystem for the left (big) input
        File tempFileleft = new File(input + "/input.txt");
        FileUtils.writeStringToFile(tempFileleft, "Germany,Berlin", "UTF-8");
        FileUtils.writeStringToFile(tempFileleft, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileleft, "India,Delhi", "UTF-8", true);

        //Write the data into the local filesystem for the right (small) input
        File tempFileRight = new File(distrubutedFilePath);
        FileUtils.writeStringToFile(tempFileRight, "Berlin,Tegel", "UTF-8");
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "Berlin,Schonfield", "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, NEW_LINE_DELIMETER, "UTF-8", true);
        FileUtils.writeStringToFile(tempFileRight, "Delhi,IGI", "UTF-8", true);

        countryToAirline = new HashMap<>();
    }

    @AfterClass
    public static void cleanup() throws Exception {
        //Delete the local filesystem folder after the Job is done
        fs.delete(new Path(baseDir), true);
    }

    void fileToHashMap(String filePath) throws IOException {

        //Read the data from the outputfile
        File outputFile = new File(filePath);
        String fileToString = FileUtils.readFileToString(outputFile, "UTF-8");

        // Each output line is a "(country,airline)" tuple; parse the pair out of the parentheses
        Arrays.stream(fileToString.split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] countryToAirlineArray =
                    e.substring(e.indexOf("(") + 1, e.indexOf(")")).split(",");
            Set<String> airline = null;

            if (countryToAirline.get(countryToAirlineArray[0]) == null) {
                airline = new HashSet<String>();
                airline.add(countryToAirlineArray[1]);
                countryToAirline.put(countryToAirlineArray[0], airline);

            } else {
                airline = countryToAirline.get(countryToAirlineArray[0]);
                airline.add(countryToAirlineArray[1]);
            }
        });

    }

    @Test
    public void countryToAirlineTest() throws Exception {

        // Any argument passed with -DKey=Value will be parsed by ToolRunner
        String[] args = new String[]{
                "-D" + MapSideJoin.BIG_FILE_INPUT_PATH + "=" + input,
                "-D"+  MapSideJoin.SMALL_FILE_INPUT_PATH+"="+distrubutedFilePath,
                "-D" + MapSideJoin.OUTPUT_PATH + "=" + outputDir,
                "-D" + MapSideJoin.IS_RUN_LOCALLY + "=true",
                "-D" + MapSideJoin.DEFAULT_FS + "=file:///",
                "-D" + MapSideJoin.NUM_PARTITIONS + "=1"
        };

        MapSideJoin sparkHandler = new MapSideJoin();
        conf.set(MapSideJoin.BIG_FILE_INPUT_PATH, input);
        conf.set(MapSideJoin.SMALL_FILE_INPUT_PATH, distrubutedFilePath);
        conf.set(MapSideJoin.OUTPUT_PATH, outputDir);
        conf.set(MapSideJoin.IS_RUN_LOCALLY, "true");
        conf.set(MapSideJoin.DEFAULT_FS, "file:///");
        conf.set(MapSideJoin.NUM_PARTITIONS, "1");
        sparkHandler.setConf(conf);
        sparkHandler.run(args);



        fileToHashMap(outputDir + "/part-00000");

        // Two countries expected; Germany maps to two airlines
        Assert.assertEquals(2L, countryToAirline.size());
        Assert.assertEquals(2L, countryToAirline.get("Germany").size());
        Assert.assertTrue(countryToAirline.get("Germany").contains("Tegel"));
        Assert.assertTrue(countryToAirline.get("Germany").contains("Schonfield"));

    }
}