Convert Avro Object To Row

Avro to Row conversion

Dependencies:

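<dependencies>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.5.0-cdh5.9.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
    <!--
        The code below also imports com.databricks.spark.avro.SchemaConverters and
        org.apache.spark.sql classes, and the test uses JUnit, so the matching
        spark-avro, Spark SQL and JUnit artifacts must be on the classpath as well.
    -->
</dependencies>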

[addToAppearHere]

Code:

 

package com.big.data.spark.converter;

import com.databricks.spark.avro.SchemaConverters;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;


/**
 * Static utility that wraps an Avro {@link GenericRecord} in a Spark SQL {@link Row}.
 *
 * <p>The row's schema is derived from the record's Avro schema via
 * {@link SchemaConverters#toSqlType(Schema)}; field values are copied by reference,
 * in Avro field-position order, without any type conversion.
 *
 * <p>NOTE(review): because values are passed through untouched, Avro runtime types
 * (for example Avro string or nested-record representations) may not match what
 * Spark expects for the derived schema - confirm against the callers' data.
 */
public final class AvroToRowConverter {

    private AvroToRowConverter() {
        // Static utility class; never instantiated.
    }

    /**
     * Converts an Avro record into a schema-carrying Spark {@link Row}.
     *
     * @param avrorecord the Avro record to convert; may be {@code null}
     * @param <A>        the concrete record type
     * @return a {@link GenericRowWithSchema} holding one value per Avro field, indexed by
     *         the field's position, or {@code null} when {@code avrorecord} is {@code null}
     */
    public static <A extends GenericRecord> Row avroToRowConverter(A avrorecord) {
        if (avrorecord == null) {
            return null;
        }

        // Resolve the schema once instead of re-fetching it for every use below.
        final Schema avroSchema = avrorecord.getSchema();

        // A record schema is expected to map to a Spark struct, hence the cast.
        final StructType structType =
                (StructType) SchemaConverters.toSqlType(avroSchema).dataType();

        final Object[] values = new Object[avroSchema.getFields().size()];
        for (Schema.Field field : avroSchema.getFields()) {
            final int position = field.pos();
            values[position] = avrorecord.get(position);
        }

        return new GenericRowWithSchema(values, structType);
    }
}


[addToAppearHere]

Test:

 

package com.big.data.spark.converter;

import com.big.data.avro.schema.Employee;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.junit.Assert;
import org.junit.Test;


/**
 * Unit tests for {@link AvroToRowConverter#avroToRowConverter}.
 */
public class AvroToRowConverterTest {

    // Hand-built record schema with an int, a string, a string-array and a string-map field.
    private static final Schema AVRO_SCHEMA = SchemaBuilder
            .record("customRecord").namespace("com.big.data.spark.converter.avro")
            .fields()
            .name("int_field").type().intType().noDefault()
            .name("str_field").type().stringType().noDefault()
            .name("arr_str_field").type().array().items().stringType().noDefault()
            .name("map_str_field").type().map().values().stringType().noDefault()
            .endRecord();

    /** A generated Avro class (which is a GenericRecord) converts field-for-field. */
    @Test
    public void avroToRowTest() throws Exception {
        Employee employee = new Employee();
        Row row = AvroToRowConverter.avroToRowConverter(employee);
        // JUnit's contract is assertEquals(expected, actual).
        Assert.assertEquals(employee.get(0), row.get(0));
    }

    /** A null record is passed through as a null row. */
    @Test
    public void testNullRecordMapping() throws Exception {
        Assert.assertNull(AvroToRowConverter.avroToRowConverter((GenericRecord) null));
    }

    /** A record with no values set still yields one row slot per schema field. */
    @Test
    public void testEmptyRowMapping() throws Exception {
        final Row row = AvroToRowConverter.avroToRowConverter(new GenericData.Record(AVRO_SCHEMA));
        // assertEquals reports both values on failure, unlike assertTrue(a == b).
        Assert.assertEquals(AVRO_SCHEMA.getFields().size(), row.size());
    }

    /** A value stored in the array field shows up at the same position in the row. */
    @Test
    public void testArrayRowMapping() throws Exception {
        final GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
        final String[] testStrArr = {"test_val1", "test_val2"};
        record.put("arr_str_field", testStrArr);

        final Row row = AvroToRowConverter.avroToRowConverter(record);

        Assert.assertEquals(AVRO_SCHEMA.getFields().size(), row.size());
        // The converter copies values by reference, so the very same array must come back.
        final int arrayFieldPosition = AVRO_SCHEMA.getField("arr_str_field").pos();
        Assert.assertSame(testStrArr, row.get(arrayFieldPosition));
    }

}