Read Kafka From Flink

Read Kafka from Flink with Integration Test

Problem Statement: Flink needs to read data from Kafka and write it to HDFS.

 

Dependencies:

 <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>

    <!--Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>

    <!-- Embedded Kafka + ZooKeeper unit test dependencies -->
    <!-- The kafka-unit jar has been relocated to avoid a Kafka version
         conflict, as it uses kafka_2.11 0.9.0.0
         (a relocation sketch follows the dependency list) -->
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>kafka-unit-flink</artifactId>
        <version>${project.version}</version>
    </dependency>
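
The relocation mentioned in the comment above is typically done with the maven-shade-plugin inside the kafka-unit-flink module. Below is a minimal sketch of what such a relocation could look like; the shadedPattern and the surrounding plugin configuration are assumptions for illustration, not the actual kafka-unit-flink build.

    <!-- Sketch only: relocating Kafka classes to avoid a version clash.
         The shadedPattern below is an assumption, not the real kafka-unit-flink config. -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <relocations>
                        <relocation>
                            <pattern>org.apache.kafka</pattern>
                            <shadedPattern>shaded.org.apache.kafka</shadedPattern>
                        </relocation>
                    </relocations>
                </configuration>
            </execution>
        </executions>
    </plugin>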

 


 

Code:

package com.big.data.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Simple example on how to read with a Kafka consumer
 *
 * Note that the Kafka source is expecting the following parameters to be set
 *  - "bootstrap.servers" (comma separated list of kafka brokers)
 *  - "zookeeper.connect" (comma separated list of zookeeper servers)
 *  - "group.id" the id of the consumer group
 *  - "topic" the name of the topic to read data from.
 *
 * You can pass these required parameters using "--bootstrap.servers host:port,host1:port1
 *                                            --zookeeper.connect host:port --topic testTopic"
 *
 * This is a valid input example:
 * 		--topic test --bootstrap.servers localhost:9092 --zookeeper.connect 
 *                                                       localhost:2181 --group.id myGroup
 *
 * Read from Kafka And write to HDFS.
 *
 * Version info
 * https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
 */
public class ReadFromKafka {
    public static final String OUTPUT_PATH = "output.path";
    public static final String PARALLELISM = "parallelism";
    public static final String TOPIC_NAME  = "topic";



    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // Using the parser provided by Flink
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // parameterTool.getProperties() returns a Properties object with all key=value pairs set
        DataStream<String> messageStream =
               env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired(TOPIC_NAME), 
                                         new SimpleStringSchema(), parameterTool.getProperties()));

        // the rebalance call causes a repartitioning of the data so that all machines
        // see the messages (for example when "num kafka partitions" < "num flink operators")
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        })
                     .setParallelism(parameterTool.getInt(PARALLELISM))
                      //Write to hdfs
                     .writeAsText(parameterTool.get(OUTPUT_PATH));

        env.execute();

    }
}
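
The job leans on Flink's ParameterTool: every "--key value" pair on the command line becomes an entry in the Properties object returned by parameterTool.getProperties(), which is passed straight to FlinkKafkaConsumer09. That is why the argument names match Kafka consumer property names (bootstrap.servers, zookeeper.connect, group.id). A small sketch of that behaviour, with placeholder host/port values:

    // Sketch: how ParameterTool turns "--key value" pairs into consumer Properties.
    // The host/port values below are placeholders, not taken from the original post.
    String[] args = {"--topic", "testTopic",
                     "--bootstrap.servers", "localhost:9092",
                     "--zookeeper.connect", "localhost:2181",
                     "--group.id", "myGroup"};

    ParameterTool params = ParameterTool.fromArgs(args);

    params.getRequired("topic");   // "testTopic" (throws if the argument is missing)
    params.getProperties();        // Properties with bootstrap.servers, zookeeper.connect,
                                   // group.id (and topic) set, ready for the Kafka consumer

On a cluster the same arguments are simply appended to the usual "flink run <job-jar>" invocation (the jar name is whatever your build produces).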


Integration Test:

 

package com.big.data.flink;

import com.big.data.kafka.unit.flink.KafkaUnitAdmin;
import com.big.data.kafka.unit.flink.KafkaUnit;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;


public class ReadFromKafkaTest {

    public static final Logger log = LoggerFactory.getLogger(ReadFromKafkaTest.class);

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMETER = "\n";
    public static final String TOPIC = "TOPIC";
    private static KafkaUnitAdmin admin;
    private static String baseDir;
    private static String outputDir;

    @ClassRule
    public static KafkaUnit cluster = new KafkaUnit(1);
    // KafkaUnit(1): number of brokers in the cluster

    public class MyThread extends Thread {

        public void run() {
            System.out.println("MyThread running");
            //--topic test --bootstrap.servers localhost:9092 
            // --zookeeper.connect localhost:2181 --group.id myGroup
            String[] args = new String[]{
                   "--topic", TOPIC, 
                   "--bootstrap.servers", cluster.getConfig().getKafkaBrokerString(), 
                   "--zookeeper.connect", cluster.getConfig().getZkString(), 
                   "--group.id", "myGroup",
                   "--auto.offset.reset", "earliest",
                   "--" + ReadFromKafka.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir, 
                   "--" + WordCount.PARALLELISM, "1" };
            try {
                ReadFromKafka.main(args);
            } catch (Exception e) {
                log.info("Execption occured while launching Flink Kafka consumer");
            }

        }
    }

    @Before
    public void startup() throws Exception {

        //  create topic in embedded Kafka Cluster
        admin = new KafkaUnitAdmin(cluster);
        admin.createTopic(TOPIC, 1, 1, new Properties());

        //Input Directory
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();

        //OutPutDirectory
        outputDir = baseDir + "/output";

        // produce data in Embedded Kafka (Kafka Unit)
        producerDatainKafka();
    }

    public static void producerDatainKafka() {
        long events = 10;
        Properties props = new Properties();
        log.info("Broker list is : " + cluster.getConfig().getKafkaBrokerString());

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                                    cluster.getConfig().getKafkaBrokerString());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                     "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                     "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            String ip = "192.168.2." + nEvents;
            String msg = nEvents + "," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);
            producer.send(data);
            producer.flush();
        }
        producer.close();
    }

    @Test
    public void readFromKafkaStoreInHdfs() throws Exception {

        log.info("OutPutDir is {} ", outputDir);

        // Start Flink from a different thread, else this test would block,
        // as the job keeps waiting for messages from Kafka
        MyThread flinkThread = new MyThread();
        flinkThread.start();

        // Sleep for 10 seconds so that Flink can consume the messages and write them to outputDir
        Thread.sleep(10000);

        StringBuilder fileToString = new StringBuilder();

        // Read the data from the output folder; it may contain multiple
        // part files named 1, 2, 3, 4 (one per parallel subtask)
        Iterator<File> files = FileUtils.iterateFiles(new File(outputDir), null, true);
        while (files.hasNext()) {
            fileToString.append(FileUtils.readFileToString(files.next(), "UTF-8"));

        }

        Map<String, String> wordToCount = new HashMap<>();

        // Each output line has the form "eventNumber,ip"
        Arrays.stream(fileToString.toString().split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] wordCount = e.split(",");
            wordToCount.put(wordCount[0], wordCount[1]);
            log.info("Event number {}   => ip {}", wordCount[0], wordCount[1]);
        });

        // 10 messages were produced to Kafka, so 10 entries are expected
        Assert.assertEquals(10, wordToCount.size());
        Assert.assertTrue(wordToCount.containsKey("0"));
        Assert.assertTrue(wordToCount.get("1").equals("192.168.2.1"));
        Assert.assertTrue(wordToCount.get("0").equals("192.168.2.0"));
        Assert.assertTrue(wordToCount.get("2").equals("192.168.2.2"));
        Assert.assertTrue(wordToCount.get("3").equals("192.168.2.3"));
        Assert.assertTrue(wordToCount.get("4").equals("192.168.2.4"));
        Assert.assertTrue(wordToCount.get("5").equals("192.168.2.5"));

    }

    @After
    public void deleteTopic() {
        admin.deleteTopic(TOPIC);
    }

}
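
The fixed Thread.sleep(10000) is the simplest way to give the streaming job time to write its output, but it can make the test slow or flaky. One possible alternative (a sketch only, not part of the original test) is to poll the output directory until it contains data or a timeout expires:

    // Sketch: a polling wait that could replace the fixed sleep in the test above.
    // The helper name, poll interval and timeout are assumptions for illustration.
    private static void waitForOutput(String dir, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        File outputFolder = new File(dir);
        while (System.currentTimeMillis() < deadline) {
            // sizeOfDirectory sums the size of all files under the folder
            if (outputFolder.exists() && FileUtils.sizeOfDirectory(outputFolder) > 0) {
                return;
            }
            Thread.sleep(500);
        }
    }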