Read Kafka from Flink with an Integration Test
Problem Statement: Flink needs to read data from Kafka and write it to HDFS.
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<!--Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<!-- Embedded Kafka + Zookeeper unit test dependencies -->
<!-- The kafka-unit jar has been relocated to avoid a Kafka
version conflict, as it uses kafka_2.11 0.9.0.0 -->
<dependency>
<groupId>com.big.data</groupId>
<artifactId>kafka-unit-flink</artifactId>
<version>${project.version}</version>
</dependency>
Code:
package com.big.data.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/**
* Simple example of how to read from Kafka with a Flink Kafka consumer
*
* Note that the Kafka source is expecting the following parameters to be set
* - "bootstrap.servers" (comma separated list of kafka brokers)
* - "zookeeper.connect" (comma separated list of zookeeper servers)
* - "group.id" the id of the consumer group
* - "topic" the name of the topic to read data from.
*
* You can pass these required parameters using "--bootstrap.servers host:port,host1:port1
* --zookeeper.connect host:port --topic testTopic"
*
* This is a valid input example:
* --topic test --bootstrap.servers localhost:9092 --zookeeper.connect
* localhost:2181 --group.id myGroup
*
* Reads from Kafka and writes to HDFS.
*
* Version info
* https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
*/
public class ReadFromKafka {
public static final String OUTPUT_PATH = "output.path";
public static final String PARALLELISM = "parallelism";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Using the parser provided by Flink
ParameterTool parameterTool = ParameterTool.fromArgs(args);
//parameterTool.getProperties() returns a Properties object with every key=value pair set
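//FlinkKafkaConsumer09 takes the topic name, a DeserializationSchema
//(SimpleStringSchema turns each Kafka record value into a String)
//and the consumer Properties (bootstrap.servers, zookeeper.connect, group.id, ...)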
DataStream<String> messageStream =
env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired(TOPIC_NAME),
new SimpleStringSchema(), parameterTool.getProperties()));
//the rebalance call causes a repartitioning of the data so that all machines
//see the messages (for example when "num kafka partitions" < "num flink operators")
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return value;
}
})
.setParallelism(parameterTool.getInt(PARALLELISM))
//Write to HDFS (or whichever file system the output.path URI points to, e.g. file:// in the test)
.writeAsText(parameterTool.get(OUTPUT_PATH));
env.execute();
}
}
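The job takes all of its configuration from the command line via Flink's ParameterTool. Below is a minimal, standalone sketch of how the "--key value" pairs become typed lookups plus the Properties object handed to the Kafka consumer; the class name and the host/port/path values are illustrative only and not part of the job above.
package com.big.data.flink;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.Properties;
public class ParameterToolSketch {
    public static void main(String[] args) {
        // Illustrative arguments; in the real job they come from the command line
        String[] sampleArgs = new String[]{
                "--topic", "testTopic",
                "--bootstrap.servers", "localhost:9092",
                "--zookeeper.connect", "localhost:2181",
                "--group.id", "myGroup",
                "--output.path", "file:///tmp/flink-out",
                "--parallelism", "1"};
        ParameterTool tool = ParameterTool.fromArgs(sampleArgs);
        // Typed access; getRequired throws an exception if the key is missing
        String topic = tool.getRequired("topic");
        int parallelism = tool.getInt("parallelism");
        // Every key=value pair as a Properties object, passed as-is to FlinkKafkaConsumer09
        Properties kafkaProps = tool.getProperties();
        System.out.println(topic + " " + parallelism + " " + kafkaProps);
    }
}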
Integration Test:
package com.big.data.flink;
import com.big.data.kafka.unit.flink.KafkaUnitAdmin;
import com.big.data.kafka.unit.flink.KafkaUnit;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
public class ReadFromKafkaTest {
public static final Logger log = LoggerFactory.getLogger(ReadFromKafkaTest.class);
private static final String LOCAL_FILEURI_PREFIX = "file://";
private static final String NEW_LINE_DELIMETER = "\n";
public static final String TOPIC = "TOPIC";
private static KafkaUnitAdmin admin;
private static String baseDir;
private static String outputDir;
@ClassRule
public static KafkaUnit cluster = new KafkaUnit(1);
// KafkaUnit(1) = number of brokers in the cluster
public class MyThread extends Thread {
public void run() {
System.out.println("MyThread running");
//--topic test --bootstrap.servers localhost:9092
// --zookeeper.connect localhost:2181 --group.id myGroup
String[] args = new String[]{
"--topic", TOPIC,
"--bootstrap.servers", cluster.getConfig().getKafkaBrokerString(),
"--zookeeper.connect", cluster.getConfig().getZkString(),
"--group.id", "myGroup",
"--auto.offset.reset", "earliest",
"--" + ReadFromKafka.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir,
"--" + WordCount.PARALLELISM, "1" };
try {
ReadFromKafka.main(args);
} catch (Exception e) {
log.info("Execption occured while launching Flink Kafka consumer");
}
}
}
@Before
public void startup() throws Exception {
// create topic in embedded Kafka Cluster
admin = new KafkaUnitAdmin(cluster);
admin.createTopic(TOPIC, 1, 1, new Properties());
// Base temporary directory
baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();
// Output directory
outputDir = baseDir + "/output";
// produce data in Embedded Kafka (Kafka Unit)
producerDatainKafka();
}
public static void producerDatainKafka() {
long events = 10;
Properties props = new Properties();
log.info("Broker list is : " + cluster.getConfig().getKafkaBrokerString());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.getConfig().getKafkaBrokerString());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (long nEvents = 0; nEvents < events; nEvents++) {
String ip = "192.168.2." + nEvents;
String msg = nEvents + "," + ip;
ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);
producer.send(data);
producer.flush();
}
producer.close();
}
@Test
public void readFromKafkaStoreInHdfs() throws Exception {
log.info("OutPutDir is {} ", outputDir);
// Starting Flink from a different thread, else it will block
// as it will keep waiting for messages in Kafka
MyThread flinkThread = new MyThread();
flinkThread.start();
// Sleeping for 10 seconds so that Flink can consume the messages and write them to outputDir
Thread.sleep(10000);
StringBuilder fileToString = new StringBuilder();
//Read the data from the output folder;
//the output folder contains multiple output files named 1 2 3 4 ...
Iterator<File> files = FileUtils.iterateFiles(new File(outputDir), null, true);
while (files.hasNext()) {
fileToString.append(FileUtils.readFileToString(files.next(), "UTF-8"));
}
Map<String, String> wordToCount = new HashMap<>();
//Each output line is of the form eventNumber,ip (one line per Kafka message)
Arrays.stream(fileToString.toString().split(NEW_LINE_DELIMETER)).forEach(e -> {
String[] wordCount = e.split(",");
wordToCount.put(wordCount[0], wordCount[1]);
log.info("Event number {} => ip {}", wordCount[0], wordCount[1]);
});
//10 messages were produced to Kafka, so 10 entries are expected
Assert.assertEquals(10, wordToCount.size());
Assert.assertTrue(wordToCount.containsKey("0"));
Assert.assertTrue(wordToCount.get("1").equals("192.168.2.1"));
Assert.assertTrue(wordToCount.get("0").equals("192.168.2.0"));
Assert.assertTrue(wordToCount.get("2").equals("192.168.2.2"));
Assert.assertTrue(wordToCount.get("3").equals("192.168.2.3"));
Assert.assertTrue(wordToCount.get("4").equals("192.168.2.4"));
Assert.assertTrue(wordToCount.get("5").equals("192.168.2.5"));
}
@After
public void deleteTopic() {
admin.deleteTopic(TOPIC);
}
}
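For reference, given the producer above, the text files under output.path should contain one line per event, of the form eventNumber,ip; which file each line lands in, and the order of lines, can vary:
0,192.168.2.0
1,192.168.2.1
...
9,192.168.2.9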