Read from Kafka and Write to Aerospike from Flink


Problem statement: On a streaming basis, data needs to be read from Kafka and written into Aerospike. If possible, the same data should also be written into HDFS.
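
Before diving in, here is a sketch of the shape of the job: a Kafka source, a map that does an Aerospike put per record, and a text sink for the optional HDFS copy. The hosts, ports, topic, and paths below are illustrative placeholders (3000 is Aerospike's default service port), and InsertIntoAerospike is the map function defined further down:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("", "myGroup");

env.addSource(new FlinkKafkaConsumer09<>("testTopic", new SimpleStringSchema(), kafkaProps))
   .rebalance()
   .map(new InsertIntoAerospike("localhost", 3000, "test", "demoSet", "ip")) // Aerospike put per record
   .writeAsText("hdfs:///tmp/kafka-to-aerospike");                           // optional copy to HDFS

env.execute("kafka -> aerospike (+ hdfs)");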



    <!-- Flink dependencies -->

    <!-- Kafka connector -->

    <!-- Embedded Kafka + ZooKeeper unit dependencies -->
    <!-- The kafka-unit jar has been relocated to avoid a Kafka version conflict, as it uses kafka_2.11 -->
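
For reference, a minimal sketch of the Maven coordinates behind these comments could look like the following (the artifact versions are assumptions matching the FlinkKafkaConsumer09-era API used below; align them with your build):

    <!-- Flink streaming core (Scala 2.11 build) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.3</version>
    </dependency>

    <!-- Kafka 0.9 connector, provides FlinkKafkaConsumer09 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.3</version>
    </dependency>

    <!-- Aerospike Java client -->
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-client</artifactId>
        <version>3.3.1</version>
    </dependency>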




import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Simple example on how to read with a Kafka consumer.
 * <p>
 * Note that the Kafka source expects the following parameters to be set
 * - "bootstrap.servers" (comma separated list of Kafka brokers)
 * - "zookeeper.connect" (comma separated list of ZooKeeper servers)
 * - "" (the id of the consumer group)
 * - "topic" (the name of the topic to read data from)
 * <p>
 * You can pass these required parameters using
 * "--bootstrap.servers host:port,host1:port1 --zookeeper.connect host:port --topic testTopic"
 * <p>
 * This is a valid input example:
 * --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 -- myGroup
 * <p>
 * Reads from Kafka, writes each record to Aerospike, and also writes the stream to HDFS.
 */

public class ReadFromKafkaWriteToAerospike {

    public static final String OUTPUT_PATH = "output.path";
    public static final String PARALLELISM = "parallelism";
    public static final String TOPIC_NAME = "topic";

    //Aerospike related properties (the key names follow the "aerospike.port" naming convention)
    public static final String AEROSPIKE_HOSTNAME = "aerospike.hostname";
    public static final String AEROSPIKE_PORT = "aerospike.port";
    public static final String AEROSPIKE_NAMESPACE = "aerospike.namespace";
    public static final String AEROSPIKE_SETNAME = "aerospike.setname";
    public static final String VALUE_BIN_NAME = "";

    // Do remember: the map function is instantiated on the driver, serialized, and sent to the executors.
    // There is no need to initialize the service (Aerospike, HBase, ...) on the driver, hence the transient fields.
    // In map(), each record is inserted into Aerospike; this could be converted into a batched insert too.
    public static class InsertIntoAerospike implements MapFunction<String, String> {

        // not static, as static fields would not be serialized and sent to the executors
        private final String aerospikeHostName;
        private final int aerospikePortNo;
        private final String aerospikeNamespace;
        private final String aerospikeSetName;
        private final String valueColumnName;

        // The Aerospike client is not serializable, and
        // there is no need to instantiate it on the driver
        private transient AerospikeClient client;
        private transient WritePolicy policy;

        public InsertIntoAerospike(String hostName, int portNo, String nameSpace,
                                   String setName, String valueColumnName) {
            this.aerospikeHostName = hostName;
            this.aerospikePortNo = portNo;
            this.aerospikeNamespace = nameSpace;
            this.aerospikeSetName = setName;
            this.valueColumnName = valueColumnName;

            //Add a shutdown hook to close the client gracefully.
            //This is the place where you can clean up your service resources,
            //as there is no cleanup() function in a plain map
            JVMShutdownHook jvmShutdownHook = new JVMShutdownHook();
            Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
        }

        public String map(String value) throws Exception {
            // Initialize on the first call
            if (client == null) {
                policy = new WritePolicy();
                // how to close the client gracefully? see the shutdown hook below
                client = new AerospikeClient(aerospikeHostName, aerospikePortNo);
            }

            String[] keyValue = value.split(",");
            // Rows have a schema, with the field name and value being part of the row
            Key key = new Key(aerospikeNamespace, aerospikeSetName, keyValue[0]);
            Bin bin = new Bin(valueColumnName, keyValue[1]);
            client.put(policy, key, bin);

            return value;
        }
        //When the JVM is going down, close the client
        private class JVMShutdownHook extends Thread {
            public void run() {
                System.out.println("JVM Shutdown Hook: thread initiated, shutting down service gracefully");
                if (client != null) {
                    client.close();
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Using the argument parser provided by Flink
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String aerospikeHostname = parameterTool.getRequired(AEROSPIKE_HOSTNAME);
        int aerospikePort = Integer.parseInt(parameterTool.getRequired(AEROSPIKE_PORT));
        String namespace = parameterTool.getRequired(AEROSPIKE_NAMESPACE);
        String setName = parameterTool.getRequired(AEROSPIKE_SETNAME);
        String valueBinName = parameterTool.getRequired(VALUE_BIN_NAME);

        env.setParallelism(parameterTool.getInt(PARALLELISM, 1));

        //parameterTool.getProperties() returns a Properties object with all key=value fields set
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>(
                parameterTool.getRequired(TOPIC_NAME), new SimpleStringSchema(),
                parameterTool.getProperties()));

        // the rebalance call causes a repartitioning of the data so that all machines
        // see the messages (for example in cases where "num kafka partitions" < "num flink operators")
        messageStream.rebalance().map(new InsertIntoAerospike(aerospikeHostname, aerospikePort,
                                                              namespace, setName, valueBinName))
                     //Write to HDFS as well
                     .writeAsText(parameterTool.getRequired(OUTPUT_PATH));

        env.execute("Read from Kafka, write to Aerospike and HDFS");
    }
}

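The JVM shutdown hook above is a workaround for the fact that a plain MapFunction has no lifecycle callbacks. A cleaner alternative in Flink is RichMapFunction, whose open()/close() methods run once per parallel task on the task manager. A minimal sketch, reusing the Aerospike imports from above and the same constructor parameters (a drop-in replacement for InsertIntoAerospike):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public static class InsertIntoAerospikeRich extends RichMapFunction<String, String> {

    private final String hostName;
    private final int portNo;
    private final String namespace;
    private final String setName;
    private final String binName;

    // created in open(), so they never need to be serialized
    private transient AerospikeClient client;
    private transient WritePolicy policy;

    public InsertIntoAerospikeRich(String hostName, int portNo, String namespace,
                                   String setName, String binName) {
        this.hostName = hostName;
        this.portNo = portNo;
        this.namespace = namespace;
        this.setName = setName;
        this.binName = binName;
    }

    @Override
    public void open(Configuration parameters) {
        // runs once per parallel task, on the task manager
        policy = new WritePolicy();
        client = new AerospikeClient(hostName, portNo);
    }

    @Override
    public String map(String value) {
        String[] keyValue = value.split(",");
        client.put(policy, new Key(namespace, setName, keyValue[0]),
                new Bin(binName, keyValue[1]));
        return value;
    }

    @Override
    public void close() {
        // runs on task shutdown, so no JVM shutdown hook is needed
        if (client != null) {
            client.close();
        }
    }
}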



Integration test: spin up an embedded Kafka cluster and a single-node Aerospike cluster, produce ten "eventNumber,ip" messages, launch the Flink job in a separate thread, and assert on both the file output and the records landed in Aerospike.


import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
// plus the KafkaUnit and KafkaUnitAdmin imports from the relocated kafka-unit jar mentioned in the POM

public class ReadFromKafkaWriteToAerospikeTest {

    public static final Logger log = LoggerFactory.getLogger(ReadFromKafkaWriteToAerospikeTest.class);

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMETER = "\n";
    public static final String TOPIC = "TOPIC";
    public static final String BIN_NAME = "ip";
    private static KafkaUnitAdmin admin;

    private static String baseDir;
    private static String outputDir;

    private AerospikeSingleNodeCluster aerospikeCluster;
    private AerospikeRunTimeConfig runtimeConfig;
    private AerospikeClient client;
    private WritePolicy policy;

    //Aerospike unit related Params
    private String memorySize = "64M";
    private String setName = "BIGTABLE";
    private String binName = "BIGBIN";
    private String nameSpace = "RandomNameSpace";

    public static KafkaUnit kafkaUnitCluster = new KafkaUnit(1);
    // KafkaUnit(1): the number of brokers in the kafkaUnitCluster

    public class MyThread extends Thread {

        public void run() {
            System.out.println("MyThread running");
            //--topic test --bootstrap.servers localhost:9092
            // --zookeeper.connect localhost:2181 -- myGroup
            String[] args = new String[]{"--topic", TOPIC,
                    "--bootstrap.servers", kafkaUnitCluster.getConfig().getKafkaBrokerString(),
                    "--zookeeper.connect", kafkaUnitCluster.getConfig().getZkString(),
                    "--", "myGroup",
                    "--auto.offset.reset", "earliest",
                    "--" + ReadFromKafkaWriteToAerospike.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir,
                    "--" + ReadFromKafkaWriteToAerospike.PARALLELISM, "1",
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_NAMESPACE, nameSpace,
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_HOSTNAME, "localhost", // embedded cluster runs locally
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_PORT, String.valueOf(runtimeConfig.getServicePort()),
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_SETNAME, setName,
                    "--" + ReadFromKafkaWriteToAerospike.VALUE_BIN_NAME, BIN_NAME};

            try {
                ReadFromKafkaWriteToAerospike.main(args);
            } catch (Exception e) {
                log.error("Exception occurred while launching the Flink Kafka consumer", e);
            }
        }
    }

    @Before
    public void startup() throws Exception {

        //  create the topic in the embedded Kafka cluster
        admin = new KafkaUnitAdmin(kafkaUnitCluster);
        admin.createTopic(TOPIC, 1, 1, new Properties());

        //Base and output directories for the file sink
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();
        outputDir = baseDir + "/output";

        // Start the Aerospike mini cluster.
        // Instantiate the cluster with a namespace name and memory size.
        // One can also use the default constructor and retrieve the namespace
        // and memory info from cluster.getRunTimeConfiguration().
        aerospikeCluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
        // Get the runtime configuration of the cluster
        runtimeConfig = aerospikeCluster.getRunTimeConfiguration();
        client = new AerospikeClient("localhost", runtimeConfig.getServicePort()); // embedded cluster listens locally
        policy = new WritePolicy();

        // produce data into the embedded Kafka (Kafka Unit)
        producerDatainKafka();
    }

    public static void producerDatainKafka() {
        long events = 10;
        Properties props = new Properties();"Broker list is : " + kafkaUnitCluster.getConfig().getKafkaBrokerString());

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUnitCluster.getConfig().getKafkaBrokerString());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // each message is "eventNumber,ip", keyed by the ip
        for (long nEvents = 0; nEvents < events; nEvents++) {
            String ip = "192.168.2." + nEvents;
            String msg = nEvents + "," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);
            producer.send(data);
        }
        producer.close();
    }
    private String getValueFromAerospike(String key) {
        //Fetch the record for the given key from Aerospike
        Key key1 = new Key(runtimeConfig.getNameSpaceName(), setName, key);
        Record result1 = client.get(policy, key1);
        return result1.getValue(BIN_NAME).toString();
    }
    @Test
    public void readFromKafkaWriteToAerospike() throws Exception {"Output dir is {} ", outputDir);

        // Start Flink from a different thread,
        // else this would block, as Flink keeps waiting for messages from Kafka
        MyThread flinkThread = new MyThread();
        flinkThread.start();

        // Sleep for 10 seconds so that Flink can consume the messages and write to outputDir
        Thread.sleep(10000);

        StringBuilder fileToString = new StringBuilder();

        //Read the data from the output folder;
        //the output folder has multiple output files, named 1 2 3 4
        Iterator<File> files = FileUtils.iterateFiles(new File(outputDir), null, true);
        while (files.hasNext()) {
            fileToString.append(FileUtils.readFileToString(, "UTF-8"));
            fileToString.append(NEW_LINE_DELIMETER);
        }

        Map<String, String> wordToCount = new HashMap<>();

        //Each output line is of the form "eventNumber,ip"
        Arrays.stream(fileToString.toString().split(NEW_LINE_DELIMETER))
                .filter(line -> !line.isEmpty())
                .forEach(e -> {
                    String[] wordCount = e.split(",");
                    wordToCount.put(wordCount[0], wordCount[1]);
          "Event number {}   => ip {}", wordCount[0], wordCount[1]);
                });

        //10 messages were produced to Kafka, so 10 distinct events should be consumed
        Assert.assertEquals(10, wordToCount.size());

        //Asserts from Aerospike: the bin value for key n is the ip 192.168.2.n
        Assert.assertEquals("192.168.2.0", getValueFromAerospike("0"));
        Assert.assertEquals("192.168.2.1", getValueFromAerospike("1"));
        Assert.assertEquals("192.168.2.2", getValueFromAerospike("2"));
        Assert.assertEquals("192.168.2.3", getValueFromAerospike("3"));
        Assert.assertEquals("192.168.2.4", getValueFromAerospike("4"));
        Assert.assertEquals("192.168.2.5", getValueFromAerospike("5"));
    }


    @After
    public void deleteTopic() {
        // assumption: KafkaUnitAdmin exposes deleteTopic(), mirroring createTopic() used in startup()
        admin.deleteTopic(TOPIC);
    }
}