DataFrame Broadcast Join
Broadcast Join
Problem: Given a small JSON file containing a country -> language mapping, and a large Parquet file containing Employee info, the job is expected to output Employee to language based on the country. (GitHub)
1. Parquet file (huge file on HDFS), schema:
root
 |-- emp_id: integer (nullable = false)
 |-- emp_name: string (nullable = false)
 |-- emp_country: string (nullable = false)
 |-- subordinates: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = false)
2. JSON file (small file that can be held in memory), schema:
root
 |-- country: string (nullable = true)
 |-- lang: string (nullable = true)
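For illustration, a single line of the JSON file could look like the record below (the values are borrowed from the integration test further down, so they are an example rather than required input). After the join, an Employee row with emp_country = "DE" picks up lang = "german" from this mapping.

{"country":"DE","lang":"german"}

The Maven dependencies used by the job and the integration test are listed next.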
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
Spark:
package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

public class DataFrameBroadcastJoin extends Configured implements Tool, Closeable {

    public static final String PARQUET_INPUT_PATH = "spark.parquet.input.path";
    public static final String JSON_INPUT_PATH = "spark.json.input.path";
    public static final String OUTPUT_PATH = "spark.output.path";
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    public static final String DEFAULT_FS = "spark.default.fs";
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    public static final String NEW_LINE_DELIMETER = "\n";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    private static final Logger LOG = LoggerFactory.getLogger(DataFrameBroadcastJoin.class);

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        // Set Spark conf here; after you get the Spark context
        // you can set the Hadoop configuration for InputFormats
        final SparkConf sparkConf = new SparkConf()
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    @Override
    public int run(String[] args) throws Exception {

        // The arguments passed in have been split into key/value pairs by ToolRunner
        Configuration conf = getConf();

        // Left path: Parquet
        String parquetinputPath = conf.get(PARQUET_INPUT_PATH);
        // Right path: JSON
        String jsonInputPath = conf.get(JSON_INPUT_PATH);
        String outputPath = conf.get(OUTPUT_PATH);

        // Get the Spark context; this is the central context,
        // which can be wrapped in any other context
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, Boolean.FALSE),
                                               conf.get(DEFAULT_FS),
                                               this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input path has been read yet; no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // to use a custom InputFormat use javaSparkContext.newAPIHadoopFile() and get an RDD.

        // Avro schema to StructType conversion
        final StructType outPutSchemaStructType = (StructType) SchemaConverters
                .toSqlType(Employee.getClassSchema()).dataType();

        // Read data from the Parquet file;
        // the schema of the data is taken from the Avro schema (schema: Employee -> country)
        DataFrame parquetDFLeft = sqlContext.read()
                                            .format(Employee.class.getCanonicalName())
                                            .parquet(parquetinputPath);

        // show() is an action, so do not leave it enabled unnecessarily
        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the Parquet file:");
            parquetDFLeft.printSchema();
            parquetDFLeft.show();
        }

        // Read the JSON file on the right (JSON file schema: country -> language)
        DataFrame jsonDataframeRight = sqlContext.read().json(jsonInputPath);

        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the JSON file:");
            jsonDataframeRight.printSchema();
            jsonDataframeRight.show();
        }

        // Inner join
        // Available join types => "inner", "outer", "full", "fullouter",
        // "leftouter", "left", "rightouter", "right", "leftsemi"
        // Broadcast => functions.broadcast(DataFrame)
        DataFrame join = parquetDFLeft.join(functions.broadcast(jsonDataframeRight),
                                            parquetDFLeft.col("emp_country")
                                                         .equalTo(jsonDataframeRight.col("country")));

        if (LOG.isDebugEnabled()) {
            LOG.info("Schema and data from the join:");
            join.printSchema();
            join.show();
        }

        join.write()
            .format(Employee.class.getCanonicalName())
            .parquet(outputPath);

        return 0;
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DataFrameBroadcastJoin(), args);
    }
}
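Note that the explicit functions.broadcast(...) hint is not the only way to get a broadcast (map-side) join. A minimal sketch, reusing parquetDFLeft and jsonDataframeRight from the job above: Spark SQL can broadcast the smaller side automatically when its estimated size stays below spark.sql.autoBroadcastJoinThreshold (roughly 10 MB by default), so the same inner join could be written without the hint. The threshold value below is illustrative, and the automatic path only works when Spark can estimate the size of the JSON relation, which is why the explicit hint used above is the more predictable choice.

// Sketch only: relies on automatic broadcast, which requires Spark to be able
// to estimate the size of jsonDataframeRight; the threshold value is illustrative.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", String.valueOf(10L * 1024 * 1024));
DataFrame autoBroadcastJoin = parquetDFLeft.join(jsonDataframeRight,
        parquetDFLeft.col("emp_country").equalTo(jsonDataframeRight.col("country")));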
Integration Test: (GitHub)
package com.big.data.spark;

import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DataFrameBroadcastJoinTest {

    private static final Logger LOG = LoggerFactory.getLogger(DataFrameBroadcastJoinTest.class);

    private static final String BASEDIR = "/tmp/BroadcastJoinTest/avroparquetInputFile/" + System.currentTimeMillis() + "/";
    private String parquetInput;
    private String jsonInput;
    private String output;

    private Employee employee;

    @Before
    public void inputDataPreperation() throws IOException {

        parquetInput = BASEDIR + "parquetInput/";
        jsonInput = BASEDIR + "jsonInput";
        output = BASEDIR + "output/";

        // Employee record for the Parquet (left) input
        employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick");
        employee.setEmpCountry("DE");

        // Write the Parquet file with GZIP compression
        ParquetWriter<Object> writer = AvroParquetWriter
                .builder(new Path(parquetInput + "1.gz.parquet"))
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withSchema(Employee.getClassSchema())
                .build();
        writer.write(employee);
        writer.close();

        // Write the JSON (right) input into the local filesystem
        File tempFileleft = new File(jsonInput + "/input.txt");
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("country", "DE");
        jsonMap.put("lang", "german");
        JSONObject obj = new JSONObject(jsonMap);

        FileUtils.writeStringToFile(tempFileleft, obj.toJSONString(), "UTF-8");
        //FileUtils.writeStringToFile(tempFileleft, DataFrameBroadcastJoin.NEW_LINE_DELIMETER, "UTF-8", true);
    }

    @Test
    public void testSuccess() throws Exception {

        String[] args = new String[]{"-D" + DataFrameBroadcastJoin.PARQUET_INPUT_PATH + "=" + parquetInput,
                                     "-D" + DataFrameBroadcastJoin.JSON_INPUT_PATH + "=" + jsonInput,
                                     "-D" + DataFrameBroadcastJoin.OUTPUT_PATH + "=" + output,
                                     "-D" + DataFrameBroadcastJoin.IS_RUN_LOCALLY + "=true",
                                     "-D" + DataFrameBroadcastJoin.DEFAULT_FS + "=file:///",
                                     "-D" + DataFrameBroadcastJoin.NUM_PARTITIONS + "=1"};

        DataFrameBroadcastJoin.main(args);

        // Use .withConf(FS.getConf()) for reading from a different HDFS;
        // by default the fs is local
        ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output))
                                                               .build();
        GenericData.Record event = (GenericData.Record) reader.read();

        Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo(
                AvroUtils.convertAvroPOJOtoByteArray(event, Employee.getClassSchema()),
                Employee.getClassSchema());
        reader.close();

        LOG.info("Data read from the Spark output is {}", outputEvent.toString());
        Assert.assertEquals(employee.getEmpId(), outputEvent.getEmpId());
    }

    @After
    public void cleanup() throws IOException {
        FileUtils.deleteDirectory(new File(BASEDIR));
    }
}
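As a quick alternative check, the output written by the job can also be read back with Spark itself instead of AvroParquetReader. A minimal sketch, assuming an SQLContext is available in the test and output points to the job's output directory:

// Read the joined Parquet output back as a DataFrame and print it;
// show() is an action, so use it only for small, local verification.
DataFrame result = sqlContext.read().parquet(output);
result.printSchema();
result.show();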