Read Kafka From Flink

Read Kafka from Flink with Integration Test

Problem statement: Flink needs to read data from Kafka and write it to HDFS.


Dependencies:

 <!-- Flink dependencies -->

    <!--Kafka Connector -->

    <!-- Embedded Kafka + ZooKeeper (Kafka Unit) dependencies -->
    <!-- The Kafka Unit jar has been relocated to avoid a Kafka
          version conflict, as it depends on kafka_2.11 -->



Code:


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * Simple example of how to read from Kafka with a Kafka consumer.
 *
 * Note that the Kafka source expects the following parameters to be set:
 *  - "bootstrap.servers" (comma-separated list of Kafka brokers)
 *  - "zookeeper.connect" (comma-separated list of ZooKeeper servers)
 *  - "group.id" (the id of the consumer group)
 *  - "topic" (the name of the topic to read data from)
 * You can pass these required parameters using "--bootstrap.servers host:port,host1:port1
 *     --zookeeper.connect host:port --group.id myGroup --topic testTopic".
 * This is a valid input example:
 *     --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup
 * Reads from Kafka and writes to HDFS.
 * Version info: uses the Kafka 0.9 connector ({@code FlinkKafkaConsumer09}).
 */
public class ReadFromKafka {
    // Names of the command-line parameters (passed as --<name> value and parsed with ParameterTool).
    // NOTE(review): OUTPUT_PATH and PARALLELISM are not read in the visible part of main();
    // presumably the HDFS sink / parallelism setup that uses them is in the truncated remainder.
    public static final String OUTPUT_PATH = "output.path";
    public static final String PARALLELISM = "parallelism";
    public static final String TOPIC_NAME  = "topic";

    /**
     * Builds a Flink streaming job that consumes String messages from the Kafka topic given by
     * {@code --topic}, using {@code FlinkKafkaConsumer09} with a {@code SimpleStringSchema}.
     *
     * <p>Every {@code --key value} argument is also handed to the Kafka consumer as a
     * configuration property, so Kafka settings such as {@code bootstrap.servers} are supplied
     * on the command line.
     *
     * <p>NOTE(review): this excerpt ends inside the map function; the HDFS sink and the
     * {@code env.execute(...)} call that would actually run the job are not shown. Also,
     * {@code ParameterTool} is not among the visible imports; confirm it is imported
     * ({@code org.apache.flink.api.java.utils.ParameterTool}).
     *
     * @param args command-line arguments in {@code --key value} form; {@code --topic} is required
     * @throws Exception declared for errors raised while building or running the Flink job
     */
    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Using the parser provided by Flink
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // parameterTool.getProperties() returns all parsed --key value pairs as Properties;
        // they are passed unchanged to the Kafka consumer as its configuration.
        // getRequired(TOPIC_NAME) fails if --topic was not supplied.
        DataStream<String> messageStream =
               env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired(TOPIC_NAME), 
                                         new SimpleStringSchema(), parameterTool.getProperties()));

        // rebalance() redistributes records round-robin across the downstream parallel
        // instances, so all of them see messages (e.g. when the number of Kafka partitions
        // is smaller than the number of Flink operator instances).
        messageStream.rebalance().map(new MapFunction<String, String>() {
            // MapFunction is Serializable; a fixed serialVersionUID keeps the serialized form stable.
            private static final long serialVersionUID = -6867736771747690202L;

            // Identity mapping: each message is passed through unchanged.
            public String map(String value) throws Exception {
                return value;
                      // TODO: write to HDFS (not implemented in this excerpt; presumably a sink
                      // attached to the mapped stream, since code after `return` is unreachable)



Integration Test:



import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

public class ReadFromKafkaTest {

    public static final Logger log = LoggerFactory.getLogger(ReadFromKafkaTest.class);

    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMETER = "\n";
    public static final String TOPIC = "TOPIC";
    private static KafkaUnitAdmin admin;
    private static String baseDir;
    private static String outputDir;

    public static KafkaUnit cluster = new KafkaUnit(1);
    // KafkaUnit(1): the argument is the number of brokers in the cluster

    public class MyThread extends Thread {

        public void run() {
            System.out.println("MyThread running");
            // --topic test --bootstrap.servers localhost:9092
            // --zookeeper.connect localhost:2181 --group.id myGroup
            String[] args = new String[]{
                   "--topic", TOPIC, 
                   "--bootstrap.servers", cluster.getConfig().getKafkaBrokerString(), 
                   "--zookeeper.connect", cluster.getConfig().getZkString(), 
                   "", "myGroup",
                   "--auto.offset.reset", "earliest",
                   "--" + ReadFromKafka.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir, 
                   "--" + WordCount.PARALLELISM, "1" };
            try {
            } catch (Exception e) {
      "Execption occured while launching Flink Kafka consumer");


    public void startup() throws Exception {

        //  create topic in embedded Kafka Cluster
        admin = new KafkaUnitAdmin(cluster);
        admin.createTopic(TOPIC, 1, 1, new Properties());

        // Unique base directory for this test run; the output directory is created under it
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();

        outputDir = baseDir + "/output";

        // produce data in Embedded Kafka (Kafka Unit)

    public static void producerDatainKafka() {
        long events = 10;
        Properties props = new Properties();"Broker list is : " + cluster.getConfig().getKafkaBrokerString());

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            String ip = "192.168.2." + nEvents;
            String msg = nEvents + "," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);

    public void readFromKafkaStoreInHdfs() throws Exception {"OutPutDir is {} ", outputDir);

        // Start Flink from a separate thread; otherwise it would block the test,
        // because the Flink job keeps waiting for new messages in Kafka.
        MyThread flinkThread = new MyThread();

        // Sleep for 10 seconds so that Flink can consume the messages and write them to outputDir

        StringBuilder fileToString = new StringBuilder();

        // Read the data from the output folder;
        // it contains multiple output files, named 1, 2, 3, 4
        Iterator<File> files = FileUtils.iterateFiles(new File(outputDir), null, true);
        while (files.hasNext()) {
            fileToString.append(FileUtils.readFileToString(, "UTF-8"));


        Map<String, String> wordToCount = new HashMap<>();

        //4 lines in output file, with one word per line -> {
            String[] wordCount = e.split(",");
            wordToCount.put(wordCount[0], wordCount[1]);
  "Event number {}   => ip {}", wordCount[0], wordCount[1]);

        // 10 messages to consume from Kafka
        Assert.assertEquals(10, wordToCount.size());


    public void deleteTopic() {