Read Write Parquet Files using Spark

Read and Write Parquet Files Using Spark

  Problem : Read and write Parquet files using Spark when the data schema is available as an Avro schema. (Github)

Solution :  JavaSparkContext => SQLContext => DataFrame => Row => DataFrame => parquet

Dependency:

<dependencies>
    <!-- Spark SQL and core (CDH build, Scala 2.10) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <!-- Project module that provides the Avro-generated schema classes -->
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <!-- Provides com.databricks.spark.avro.SchemaConverters -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- Declared exactly once: a repeated groupId:artifactId makes Maven warn that
         dependency declarations must be unique. -->
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.5.0</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
  
  <repositories>
        <!-- NOTE(review): presumably needed to resolve the "-cdh5.9.0" artifact versions
             used by the dependencies in this POM; confirm against your build setup. -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
   </repositories>





Spark :

 

package com.big.data.spark;

import com.big.data.avro.schema.Employee;
import com.databricks.spark.avro.SchemaConverters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;

import java.io.Closeable;
import java.io.IOException;


/**
 * Spark job that reads Parquet files into a {@link DataFrame}, re-applies a schema derived
 * from the Avro schema of {@link Employee}, and writes the rows back out as Parquet.
 *
 * <p>The job is configured through Hadoop-style {@code -Dkey=value} arguments parsed by
 * {@link ToolRunner}; see the public {@code spark.*} key constants below. The instance owns
 * the Spark context it creates in {@link #run(String[])} and releases it in {@link #close()}.
 */
public class ReadWriteAvroParquetFiles extends Configured implements Tool, Closeable {

    /** Configuration key: path of the Parquet input (required). */
    public static final String INPUT_PATH = "spark.input.path";
    /** Configuration key: path the Parquet output is written to (required). */
    public static final String OUTPUT_PATH = "spark.output.path";
    /** Configuration key: when {@code true}, the job runs with master {@code local[*]}. */
    public static final String IS_RUN_LOCALLY = "spark.is.run.local";
    /** Configuration key: value copied into Hadoop's {@code fs.defaultFS} when present. */
    public static final String DEFAULT_FS = "spark.default.fs";
    /** Configuration key kept for callers; {@link #run(String[])} does not currently read it. */
    public static final String NUM_PARTITIONS = "spark.num.partitions";

    private SQLContext sqlContext;
    private JavaSparkContext javaSparkContext;

    /**
     * Creates the Spark context for this job.
     *
     * @param isRunLocal when {@code true}, the master is set to {@code local[*]}
     * @param defaultFs  value for Hadoop's {@code fs.defaultFS}; left untouched when {@code null}
     * @param tClass     class whose simple name is used as the Spark application name
     * @param <T>        type of {@code tClass}
     * @return a newly created Spark context
     */
    protected <T> JavaSparkContext getJavaSparkContext(final boolean isRunLocal,
                                                       final String defaultFs,
                                                       final Class<T> tClass) {
        // Spark settings must be put on the SparkConf before the context is created;
        // Hadoop settings (e.g. for InputFormats) are set on the context afterwards.
        final SparkConf sparkConf = new SparkConf()
                .setAppName(tClass.getSimpleName())
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        if (isRunLocal) {
            sparkConf.setMaster("local[*]");
        }

        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        if (defaultFs != null) {
            sparkContext.hadoopConfiguration().set("fs.defaultFS", defaultFs);
        }

        return sparkContext;
    }

    /**
     * Reads the Parquet input, rebuilds a DataFrame with the Avro-derived schema and writes
     * it to the output path as Parquet.
     *
     * @param args remaining command-line arguments (unused; settings come from {@link #getConf()})
     * @return {@code 0} on success
     * @throws IllegalArgumentException if the input or output path is not configured
     * @throws Exception                if Spark fails to read or write the data
     */
    @Override
    public int run(String[] args) throws Exception {

        // ToolRunner has already parsed the -Dkey=value arguments into the Configuration.
        final Configuration conf = getConf();
        // Fail fast with a clear message instead of handing a null path to Spark.
        final String inputPath = requireSetting(conf, INPUT_PATH);
        final String outputPath = requireSetting(conf, OUTPUT_PATH);

        // The Spark context is the central context; other contexts (e.g. SQLContext) wrap it.
        javaSparkContext = getJavaSparkContext(conf.getBoolean(IS_RUN_LOCALLY, false),
                                               conf.get(DEFAULT_FS), this.getClass());
        sqlContext = new SQLContext(javaSparkContext);

        // No input has been read and no job has been started yet.
        // To set any configuration use javaSparkContext.hadoopConfiguration().set(key, value);
        // To use a custom InputFormat use javaSparkContext.newAPIHadoopFile() and get an RDD.

        // Avro schema to StructType conversion
        final StructType outPutSchemaStructType = (StructType) SchemaConverters
                .toSqlType(Employee.getClassSchema()).dataType();

        // Read the Parquet input. parquet() selects the Parquet data source on its own, so no
        // format(...) call is needed; the Avro-derived schema is applied further below.
        final DataFrame inputDf = sqlContext.read().parquet(inputPath);

        // Convert the DataFrame into a JavaRDD: each record read from the Parquet file
        // becomes a Row object.
        final JavaRDD<Row> rowJavaRDD = inputDf.javaRDD();

        // Convert the JavaRDD back into a DataFrame that carries the Avro-derived schema.
        final DataFrame outputDf = sqlContext.createDataFrame(rowJavaRDD, outPutSchemaStructType);

        // Save the DataFrame as Parquet.
        outputDf.write().parquet(outputPath);

        return 0;
    }

    /** Returns the value configured for {@code key}, rejecting missing or blank values. */
    private static String requireSetting(final Configuration conf, final String key) {
        final String value = conf.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required configuration, pass -D" + key + "=<value>");
        }
        return value;
    }

    /**
     * Closes the Spark context created by {@link #run(String[])}, if any. Errors raised while
     * closing are suppressed by {@code closeQuietly}.
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(javaSparkContext);
        // Drop the references so a closed context cannot be reused by accident.
        javaSparkContext = null;
        sqlContext = null;
    }

    /**
     * Command-line entry point: runs the job through {@link ToolRunner} and always closes the
     * Spark context afterwards, even when the job throws.
     *
     * @param args {@code -Dkey=value} settings, see the {@code spark.*} key constants
     * @throws IllegalStateException if the job reports a non-zero exit code
     * @throws Exception             if the job itself fails
     */
    public static void main(String[] args) throws Exception {
        try (ReadWriteAvroParquetFiles job = new ReadWriteAvroParquetFiles()) {
            final int exitCode = ToolRunner.run(job, args);
            if (exitCode != 0) {
                throw new IllegalStateException("Job finished with exit code " + exitCode);
            }
        }
    }

}

Key Takeaways :
1. The Spark context is used to create the SQLContext.
2. Using the SQLContext, one can read Parquet files and get DataFrames.
3. From a DataFrame, one can get Rows if needed.
4. Using a DataFrame, one can write the data back as Parquet files.
5. Rows can be converted into a DataFrame using sqlContext.createDataFrame().
6. SchemaConverters.toSqlType can be used to convert an Avro schema to a StructType.

Integration Test : (Github)

 

package com.big.data.spark;

import com.big.data.avro.AvroUtils;
import com.big.data.avro.schema.Employee;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.avro.AvroParquetReader;
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

import java.io.File;
import java.io.IOException;


/**
 * Integration test for {@link ReadWriteAvroParquetFiles}: writes one {@link Employee} record
 * to a GZIP-compressed Parquet file, runs the job in local mode, and checks the record read
 * back from the job's output.
 */
public class ReadWriteAvroParquetFilesTest {

    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteAvroParquetFilesTest.class);
    // The timestamp keeps the working directory of separate test runs apart.
    private static final String BASEDIR = "/tmp/ReadWriteAvroParquetFilesTest/avroparquetInputFile/"
                                                  + System.currentTimeMillis() + "/";
    private String input;
    private String output;

    private Employee employee;

    /** Creates the input Parquet file containing a single employee record. */
    @Before
    public void setUp() throws IOException {

        input = BASEDIR + "input/";
        output = BASEDIR + "output/";

        employee = new Employee();
        employee.setEmpId(1);
        employee.setEmpName("Maverick");
        employee.setEmpCountry("DE");

        // Write a Parquet file with GZIP compression; try-with-resources closes the writer
        // even when write() fails.
        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(new Path(input + "1.gz.parquet"))
                                      .withCompressionCodec(CompressionCodecName.GZIP)
                                      .withSchema(Employee.getClassSchema())
                                      .build()) {
            writer.write(employee);
        }

    }

    /** Runs the job end to end and verifies the record found in its output. */
    @Test
    public void testSuccess() throws Exception {

        String[] args = new String[]{"-D" + ReadWriteAvroParquetFiles.INPUT_PATH + "=" + input,
                "-D" + ReadWriteAvroParquetFiles.OUTPUT_PATH + "=" + output,
                "-D" + ReadWriteAvroParquetFiles.IS_RUN_LOCALLY + "=true",
                "-D" + ReadWriteAvroParquetFiles.DEFAULT_FS + "=file:///",
                "-D" + ReadWriteAvroParquetFiles.NUM_PARTITIONS + "=1"};

        ReadWriteAvroParquetFiles.main(args);

        // Use .withConf(FS.getConf()) for reading from a different HDFS and not local;
        // by default the fs is local.
        final GenericData.Record event;
        try (ParquetReader<GenericRecord> reader = AvroParquetReader.builder(new Path(output)).build()) {
            event = (GenericData.Record) reader.read();
        }
        // Checked before conversion so an empty output fails with a clear message.
        Assert.assertNotNull("Spark output contains no record", event);

        Employee outputEvent = AvroUtils.convertByteArraytoAvroPojo
                          (AvroUtils.convertAvroPOJOtoByteArray(event, Employee.getClassSchema()),
                                                                          Employee.getClassSchema());
        LOG.info("Data read from Sparkoutput is {}", outputEvent.toString());
        Assert.assertEquals(employee.getEmpId(), outputEvent.getEmpId());
        // Compared as Strings: Avro may hand back a different CharSequence implementation
        // than the String that was set.
        Assert.assertEquals(String.valueOf(employee.getEmpName()),
                            String.valueOf(outputEvent.getEmpName()));
        Assert.assertEquals(String.valueOf(employee.getEmpCountry()),
                            String.valueOf(outputEvent.getEmpCountry()));
    }

    /** Removes the input and output files created by the test. */
    @After
    public void cleanup() throws IOException {
        FileUtils.deleteDirectory(new File(BASEDIR));
    }

}