Avro serialization and deserialization using the Confluent Schema Registry
Maven dependencies:
<dependencies>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>2.0.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.9</version>
    </dependency>
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>avro-schema</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
SerDe using the Confluent Schema Registry:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

import java.util.HashMap;
import java.util.Map;

public class ConfluentSchemaService {

    public static final String TOPIC = "DUMMYTOPIC";

    private KafkaAvroSerializer avroSerializer;
    private KafkaAvroDeserializer avroDeserializer;

    public ConfluentSchemaService(String confluentSchemaRegistryURL) {
        Map<String, String> propMap = new HashMap<>();
        propMap.put("schema.registry.url", confluentSchemaRegistryURL);
        // Make the deserializer return the generated specific record
        // (e.g. Employee) instead of a GenericRecord.
        propMap.put("specific.avro.reader", "true");

        avroSerializer = new KafkaAvroSerializer();
        // 'true' configures this as a key serde; schemas are registered
        // under the subject "<topic>-key".
        avroSerializer.configure(propMap, true);

        avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(propMap, true);
    }

    public String hexBytesToString(byte[] inputBytes) {
        return Hex.encodeHexString(inputBytes);
    }

    public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
        return Hex.decodeHex(hexEncodedString.toCharArray());
    }

    public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
        return avroSerializer.serialize(TOPIC, avroRecord);
    }

    public Object deserializeBytesToAvroPOJO(byte[] avroByteArray) {
        return avroDeserializer.deserialize(TOPIC, avroByteArray);
    }
}
Key Takeaways:
- Unlike Thrift, which tags each serialized field with an ID and a type, Avro-serialized bytes carry no schema information at all.
- Because no schema is stored in the serialized byte array, the reader must supply the schema the data was written with.
- The Confluent Schema Registry is a service that stores and versions these schemas.
- Confluent provides a cached schema client (CachedSchemaRegistryClient), which checks its local cache before sending a request over the network (a wiring sketch follows this list).
- The JSON schema in the ".avsc" file is not necessarily byte-identical to the schema carried by the generated Avro object; Avro normalizes it during code generation.
- All generated Avro objects implement GenericRecord.
- During serialization: a schema ID is requested from the Confluent Schema Registry, keyed on the schema of the Avro object.
- The schema ID, a 4-byte integer, is prepended (after a single magic byte) to the serialized Avro payload.
- During deserialization: the magic byte and the 4-byte schema ID are read off the front of the byte array, and the ID is converted back to an integer (see the wire-format decoding sketch below).
- The writer schema is then fetched from the Confluent Schema Registry by that ID and used to deserialize the remaining bytes.
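The cached client mentioned above can also be wired in by hand. A minimal sketch, assuming a registry at http://localhost:8081 and an illustrative cache capacity of 100 (neither value comes from the original code):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Collections;

public class CachedClientSketch {
    public static void main(String[] args) {
        // Caches schema lookups so repeated (de)serialization of the same
        // schema does not hit the registry over the network every time.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100); // hypothetical URL and capacity

        // KafkaAvroSerializer accepts an externally built registry client.
        KafkaAvroSerializer serializer = new KafkaAvroSerializer(client);
        serializer.configure(
                Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
                true); // true: key serde, as in ConfluentSchemaService above
    }
}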
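For the serialization and deserialization bullets: Confluent's wire format is one magic byte (0), then the 4-byte big-endian schema ID, then the Avro binary payload. A minimal sketch of peeling the schema ID off a serialized payload (class and method names are illustrative):

import java.nio.ByteBuffer;

public class WireFormatSketch {
    private static final byte MAGIC_BYTE = 0x0;

    /** Extracts the schema ID that KafkaAvroSerializer prepends to the payload. */
    public static int readSchemaId(byte[] confluentSerializedBytes) {
        ByteBuffer buffer = ByteBuffer.wrap(confluentSerializedBytes);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return buffer.getInt(); // bytes 1..4: the registry schema ID
        // everything after position 5 is the raw Avro binary payload
    }
}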
Test:
import com.big.data.avro.schema.Employee;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.DecoderException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConfluentSchemaServiceIT {

    private static final String REGISTRYURL = "http://YOUR-CONFLUENT_REGISTRY-SERVICE:8081";

    // The old Employee schema below does not yet have the departments field.
    private static final String oldEmployeeSchemaString = "{\n" +
            " \"name\": \"com.big.data.avro.schema.Employee\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [{\n" +
            " \"name\": \"emp_id\",\n" +
            " \"type\": \"int\",\n" +
            " \"doc\": \"employee Id of the employee\"\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"emp_name\",\n" +
            " \"type\": \"string\",\n" +
            " \"doc\": \"employee name of the employee\"\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"emp_country\",\n" +
            " \"type\": \"string\",\n" +
            " \"doc\": \"country of residence\"\n" +
            " },\n" +
            " { \"name\": \"bonus\",\n" +
            " \"type\": [\"null\", \"long\"],\n" +
            " \"default\": null,\n" +
            " \"doc\": \"bonus received on a yearly basis\"\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"subordinates\",\n" +
            " \"type\": [\"null\", {\"type\": \"map\", \"values\": \"string\"}],\n" +
            " \"default\": null,\n" +
            " \"doc\": \"map of subordinates Name and Designation\"\n" +
            " }\n" +
            " ]\n" +
            " }";
    public static final String EMP_ID = "emp_id";
    public static final String EMP_NAME = "emp_name";
    public static final String EMP_COUNTRY = "emp_country";
    public static final String COUNTRY = "NETHERLANDS";

    private List<String> departmentList;
    private Map<String, String> subordinateMap;
    private ConfluentSchemaService serde;
    @Before
    public void setUp() {
        // Point the SerDe at the running schema registry.
        serde = new ConfluentSchemaService(REGISTRYURL);
        departmentList = Arrays.asList("cat1", "cat2");
        subordinateMap = new HashMap<>();
        subordinateMap.put("maverick", "Junior SE");
    }
    // Write data with the new schema and read it back with the new schema.
    @Test
    public void serDeAvroWriteNewAndReadNewSchemaTest() throws DecoderException {
        Employee employee = Employee.newBuilder()
                .setEmpId(1)
                .setEmpName("Maverick") // emp_name is a required field in the schema
                .setEmpCountry(COUNTRY)
                .setDepartments(departmentList)
                .setSubordinates(subordinateMap)
                .build();

        // Serialize: the schema ID is prepended to the Avro bytes, then the result is hex-encoded.
        String hexEmployeeString =
                serde.hexBytesToString(serde.serializeAvroPOJOToBytes(employee));

        // Decode the hex string back to bytes; the deserializer strips the schema ID and deserializes.
        Employee employeeNew =
                (Employee) serde.deserializeBytesToAvroPOJO(serde.hexStringToBytes(hexEmployeeString));

        Assert.assertEquals(Integer.valueOf(1), employeeNew.getEmpId()); // emp_id is an int in the schema
        Assert.assertEquals(COUNTRY, employeeNew.getEmpCountry());
        Assert.assertEquals(departmentList, employeeNew.getDepartments());
        Assert.assertEquals(subordinateMap, employeeNew.getSubordinates());
    }
    // Write data with the old schema but read it back with the new schema.
    @Test
    public void serDeAvroWriteOldAndReadNewSchemaTest() throws Exception {
        // Parse the old Employee schema (no departments field).
        Schema oldEmployeeSchema = new Schema.Parser().parse(oldEmployeeSchemaString);

        // Build a GenericRecord against the old schema.
        GenericRecord oldEmployeeRecord = new GenericData.Record(oldEmployeeSchema);
        oldEmployeeRecord.put(EMP_ID, 1); // int, matching the schema's "int" type
        oldEmployeeRecord.put(EMP_NAME, "Maverick");
        oldEmployeeRecord.put(EMP_COUNTRY, COUNTRY);
        oldEmployeeRecord.put("subordinates", subordinateMap);

        // Serialize: the schema ID is prepended to the Avro bytes, then the result is hex-encoded.
        String hexEmployeeString =
                serde.hexBytesToString(serde.serializeAvroPOJOToBytes(oldEmployeeRecord));

        // Decode the hex string back to bytes; the deserializer strips the schema ID and
        // deserializes with the new (reader) schema.
        Employee employeeNew =
                (Employee) serde.deserializeBytesToAvroPOJO(serde.hexStringToBytes(hexEmployeeString));

        Assert.assertNotNull(employeeNew);
        Assert.assertEquals(Integer.valueOf(1), employeeNew.getEmpId());
        Assert.assertEquals(COUNTRY, employeeNew.getEmpCountry());
        Assert.assertTrue(employeeNew.getDepartments().isEmpty());
        Assert.assertEquals(subordinateMap, employeeNew.getSubordinates());
    }
}