Avro serialization and deserialization using the Confluent Schema Registry


Dependencies

<dependencies>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>2.0.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.9</version>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
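
The avro-schema module above holds the Employee ".avsc" file and the class generated from it. The exact schema is not shown in this post; a plausible shape of the new schema, reconstructed from the assertions in the tests below (field types such as the long emp_id and the empty-array default for departments are assumptions), would be:

{
    "name": "com.big.data.avro.schema.Employee",
    "type": "record",
    "fields": [
        {"name": "emp_id", "type": "long", "doc": "employee Id of the employee"},
        {"name": "emp_name", "type": "string", "doc": "employee name of the employee"},
        {"name": "emp_country", "type": "string", "doc": "country of residence"},
        {"name": "bonus", "type": ["null", "long"], "default": null,
         "doc": "bonus received on a yearly basis"},
        {"name": "subordinates", "type": ["null", {"type": "map", "values": "string"}],
         "default": null, "doc": "map of subordinates Name and Designation"},
        {"name": "departments", "type": {"type": "array", "items": "string"},
         "default": [], "doc": "departments of the employee"}
    ]
}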

SerDe using the Confluent Schema Registry


import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

import java.util.HashMap;
import java.util.Map;

public class ConfluentSchemaService {

    public static final String TOPIC = "DUMMYTOPIC";

    private final KafkaAvroSerializer avroSerializer;
    private final KafkaAvroDeserializer avroDeserializer;

    public ConfluentSchemaService(String confluentSchemaRegistryURL) {

        // Properties shared by the serializer and the deserializer
        Map<String, String> propMap = new HashMap<>();
        propMap.put("schema.registry.url", confluentSchemaRegistryURL);
        // Deserialize into the generated SpecificRecord class (Employee),
        // not into a GenericRecord
        propMap.put("specific.avro.reader", "true");

        // The boolean flag tells the serializer whether it handles record keys
        // or values; it only influences the subject name under which the schema
        // is registered (TOPIC-key vs TOPIC-value), so it must match on both sides
        avroSerializer = new KafkaAvroSerializer();
        avroSerializer.configure(propMap, true);

        avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(propMap, true);
    }

    // Hex-encode the serialized bytes so they can be logged or stored as text
    public String hexBytesToString(byte[] inputBytes) {
        return Hex.encodeHexString(inputBytes);
    }

    public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
        return Hex.decodeHex(hexEncodedString.toCharArray());
    }

    // Registers/looks up the schema in the registry and prepends the schema ID
    public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
        return avroSerializer.serialize(TOPIC, avroRecord);
    }

    // Reads the schema ID from the byte array, fetches the writer schema
    // from the registry and deserializes the payload with it
    public Object deserializeBytesToAvroPOJO(byte[] avroByteArray) {
        return avroDeserializer.deserialize(TOPIC, avroByteArray);
    }
}
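
Internally, KafkaAvroSerializer and KafkaAvroDeserializer delegate their registry calls to CachedSchemaRegistryClient, which checks an in-memory cache before going over the wire. A minimal sketch of using that client directly; the registry URL, the subject name and the toy schema are assumptions for illustration:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class RegistryClientSketch {

    public static void main(String[] args) throws Exception {
        // The second argument caps how many schema objects are cached per subject
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Ping\","
                + "\"fields\":[{\"name\":\"msg\",\"type\":\"string\"}]}");

        // Registering is idempotent: the same schema always maps to the same ID
        int schemaId = client.register("DUMMYTOPIC-key", schema);

        // Repeated lookups for this ID are served from the local cache
        Schema fetched = client.getByID(schemaId);
        System.out.println("Schema " + schemaId + " -> " + fetched);
    }
}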


Key Takeaways:

  • Unlike Thrift, Avro-serialized objects do not carry their schema with them.
  • Because no schema is stored in the serialized byte array, the reader has to be given the schema with which the data was written.
  • The Confluent Schema Registry is a service that stores and versions these schemas.
  • Confluent provides a CachedSchemaRegistryClient, which checks its local cache before sending a request over the network (see the sketch above).
  • The JSON schema in the ".avsc" file is different from the schema embedded in the generated Avro object.
  • All generated Avro classes implement GenericRecord.
  • During serialization: based on the schema of the Avro object, a schema ID is requested from the Confluent Schema Registry.
  • The schema ID, a 4-byte integer, is prepended to the serialized Avro payload, preceded by a single magic byte (0x0).
  • During deserialization: the magic byte and the 4-byte schema ID are stripped from the byte array, and the ID is decoded back to an integer.
  • The writer schema is then fetched from the Confluent Schema Registry by that ID and used to deserialize the byte array; the sketch after this list pulls the ID out by hand.
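
To make the last three bullets concrete, here is a minimal sketch (class name hypothetical) that pulls the schema ID back out of a byte array produced by serializeAvroPOJOToBytes:

import java.nio.ByteBuffer;

public class WireFormatSketch {

    // Confluent wire format: [magic byte 0x0][4-byte schema ID][Avro payload]
    public static int extractSchemaId(byte[] confluentSerializedBytes) {
        ByteBuffer buffer = ByteBuffer.wrap(confluentSerializedBytes);
        byte magicByte = buffer.get();
        if (magicByte != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magicByte);
        }
        return buffer.getInt(); // big-endian 4-byte schema ID
    }
}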


Test:


import com.big.data.avro.schema.Employee;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.DecoderException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class ConfluentSchemaServiceIT {

    private static final String REGISTRYURL = "http://YOUR-CONFLUENT_REGISTRY-SERVICE:8081";

    // The old Employee schema: note it does not have the departments field
    private static final String oldEmployeeSchemaString = "{\n" +
            "    \"name\": \"com.big.data.avro.schema.Employee\",\n" +
            "    \"type\": \"record\",\n" +
            "    \"fields\": [{\n" +
            "            \"name\": \"emp_id\",\n" +
            "            \"type\": \"int\",\n" +
            "            \"doc\": \"employee Id of the employee\"\n" +
            "        },\n" +
            "        {\n" +
            "             \"name\": \"emp_name\",\n" +
            "             \"type\": \"string\",\n" +
            "             \"doc\": \"employee name of the employee\"\n" +
            "        },\n" +
            "        {\n" +
            "              \"name\": \"emp_country\",\n" +
            "              \"type\": \"string\",\n" +
            "              \"doc\": \"country of residence\"\n" +
            "        },\n" +
            "        {      \"name\": \"bonus\",\n" +
            "               \"type\": [\"null\", \"long\"],\n" +
            "               \"default\": null,\n" +
            "               \"doc\": \"bonus received on a yearly basis\"\n" +
            "        },\n" +
            "       {\n" +
            "               \"name\": \"subordinates\",\n" +
            "               \"type\": [\"null\", {\"type\": \"map\", \"values\": \"string\"}],\n" +
            "               \"default\": null,\n" +
            "               \"doc\": \"map of subordinates Name and Designation\"\n" +
            "        }\n" +
            "        ]\n" +
            " }";

    public static final String EMP_ID = "emp_id";
    public static final String EMP_NAME = "emp_name";
    public static final String EMP_COUNTRY = "emp_country";
    public static final String COUNTRY = "NETHERLANDS";
    private List<String> departmentList;
    private Map<String, String> subordinateMap;
    private ConfluentSchemaService serde;

    @Before
    public void setUp() {

        //Initialize the ConfluentSerDe
        serde = new ConfluentSchemaService(REGISTRYURL);

        departmentList = Arrays.asList("cat1", "cat2");

        subordinateMap = new HashMap<>();
        subordinateMap.put("maverick", "Junior SE");
    }

    // Write data with the new schema and read it back with the new schema
    @Test
    public void serDeAvroWriteNewAndReadNewSchemaTest() throws DecoderException {

        Employee employee = Employee.newBuilder()
                                    .setEmpId(1)
                                    .setEmpName("Mavrick")
                                    .setEmpCountry(COUNTRY)
                                    .setDepartments(departmentList)
                                    .setSubordinates(subordinateMap)
                                    .build();

        // Serialize the Avro POJO (the schema ID gets prepended) and encode to hex
        String hexEmployeeString = serde
                .hexBytesToString(serde.serializeAvroPOJOToBytes(employee));

        // Decode the hex string to bytes, strip the schema ID and deserialize
        Employee employeeNew = (Employee) serde
                .deserializeBytesToAvroPOJO(serde.hexStringToBytes(hexEmployeeString));

        Assert.assertEquals(new Long(1), employeeNew.getEmpId());
        Assert.assertEquals(COUNTRY, employeeNew.getEmpCountry());
        Assert.assertEquals(departmentList, employeeNew.getDepartments());
        Assert.assertEquals(subordinateMap, employeeNew.getSubordinates());
    }


    // Write data with the old schema but read it with the new schema
    @Test
    public void serDeAvroWriteOldAndReadNewSchemaTest() throws Exception {

        // Parse the old Employee schema (it lacks the departments field)
        Schema oldEmployeeSchema = new Schema.Parser().parse(oldEmployeeSchemaString);

        // Build a GenericRecord against the old schema
        GenericRecord oldEmployeeRecord = new GenericData.Record(oldEmployeeSchema);

        oldEmployeeRecord.put(EMP_ID, 1);  // emp_id is an int in the old schema
        oldEmployeeRecord.put(EMP_NAME, "Mavrick");
        oldEmployeeRecord.put(EMP_COUNTRY, COUNTRY);
        oldEmployeeRecord.put("subordinates", subordinateMap);

        // Serialize with the old (writer) schema and encode to hex
        String hexEmployeeString = serde
                .hexBytesToString(serde.serializeAvroPOJOToBytes(oldEmployeeRecord));

        // Decode the hex string to bytes, strip the schema ID and deserialize
        // with the new (reader) schema
        Employee employeeNew = (Employee) serde
                .deserializeBytesToAvroPOJO(serde.hexStringToBytes(hexEmployeeString));

        Assert.assertNotNull(employeeNew);
        Assert.assertEquals(new Long(1), employeeNew.getEmpId());
        Assert.assertEquals(COUNTRY, employeeNew.getEmpCountry());
        // departments was absent in the writer schema, so the reader default applies
        Assert.assertTrue(employeeNew.getDepartments().isEmpty());
        Assert.assertEquals(subordinateMap, employeeNew.getSubordinates());
    }

}
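
In a real pipeline the serializer is rarely invoked by hand as above; it is wired into the producer configuration and called by the Kafka client on send. A sketch of that wiring, with the broker address, registry URL and key type as assumptions:

import com.big.data.avro.schema.Employee;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class EmployeeProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumed broker
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", KafkaAvroSerializer.class.getName());

        KafkaProducer<String, Employee> producer = new KafkaProducer<>(props);

        Employee employee = Employee.newBuilder()
                                    .setEmpId(1)
                                    .setEmpName("Mavrick")
                                    .setEmpCountry("NETHERLANDS")
                                    .build();

        // The serializer registers/looks up the schema and prepends its ID
        producer.send(new ProducerRecord<>("DUMMYTOPIC", "emp-1", employee));
        producer.close();
    }
}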