Read from Kafka and Write to Aerospike from Flink


Problem statement: Data needs to be read from Kafka on a streaming basis and written into Aerospike. If possible, also write the data into HDFS.

 

Dependencies:

<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>

    <!--Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>

    <!-- Embedded Kafka + Zookeeper unit dependencies -->
    <!-- The kafka-unit jar has been relocated to avoid a Kafka version conflict, as it uses kafka_2.11 0.9.0.0 -->
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>kafka-unit-flink</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aerospike.unit</groupId>
        <artifactId>aerospike-unit</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
</dependencies>


Code:

 

package com.big.data.flink;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Simple example of how to read from Kafka with a Flink Kafka consumer
 * <p>
 * Note that the Kafka source is expecting the following parameters to be set
 * - "bootstrap.servers" (comma separated list of kafka brokers)
 * - "zookeeper.connect" (comma separated list of zookeeper servers)
 * - "group.id" the id of the consumer group
 * - "topic" the name of the topic to read data from.
 * <p>
 * You can pass these required parameters using "--bootstrap.servers host:port,host1:port1 --zookeeper.connect host:port --topic testTopic"
 * <p>
 * This is a valid input example:
 * --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup
 * <p>
 * Read from Kafka and write to Aerospike (and HDFS).
 * <p>
 * Version info
 * https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
 */

public class ReadFromKafkaWriteToAerospike {

    public static final String OUTPUT_PATH = "output.path";
    public static final String PARALLELISM = "parallelism";
    public static final String TOPIC_NAME = "topic";

    //Aerospike related properties
    public static final String AEROSPIKE_HOSTNAME = "aerospike.host.name";
    public static final String AEROSPIKE_PORT = "aerospike.port";
    public static final String AEROSPIKE_NAMESPACE = "aerospike.name.space";
    public static final String AEROSPIKE_SETNAME = "aerospike.set.name";
    public static final String VALUE_BIN_NAME = "value.bin.name";

    // Remember: functions are instantiated on the client, then serialized and sent to the task managers.
    // There is no need to initialize services (Aerospike, HBase) on the client, hence the transient fields.
    // map() inserts each record into Aerospike individually; this could be converted to batched writes too.
    public static class InsertIntoAerospike implements MapFunction<String, String> {

        // Not static: static fields are not serialized and shipped to the executors
        private final String aerospikeHostName;
        private final int aerospikePortNo;
        private final String aerospikeNamespace;
        private final String aerospikeSetName;
        private final String valueColumnName;

        // The Aerospike client is not serializable, and there is
        // no need to instantiate it on the client side anyway
        private transient AerospikeClient client;
        private transient WritePolicy policy;

        public InsertIntoAerospike(String hostName, int portNo, String nameSpace,
                                                    String setName, String valueColumnName) {
            this.aerospikeHostName = hostName;
            this.aerospikePortNo = portNo;
            this.aerospikeNamespace = nameSpace;
            this.aerospikeSetName = setName;
            this.valueColumnName = valueColumnName;

            // Add a JVM shutdown hook to close the client gracefully.
            // This is where service resources can be cleaned up, since the plain
            // MapFunction interface has no cleanup()/close() callback
            // (see the RichMapFunction sketch after this example for an alternative).
            JVMShutdownHook jvmShutdownHook = new JVMShutdownHook();
            Runtime.getRuntime().addShutdownHook(jvmShutdownHook);

        }

        @Override
        public String map(String value) throws Exception {
            // Initialize on the first call
            if (client == null) {
                policy = new WritePolicy();
                // How to close the client gracefully? The JVM shutdown hook below handles it.
                client = new AerospikeClient(aerospikeHostName, aerospikePortNo);
            }

            String[] keyValue = value.split(",");
            // Each input record is a comma-separated "key,value" pair
            Key key = new Key(aerospikeNamespace, aerospikeSetName, keyValue[0]);
            Bin bin = new Bin(valueColumnName, keyValue[1]);
            client.put(policy, key, bin);

            return value;
        }

        // Close the client when the JVM is shutting down
        private class JVMShutdownHook extends Thread {
            @Override
            public void run() {
                System.out.println("JVM Shutdown Hook: Thread initiated , 
                                                 shutting down service gracefully");
                IOUtils.closeQuietly(client);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Using the parser provided by Flink
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String aerospikeHostname = parameterTool.getRequired(AEROSPIKE_HOSTNAME);
        int aerospikePort = Integer.parseInt(parameterTool.getRequired(AEROSPIKE_PORT));
        String namespace = parameterTool.getRequired(AEROSPIKE_NAMESPACE);
        String setName = parameterTool.getRequired(AEROSPIKE_SETNAME);

        String valueBinName = parameterTool.getRequired(VALUE_BIN_NAME);

        //parameterTool.getProperties() returns back props with all key=value field set
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>
                   (parameterTool.getRequired(TOPIC_NAME), new SimpleStringSchema(), 
                                                             parameterTool.getProperties()));

        // The rebalance() call causes a repartitioning of the data so that all machines
        // see the messages (for example, when "num Kafka partitions" < "num Flink operators")
        messageStream.rebalance().map(new InsertIntoAerospike(aerospikeHostname, aerospikePort, 
                                                               namespace, setName, valueBinName))
                     .setParallelism(parameterTool.getInt(PARALLELISM))
                     //Write to HDFS
                     .writeAsText(parameterTool.get(OUTPUT_PATH));

        env.execute();

    }

}
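
A note on resource lifecycle: the JVM shutdown hook above is registered in the constructor, which runs in the client JVM where the job graph is built; after deserialization on the task managers the constructor is not invoked again. Flink's RichMapFunction offers open() and close() callbacks that run on each parallel task instance, which answers the "how to close the client gracefully?" question directly. Below is a minimal sketch of that alternative (the class name InsertIntoAerospikeRich is illustrative, not part of the original code; it reuses the same Aerospike imports as above):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public static class InsertIntoAerospikeRich extends RichMapFunction<String, String> {

    private final String hostName;
    private final int portNo;
    private final String namespace;
    private final String setName;
    private final String binName;

    // Not serializable; created in open() on each parallel task instance
    private transient AerospikeClient client;
    private transient WritePolicy policy;

    public InsertIntoAerospikeRich(String hostName, int portNo, String namespace,
                                   String setName, String binName) {
        this.hostName = hostName;
        this.portNo = portNo;
        this.namespace = namespace;
        this.setName = setName;
        this.binName = binName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel task instance before any record is processed
        policy = new WritePolicy();
        client = new AerospikeClient(hostName, portNo);
    }

    @Override
    public String map(String value) throws Exception {
        // Same insert logic as above: split the "key,value" record and write one bin
        String[] keyValue = value.split(",");
        client.put(policy, new Key(namespace, setName, keyValue[0]),
                   new Bin(binName, keyValue[1]));
        return value;
    }

    @Override
    public void close() throws Exception {
        // Called by Flink when the task shuts down, so no JVM shutdown hook is needed
        if (client != null) {
            client.close();
        }
    }
}

Wiring it in would be a drop-in change: messageStream.rebalance().map(new InsertIntoAerospikeRich(...)) with the same constructor arguments as before.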


Integration Test:

package com.big.data.flink;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import com.big.data.kafka.unit.flink.KafkaUnit;
import com.big.data.kafka.unit.flink.KafkaUnitAdmin;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;


public class ReadFromKafkaWriteToAerospikeTest {

    public static final Logger log = LoggerFactory.getLogger(ReadFromKafkaWriteToAerospikeTest.class);

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMITER = "\n";
    public static final String TOPIC = "TOPIC";
    public static final String BIN_NAME = "ip";
    private static KafkaUnitAdmin admin;

    private static String baseDir;
    private static String outputDir;

    private AerospikeSingleNodeCluster aerospikeCluster;
    private AerospikeRunTimeConfig runtimeConfig;
    private AerospikeClient client;
    private WritePolicy policy;

    //Aerospike unit related Params
    private String memorySize = "64M";
    private String setName = "BIGTABLE";
    private String binName = "BIGBIN";
    private String nameSpace = "RandomNameSpace";

    @ClassRule
    public static KafkaUnit kafkaUnitCluster = new KafkaUnit(1);
    // KafkaUnit(1): number of brokers in the kafkaUnitCluster

    public class MyThread extends Thread {

        public void run() {
            System.out.println("MyThread running");
            //--topic test --bootstrap.servers localhost:9092
            // --zookeeper.connect localhost:2181 --group.id myGroup
            String[] args = new String[]{"--topic", TOPIC,
                    "--bootstrap.servers", kafkaUnitCluster.getConfig().getKafkaBrokerString(),
                    "--zookeeper.connect", kafkaUnitCluster.getConfig().getZkString(),
                    "--group.id", "myGroup",
                    "--auto.offset.reset", "earliest",
                    "--" + ReadFromKafkaWriteToAerospike.OUTPUT_PATH,
                                               LOCAL_FILEURI_PREFIX + outputDir,
                    "--" + ReadFromKafkaWriteToAerospike.PARALLELISM, "1",
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_NAMESPACE,
                                               runtimeConfig.getNameSpaceName(),
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_HOSTNAME, "127.0.0.1",
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_PORT,
                                               String.valueOf(runtimeConfig.getServicePort()),
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_SETNAME, setName,
                    "--" + ReadFromKafkaWriteToAerospike.VALUE_BIN_NAME, BIN_NAME};

            try {
                ReadFromKafkaWriteToAerospike.main(args);
            } catch (Exception e) {
                log.info("Execption occured while launching Flink Kafka consumer");
            }

        }
    }

    @Before
    public void startup() throws Exception {

        // Create the topic in the embedded Kafka cluster
        admin = new KafkaUnitAdmin(kafkaUnitCluster);
        admin.createTopic(TOPIC, 1, 1, new Properties());

        // Base directory
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();

        // Output directory
        outputDir = baseDir + "/output";

        // Start the Aerospike mini cluster.
        // Instantiate the cluster with the namespace name and memory size.
        // One can also use the default constructor and retrieve the namespace
        // and memory info from cluster.getRunTimeConfiguration();
        aerospikeCluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
        aerospikeCluster.start();
        // Get the runtime configuration of the cluster
        runtimeConfig = aerospikeCluster.getRunTimeConfiguration();
        client = new AerospikeClient("127.0.0.1", runtimeConfig.getServicePort());
        AerospikeUtils.printNodes(client);
        policy = new WritePolicy();

        // Produce data into the embedded Kafka (kafka-unit)
        produceDataInKafka();
    }

    public static void produceDataInKafka() {
        long events = 10;
        Properties props = new Properties();
        log.info("Broker list is : " + kafkaUnitCluster.getConfig().getKafkaBrokerString());

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                                         kafkaUnitCluster.getConfig().getKafkaBrokerString());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                          "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                            "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            String ip = "192.168.2." + nEvents;
            String msg = nEvents + "," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);
            producer.send(data);
            producer.flush();
        }
        producer.close();
    }

    private String getValueFromAerospike(String key) {
        // Fetch the record for the given key from Aerospike
        Key recordKey = new Key(runtimeConfig.getNameSpaceName(), setName, key);
        Record result = client.get(policy, recordKey);
        return result.getValue(BIN_NAME).toString();
    }

    @Test
    public void readFromKafkaWriteToAerospike() throws Exception {

        log.info("OutPutDir is {} ", outputDir);

        // Start Flink from a different thread, otherwise this call would block,
        // as the job keeps waiting for messages from Kafka
        MyThread flinkThread = new MyThread();
        flinkThread.start();

        // Sleep for 10 seconds so that Flink can consume the messages and write to outputDir.
        // A fixed sleep is brittle; see the polling sketch after this test for an alternative.
        Thread.sleep(10000);

        StringBuilder fileToString = new StringBuilder();

        // Read the data from the output folder; it contains
        // multiple output files named 1, 2, 3, ...
        Iterator<File> files = FileUtils.iterateFiles(new File(outputDir), null, true);
        while (files.hasNext()) {
            fileToString.append(FileUtils.readFileToString(files.next(), "UTF-8"));
        }

        Map<String, String> eventToIp = new HashMap<>();

        // Each output line is a comma-separated "eventNumber,ip" pair
        Arrays.stream(fileToString.toString().split(NEW_LINE_DELIMITER)).forEach(e -> {
            String[] parts = e.split(",");
            eventToIp.put(parts[0], parts[1]);
            log.info("Event number {} => ip {}", parts[0], parts[1]);
        });

        // 10 messages were consumed from Kafka
        Assert.assertEquals(10, eventToIp.size());
        Assert.assertTrue(eventToIp.containsKey("0"));
        Assert.assertEquals("192.168.2.0", eventToIp.get("0"));
        Assert.assertEquals("192.168.2.1", eventToIp.get("1"));
        Assert.assertEquals("192.168.2.2", eventToIp.get("2"));
        Assert.assertEquals("192.168.2.3", eventToIp.get("3"));
        Assert.assertEquals("192.168.2.4", eventToIp.get("4"));
        Assert.assertEquals("192.168.2.5", eventToIp.get("5"));

        //Asserts from Aerospike
        Assert.assertEquals("192.168.2.0", getValueFromAerospike("0"));
        Assert.assertEquals("192.168.2.1", getValueFromAerospike("1"));
        Assert.assertEquals("192.168.2.2", getValueFromAerospike("2"));
        Assert.assertEquals("192.168.2.3", getValueFromAerospike("3"));
        Assert.assertEquals("192.168.2.4", getValueFromAerospike("4"));
        Assert.assertEquals("192.168.2.5", getValueFromAerospike("5"));

    }

    @After
    public void deleteTopic() {
        admin.deleteTopic(TOPIC);
    }

}
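
The fixed Thread.sleep(10000) is the most fragile part of the test: on a slow machine Flink may not have written all the records yet. A minimal sketch of a polling alternative that could be added to the test class (the helper name waitForOutput and its parameters are illustrative, not part of the original test; it uses the same commons-io calls already imported above):

    // Poll the output directory until the expected number of lines appears,
    // or fail after the timeout; avoids a hard-coded sleep.
    private static void waitForOutput(File dir, int expectedLines, long timeoutMs)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            long lines = 0;
            // Guard: iterateFiles() throws if the directory does not exist yet
            if (dir.exists()) {
                Iterator<File> files = FileUtils.iterateFiles(dir, null, true);
                while (files.hasNext()) {
                    lines += FileUtils.readLines(files.next(), "UTF-8").size();
                }
            }
            if (lines >= expectedLines) {
                return;
            }
            Thread.sleep(500); // back off briefly before re-checking
        }
        throw new AssertionError("Timed out waiting for " + expectedLines + " output lines");
    }

The test body could then call waitForOutput(new File(outputDir), 10, 30000) instead of the fixed sleep.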
