DataFrame Join in Spark

Data Frame Join in Spark (shuffle join )

Problem :  Given a Big json file containing contry -> language mapping ,  and a big parquet file containing  Employee info. The job is expected to output   Employee to language based on the country.  (Github)

1. Parquet file (Huge file on HDFS ) ,  Avro  Schema :

root 
|– emp_id: integer (nullable = false)
|– emp_name: string (nullable = false)
|– emp_country: string (nullable = false)
|– subordinates: map (nullable = true)
|    |– key: string
|    |– value: string (valueContainsNull = false)

2.  Json File  with schema (Small file that can be held in memory ):

[addToAppearHere]

Dependency :

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
    </dependency>
  </dependencies>
  
  <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories>

[addToAppearHere]
Spark : 

package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;


public class DataFrameJoin extends Configured implements Tool, Closeable {

    public static final String PARQUET_INPUT_PATH = "spark.parquet.input.path";
    public static final String JSON_INPUT_PATH = "spark.json.input.path";

    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";
    public static final String NEW_LINE_DELIMETER = "\n";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;
    private static final Logger LOG = LoggerFactory.getLogger(DataFrameJoin.class);

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                //Set spark conf here , 
                //After one gets spark context you can set hadoop configuration for InputFormats
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        //The arguments passed has been split into Key value by ToolRunner
        Configuration conf = getConf();

        //Left path Parquet
        String parquetinputPath = conf.get(PARQUET_INPUT_PATH);

        //Right Path JSON
        String jsonInputPath = conf.get(JSON_INPUT_PATH);

        String outputPath = conf.get(OUTPUT_PATH);

        //Get spark context, This is the central context , which can be wrapped in Any Other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE), 
                                                conf.get(DEFAULT_FS), this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input path has been read, no job has not been started yet .
        //To set any configuration use javaSparkContext.hadoopConfiguration().set(Key,value);
        // To set any custom inputformat use javaSparkContext.newAPIHadoopFile() and get a RDD

        // Avro schema to StructType conversion
        final StructType outPutSchemaStructType = (StructType) SchemaConverters
                                                    .toSqlType(Employee.getClassSchema())
                                                     .dataType();
        // read data from parquetfile,
       //  the schema of the data is taken from the avro schema (Schema Employee -> Country)
        DataFrame parquetDFLeft = sqlContext.read()
                                            .format(Employee.class.getCanonicalName())
                                            .parquet(parquetinputPath);

        // show() is a action so donot have it enabled unecessary
        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and Data from parquet file is ");
            parquetDFLeft.printSchema();
            parquetDFLeft.show();
        }

        // Read Json file from Right (Json file schema  : country -> langage )
        DataFrame jsonDataframeRight = sqlContext.read().json(jsonInputPath);
        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and Data from Json file is ");
            jsonDataframeRight.printSchema();
            jsonDataframeRight.show();
        }

        // Inner Join
        // various Join Type =>  
        //"inner", "outer", "full", "fullouter", "leftouter", "left", 
        //"rightouter", "right", "leftsemi"
        DataFrame join = parquetDFLeft.join(jsonDataframeRight, parquetDFLeft.col("emp_country")
                                      .equalTo(jsonDataframeRight.col("country")));
        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and Data from Join is  ");
            join.printSchema();
            join.show();
        }

        join.write()
            .format(Employee.class.getCanonicalName())
            .parquet(outputPath);

        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DataFrameJoin(), args);
    }
}

[addToAppearHere]
Integration Test :

package com.big.data.spark;

import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


public class DataFrameJoinTest {

    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteAvroParquetFilesTest.class);
    private static final String BASEDIR = 
           "/tmp/DataFrameJoinTest/avroparquetInputFile/" + System.currentTimeMillis() + "/";
    private String parquetInput;
    private String jsonInput;
    private String output;

    private Employee employee;

    @Before
    public void inputDataPreperation() throws IOException {

        parquetInput = BASEDIR + "parquetInput/";
        jsonInput = BASEDIR + "jsonInput";
        output = BASEDIR + "output/";


        // WRITE parquet file
        employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick");
        employee.setEmpCountry("DE");

        //Write parquet file with GZIP compression
        ParquetWriter<Object> writer = AvroParquetWriter
                                             .builder(new Path(parquetInput + "1.gz.parquet"))
                                             .withCompressionCodec(CompressionCodecName.GZIP)
                                             .withSchema(Employee.getClassSchema())
                                             .build();
        writer.write(employee);
        writer.close();


        //Write the data into the local filesystem  for Left input
        File tempFileleft = new File(jsonInput + "/input.txt");
        Map<String,Object> jsonMap = new HashMap<>();
        jsonMap.put("country","DE");
        jsonMap.put("lang","german");
        JSONObject obj = new JSONObject(jsonMap);

        FileUtils.writeStringToFile(tempFileleft, obj.toJSONString(), "UTF-8");
        //FileUtils.writeStringToFile(tempFileleft, DataFrameJoin.NEW_LINE_DELIMETER, "UTF-8", true);
    }

    @Test
    public void testSuccess() throws Exception {

        String[] args = new String[]{"-D" + DataFrameJoin.PARQUET_INPUT_PATH + "=" + parquetInput,
                "-D" + DataFrameJoin.JSON_INPUT_PATH + "=" + jsonInput,
                "-D" + DataFrameJoin.OUTPUT_PATH + "=" + output,
                "-D" + DataFrameJoin.IS_RUN_LOCALLY + "=true",
                "-D" + DataFrameJoin.DEFAULT_FS + "=file:///",
                "-D" + DataFrameJoin.NUM_PARTITIONS + "=1"};

        DataFrameJoin.main(args);

        ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output)).build();
        //Use .withConf(FS.getConf())
        // for reading from a diferent HDFS and not local , by default the fs is local

        GenericData.Record event = (GenericData.Record) reader.read();
        Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo
                     (AvroUtils.convertAvroPOJOtoByteArray
                                 (event, Employee.getClassSchema()), Employee.getClassSchema());
        reader.close();
        LOG.info("Data read from Sparkoutput is {}", outputEvent.toString());
        Assert.assertEquals(employee.getEmpId(), outputEvent.getEmpId());
    }

    @After
    public void cleanup() throws IOException {
        FileUtils.deleteDirectory(new File(BASEDIR));
    }
}