CombineParquetFileInputFormat
CombineParquetInputFormat : read many small parquet files in a single task
Problem : Implement a CombineParquetInputFormat to handle the too-many-small-parquet-files problem on the consumer (read) side. (Github)
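For context, the minimal sketch below shows the symptom the rest of the post addresses. The input path and class name are hypothetical; the Spark 1.6 APIs are the same ones the integration test further down relies on. With the standard parquet reader, Spark ends up with roughly one partition, and therefore one read task, per small file (the integration test below asserts exactly this: two files, two partitions).

package com.big.data.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Hypothetical, illustration only: count the partitions created by the standard parquet reader.
public class SmallFilesSymptom {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("SmallFilesSymptom").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);

        // Hypothetical directory containing thousands of small parquet files.
        DataFrame df = sqlContext.read().parquet("/data/events/small-parquet-files/");

        // Prints a partition count close to the number of input files:
        // every small file becomes its own split, i.e. its own task.
        System.out.println("partitions = " + df.rdd().getNumPartitions());

        sc.close();
    }
}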
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
Spark :
package com.big.data.spark;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;

import java.io.IOException;

/**
 * Packs many small parquet files into a single CombineFileSplit so that one task
 * can read several files. The number of bytes packed into one split is bounded by
 * mapreduce.input.fileinputformat.split.maxsize.
 */
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        CombineFileSplit combineSplit = (CombineFileSplit) split;
        // CombineFileRecordReader iterates over the files packed into the combined split
        // and instantiates one CombineParquetRecordReader per file. The raw type is used
        // because a class literal of a generic reader cannot carry its type parameters.
        return new CombineFileRecordReader(combineSplit, context, CombineParquetRecordReader.class);
    }

    private static class CombineParquetRecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {

        public CombineParquetRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            // Delegate the actual reading of each individual file to ParquetInputFormat
            // configured with Avro read support.
            super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
        }
    }
}
Key Takeaways :
1. CombineParquetInputFormat spawns fewer tasks, because several small parquet files are packed into each combined split (see the usage sketch after this list).
2. CombineFileRecordReaderWrapper is the wrapper that initializes the per-file record reader with the correct offsets of the combined split.
3. It does not solve the small-files problem on HDFS itself; the files and their NameNode metadata still exist. It only reduces the number of read tasks on the consumer side.
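A minimal usage sketch outside the test harness follows. The job class name and input directory are hypothetical; the newAPIHadoopFile call, the split.maxsize setting, and the Employee Avro class are the same ones used in the integration test below.

package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical driver that reads a directory of small parquet files with CombineParquetInputFormat.
public class CombineParquetReadJob {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CombineParquetReadJob").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Cap how many bytes of small files get packed into one combined split,
        // i.e. roughly how much data a single read task handles (128 MB here).
        sc.hadoopConfiguration()
          .setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);

        // Hypothetical input directory containing many small parquet files.
        JavaRDD<Employee> employees = sc
                .newAPIHadoopFile("/data/employee/small-parquet-files/",
                                  CombineParquetInputFormat.class,
                                  Void.class,
                                  Employee.class,
                                  sc.hadoopConfiguration())
                .values();

        // Far fewer partitions (and therefore tasks) than the number of input files.
        System.out.println("Read " + employees.count()
                + " records in " + employees.getNumPartitions() + " partition(s)");

        sc.close();
    }
}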
Integration Test : (Github)
package com.big.data.spark;
import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.File;
import java.io.IOException;
public class CombineParquetInputFormatTest {

    private static final Logger LOG = LoggerFactory.getLogger(CombineParquetInputFormatTest.class);
    private static final String BASE_TEMP_FOLDER =
            "/tmp/inputData/CombineParquetInputFormatIT/avroparquetInputFile/"
                    + System.currentTimeMillis() + "/";

    private Employee employee;
    private JavaSparkContext sc;
    private SQLContext sqlContext;
    private String inputPath;
    private String outputPath;

    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        // Set Spark conf here; the Hadoop configuration for InputFormats can be set
        // on the context once it has been created.
        final SparkConf sparkConf = new SparkConf()
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    // Convert the Avro POJO into an Object[] ordered by schema field position,
    // so that it can be turned into a Spark SQL Row.
    public static Object[] avroPojoToObjectArray(Employee input) {
        Object[] outputValues = new Object[Employee.getClassSchema().getFields().size()];
        for (Schema.Field field : input.getSchema().getFields()) {
            outputValues[field.pos()] = input.get(field.name());
        }
        return outputValues;
    }
    @Before
    public void setUp() throws IOException {
        // Setting input and output paths
        inputPath = BASE_TEMP_FOLDER + "input/";
        outputPath = BASE_TEMP_FOLDER + "output";

        employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick");
        employee.setEmpCountry("DE");

        // Write a parquet file with GZIP compression
        ParquetWriter<Object> writer = AvroParquetWriter
                .builder(new Path(inputPath + "7.gz.parquet"))
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withSchema(Employee.getClassSchema())
                .build();
        writer.write(employee);
        writer.write(employee);
        writer.write(employee);
        writer.write(employee);
        writer.close();

        // Write another parquet file
        ParquetWriter<Object> writer1 = AvroParquetWriter
                .builder(new Path(inputPath + "8.gz.parquet"))
                .withCompressionCodec(CompressionCodecName.GZIP)
                .withSchema(Employee.getClassSchema())
                .build();
        writer1.write(employee);
        writer1.write(employee);
        writer1.write(employee);
        writer1.write(employee);
        writer1.close();

        // For two files the standard InputFormat will use two partitions
    }
    @Test
    public void testPartitionsInCombinedInputParquetFormat() throws IOException {
        sc = getJavaSparkContext(true, "file:///", CombineParquetInputFormatTest.class);
        sqlContext = new SQLContext(sc);

        // Pack at most 1 MB of input files into each combined split
        sc.hadoopConfiguration()
          .setLong("mapreduce.input.fileinputformat.split.maxsize", (long) 1024 * 1024);

        JavaRDD<Row> profileJavaRDD = sc
                .newAPIHadoopFile(inputPath, CombineParquetInputFormat.class,
                                  Void.class, Employee.class, sc.hadoopConfiguration())
                .values()
                .map(p -> RowFactory.create(avroPojoToObjectArray((Employee) p)));

        StructType outputSchema = (StructType) SchemaConverters
                .toSqlType(Employee.getClassSchema())
                .dataType();

        final DataFrame profileDataFrame = sqlContext.createDataFrame(profileJavaRDD, outputSchema);
        profileDataFrame.cache();
        profileDataFrame.printSchema();
        profileDataFrame.show(100);
        Assert.assertEquals(8, profileDataFrame.collect().length);

        // Even though there are two input files, only one task is spawned, so numPartitions == 1
        Assert.assertEquals(1, profileDataFrame.rdd().partitions().length);
        Assert.assertEquals(1, profileDataFrame.rdd().getNumPartitions());

        // Reading via the standard Spark DataFrame parquet source, which uses ParquetInputFormat
        final DataFrame standardDataFrame = sqlContext
                .read()
                .parquet(inputPath);
        standardDataFrame.cache();
        Assert.assertEquals(8, standardDataFrame.collect().length);
        standardDataFrame.show(100);

        // Two partitions, because two files are present in the input
        Assert.assertEquals(2, standardDataFrame.rdd().partitions().length);
        Assert.assertEquals(2, standardDataFrame.rdd().getNumPartitions());
    }
    @After
    public void cleanup() throws IOException {
        if (sc != null) {
            sc.close();
        }
        FileUtils.deleteDirectory(new File(BASE_TEMP_FOLDER));
    }
}