DataFrame Join in Spark (Shuffle Join)
Problem: Given a big JSON file containing a country -> language mapping, and a big Parquet file containing Employee info, the job is expected to output Employee to language based on the country. (GitHub)
1. Parquet file (huge file on HDFS), Avro schema:
root
 |-- emp_id: integer (nullable = false)
 |-- emp_name: string (nullable = false)
 |-- emp_country: string (nullable = false)
 |-- subordinates: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = false)
2. JSON file with schema (small file that can be held in memory):
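The JSON schema is not reproduced in the post; judging from the sample data used in the integration test below, each line of the file is one country -> language record, for example:

{"country":"DE","lang":"german"}

which Spark's JSON reader infers as two string columns, country and lang (country being the join key on this side).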
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0-cdh5.9.0</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0-cdh5.9.0</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>com.big.data</groupId>
    <artifactId>avro-schema</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.5.0-cdh5.9.0</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.5.0</version>
    <scope>compile</scope>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
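The com.big.data:avro-schema dependency above supplies the generated Employee class used by the job. Its Avro schema is not shown in the post; reconstructed from the printed schema in section 1 (the namespace is taken from the import com.big.data.avro.schema.Employee, everything else is an assumption), it would look roughly like:

{
  "type": "record",
  "name": "Employee",
  "namespace": "com.big.data.avro.schema",
  "fields": [
    {"name": "emp_id", "type": "int"},
    {"name": "emp_name", "type": "string"},
    {"name": "emp_country", "type": "string"},
    {"name": "subordinates", "type": ["null", {"type": "map", "values": "string"}]}
  ]
}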
Spark :

package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

public class DataFrameJoin extends Configured implements Tool, Closeable {

    public static final String PARQUET_INPUT_PATH = "spark.parquet.input.path";
    public static final String JSON_INPUT_PATH = "spark.json.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";
    public static final String NEW_LINE_DELIMETER = "\n";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    private static final Logger LOG = LoggerFactory.getLogger(DataFrameJoin.class);

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        final SparkConf sparkConf = new SparkConf()
                // Set Spark conf here;
                // after the Spark context is created you can set Hadoop configuration for InputFormats
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        // The arguments passed have been split into key/value pairs by ToolRunner
        Configuration conf = getConf();

        // Left path: Parquet
        String parquetinputPath = conf.get(PARQUET_INPUT_PATH);
        // Right path: JSON
        String jsonInputPath = conf.get(JSON_INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);

        // Get the Spark context; this is the central context, which can be wrapped in any other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
                                               conf.get(DEFAULT_FS),
                                               this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input path has been read and no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // to use any custom InputFormat use javaSparkContext.newAPIHadoopFile() and get an RDD

        // Avro schema to StructType conversion
        final StructType outPutSchemaStructType = (StructType) SchemaConverters
                .toSqlType(Employee.getClassSchema())
                .dataType();

        // Read data from the Parquet file;
        // the schema of the data is taken from the Avro schema (Employee -> country)
        DataFrame parquetDFLeft = sqlContext.read()
                                            .format(Employee.class.getCanonicalName())
                                            .parquet(parquetinputPath);

        // show() is an action, so do not leave it enabled unnecessarily
        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the Parquet file:");
            parquetDFLeft.printSchema();
            parquetDFLeft.show();
        }

        // Read the JSON file for the right side (JSON file schema: country -> language)
        DataFrame jsonDataframeRight = sqlContext.read().json(jsonInputPath);

        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the JSON file:");
            jsonDataframeRight.printSchema();
            jsonDataframeRight.show();
        }

        // Inner join
        // Available join types =>
        // "inner", "outer", "full", "fullouter", "leftouter", "left",
        // "rightouter", "right", "leftsemi"
        DataFrame join = parquetDFLeft.join(jsonDataframeRight,
                                            parquetDFLeft.col("emp_country")
                                                         .equalTo(jsonDataframeRight.col("country")));

        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the join:");
            join.printSchema();
            join.show();
        }

        join.write()
            .format(Employee.class.getCanonicalName())
            .parquet(outputPath);

        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DataFrameJoin(), args);
    }
}
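Note that the inner join above carries the columns of both inputs (emp_id, emp_name, emp_country, subordinates from the Parquet side plus country and lang from the JSON side). If only the employee fields and the resolved language are wanted in the output, a select before the write trims the extra columns. A minimal sketch, not part of the job above, to be placed inside run() before the write:

DataFrame trimmed = join.select("emp_id", "emp_name", "emp_country", "lang");
trimmed.write().parquet(outputPath);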
Integration Test :

package com.big.data.spark;

import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DataFrameJoinTest {

    private static final Logger LOG = LoggerFactory.getLogger(DataFrameJoinTest.class);
    private static final String BASEDIR = "/tmp/DataFrameJoinTest/avroparquetInputFile/" + System.currentTimeMillis() + "/";

    private String parquetInput;
    private String jsonInput;
    private String output;
    private Employee employee;

    @Before
    public void inputDataPreperation() throws IOException {

        parquetInput = BASEDIR + "parquetInput/";
        jsonInput = BASEDIR + "jsonInput";
        output = BASEDIR + "output/";

        // Build the Employee record for the Parquet input
        employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick");
        employee.setEmpCountry("DE");

        // Write the Parquet file with GZIP compression
        ParquetWriter<Object> writer = AvroParquetWriter
                .builder(new Path(parquetInput + "1.gz.parquet"))
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withSchema(Employee.getClassSchema())
                .build();
        writer.write(employee);
        writer.close();

        // Write the JSON (right) input into the local filesystem
        File tempFileleft = new File(jsonInput + "/input.txt");
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("country", "DE");
        jsonMap.put("lang", "german");
        JSONObject obj = new JSONObject(jsonMap);
        FileUtils.writeStringToFile(tempFileleft, obj.toJSONString(), "UTF-8");
        //FileUtils.writeStringToFile(tempFileleft, DataFrameJoin.NEW_LINE_DELIMETER, "UTF-8", true);
    }

    @Test
    public void testSuccess() throws Exception {

        String[] args = new String[]{"-D" + DataFrameJoin.PARQUET_INPUT_PATH + "=" + parquetInput,
                                     "-D" + DataFrameJoin.JSON_INPUT_PATH + "=" + jsonInput,
                                     "-D" + DataFrameJoin.OUTPUT_PATH + "=" + output,
                                     "-D" + DataFrameJoin.IS_RUN_LOCALLY + "=true",
                                     "-D" + DataFrameJoin.DEFAULT_FS + "=file:///",
                                     "-D" + DataFrameJoin.NUM_PARTITIONS + "=1"};
        DataFrameJoin.main(args);

        // Use .withConf(FS.getConf()) on the builder for reading from a different HDFS; by default the fs is local
        ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output)).build();

        GenericData.Record event = (GenericData.Record) reader.read();
        Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo(
                AvroUtils.convertAvroPOJOtoByteArray(event, Employee.getClassSchema()),
                Employee.getClassSchema());
        reader.close();

        LOG.info("Data read from Spark output is {}", outputEvent.toString());
        Assert.assertEquals(employee.getEmpId(), outputEvent.getEmpId());
    }

    @After
    public void cleanup() throws IOException {
        FileUtils.deleteDirectory(new File(BASEDIR));
    }
}
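Since the JSON side is small enough to be held in memory, the same result can also be produced without a shuffle by hinting Spark to broadcast the right side. A minimal sketch using the Spark 1.6 broadcast hint (a drop-in replacement for the join line in run(); the shuffle-join version above is what the post demonstrates):

import static org.apache.spark.sql.functions.broadcast;

DataFrame join = parquetDFLeft.join(broadcast(jsonDataframeRight),
                                    parquetDFLeft.col("emp_country")
                                                 .equalTo(jsonDataframeRight.col("country")));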