Read from Kafka and Write To Aerospike from Flink
Read from Kafka and write to Aerospike through Flink.
Problem statement: on a streaming basis, data needs to be read from Kafka and used to populate Aerospike. If possible, the data should also be written into HDFS.
<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- Embedded Kafka + Zookeeper unit dependencies -->
    <!-- The kafka-unit jar has been relocated to avoid a Kafka version conflict, as it uses kafka_2.11 0.9.x -->
    <dependency>
        <groupId>com.big.data</groupId>
        <artifactId>kafka-unit-flink</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aerospike.unit</groupId>
        <artifactId>aerospike-unit</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
</dependencies>
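Note: the job itself uses the Aerospike Java client (com.aerospike.client.*), which is not among the artifacts listed above. Presumably the Aerospike client dependency (groupId com.aerospike, artifactId aerospike-client, version matching your cluster) also has to be on the classpath, unless it is pulled in transitively by the aerospike-unit artifact.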
package com.big.data.flink;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/**
* Simple example on how to read with a Kafka consumer
* <p>
* Note that the Kafka source is expecting the following parameters to be set
* - "bootstrap.servers" (comma separated list of kafka brokers)
* - "zookeeper.connect" (comma separated list of zookeeper servers)
* - "group.id" the id of the consumer group
* - "topic" the name of the topic to read data from.
* <p>
* You can pass these required parameters using "--bootstrap.servers host:port,host1:port1 --zookeeper.connect host:port --topic testTopic"
* <p>
* This is a valid input example:
* --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup
* <p>
* Read from Kafka, write each record to Aerospike, and also write the stream to HDFS.
* <p>
* Version info
* https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
*/
public class ReadFromKafkaWriteToAerospike {
public static final String OUTPUT_PATH = "output.path";
public static final String PARALLELISM = "parallelism";
public static final String TOPIC_NAME = "topic";
//Aerospike related properties
public static final String AEROSPIKE_HOSTNAME = "aerospike.host.name";
public static final String AEROSPIKE_PORT = "aerospike.port";
public static final String AEROSPIKE_NAMESPACE = "aerospike.name.space";
public static final String AEROSPIKE_SETNAME = "aerospike.set.name";
public static final String VALUE_BIN_NAME = "value.bin.name";
// Remember that all functions are instantiated on the client (driver), serialized and shipped to the task managers.
// There is no need to initialize the service (Aerospike, HBase) on the driver, hence the client is made transient.
// In map(), each record is inserted into Aerospike; this could also be converted into a batched write.
public static class InsertIntoAerospike implements MapFunction<String, String> {
// Not making these fields static, as static fields are not serialized and sent to the executors
private final String aerospikeHostName;
private final int aerospikePortNo;
private final String aerospikeNamespace;
private final String aerospikeSetName;
private final String valueColumnName;
// The Aerospike client is not serializable,
// and there is no need to instantiate it on the driver
private transient AerospikeClient client;
private transient WritePolicy policy;
public InsertIntoAerospike(String hostName, int portNo, String nameSpace,
String setName, String valueColumnName) {
this.aerospikeHostName = hostName;
this.aerospikePortNo = portNo;
this.aerospikeNamespace = nameSpace;
this.aerospikeSetName = setName;
this.valueColumnName = valueColumnName;
//Add a JVM shutdown hook to close the client gracefully.
//This is the place where you can release your service resources,
//as a plain MapFunction has no cleanup()/close() hook.
JVMShutdownHook jvmShutdownHook = new JVMShutdownHook();
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
}
@Override
public String map(String value) throws Exception {
// Initialize on the first call
if (client == null) {
policy = new WritePolicy();
// How to close the client gracefully? See the JVM shutdown hook below.
client = new AerospikeClient(aerospikeHostName, aerospikePortNo);
}
String[] keyValue = value.split(",");
// Each record is a comma-separated "key,value" pair: keyValue[0] is the Aerospike key, keyValue[1] the bin value
Key key = new Key(aerospikeNamespace, aerospikeSetName, keyValue[0]);
Bin bin = new Bin(valueColumnName, keyValue[1]);
client.put(policy, key, bin);
return value;
}
//When JVM is going down close the client
private class JVMShutdownHook extends Thread {
@Override
public void run() {
System.out.println("JVM Shutdown Hook: Thread initiated ,
shutting down service gracefully");
IOUtils.closeQuietly(client);
}
}
}
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Using the parser provided by Flink
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String aerospikeHostname = parameterTool.getRequired(AEROSPIKE_HOSTNAME);
int aerospikePort = Integer.parseInt(parameterTool.getRequired(AEROSPIKE_PORT));
String namespace = parameterTool.getRequired(AEROSPIKE_NAMESPACE);
String setName = parameterTool.getRequired(AEROSPIKE_SETNAME);
String valueBinName = parameterTool.getRequired(VALUE_BIN_NAME);
// parameterTool.getProperties() returns a Properties object with all key=value pairs set
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>
(parameterTool.getRequired(TOPIC_NAME), new SimpleStringSchema(),
parameterTool.getProperties()));
// The rebalance() call causes a repartitioning of the data so that all machines
// see the messages (for example when "num Kafka partitions" < "num Flink operators")
messageStream.rebalance().map(new InsertIntoAerospike(aerospikeHostname, aerospikePort,
namespace, setName, valueBinName))
.setParallelism(parameterTool.getInt(PARALLELISM))
//Write to HDFS
.writeAsText(parameterTool.get(OUTPUT_PATH));
env.execute();
}
}
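The JVM shutdown hook answers the "how to close the client gracefully" question, but Flink also offers a cleaner lifecycle through RichMapFunction, whose open() and close() methods are invoked once per parallel task instance. Below is a minimal sketch of the same insert logic using that API; the class name RichInsertIntoAerospike is illustrative and not part of the original code, and it additionally needs imports for org.apache.flink.api.common.functions.RichMapFunction and org.apache.flink.configuration.Configuration:

// Alternative sketch using Flink's RichMapFunction lifecycle instead of a JVM shutdown hook.
// open() runs once per parallel task before any record is processed, close() runs when the task shuts down.
public static class RichInsertIntoAerospike extends RichMapFunction<String, String> {

    private final String hostName;
    private final int portNo;
    private final String nameSpace;
    private final String setName;
    private final String valueBinName;

    // Created lazily per task, never serialized
    private transient AerospikeClient client;
    private transient WritePolicy policy;

    public RichInsertIntoAerospike(String hostName, int portNo, String nameSpace,
                                   String setName, String valueBinName) {
        this.hostName = hostName;
        this.portNo = portNo;
        this.nameSpace = nameSpace;
        this.setName = setName;
        this.valueBinName = valueBinName;
    }

    @Override
    public void open(Configuration parameters) {
        // Create the client once per task instance, not per record
        policy = new WritePolicy();
        client = new AerospikeClient(hostName, portNo);
    }

    @Override
    public String map(String value) {
        // Same "key,value" record format as the original InsertIntoAerospike
        String[] keyValue = value.split(",");
        client.put(policy, new Key(nameSpace, setName, keyValue[0]),
                new Bin(valueBinName, keyValue[1]));
        return value;
    }

    @Override
    public void close() {
        // Called by Flink when the task finishes or fails; no shutdown hook needed
        if (client != null) {
            client.close();
        }
    }
}

With this variant, main() would call .map(new RichInsertIntoAerospike(...)) with the same arguments, and the JVM shutdown hook becomes unnecessary.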
package com.big.data.flink;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.unit.impls.AerospikeRunTimeConfig;
import com.aerospike.unit.impls.AerospikeSingleNodeCluster;
import com.aerospike.unit.utils.AerospikeUtils;
import com.big.data.kafka.unit.flink.KafkaUnit;
import com.big.data.kafka.unit.flink.KafkaUnitAdmin;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

public class ReadFromKafkaWriteToAerospikeTest {

    public static final Logger log = LoggerFactory.getLogger(ReadFromKafkaWriteToAerospikeTest.class);
    private static final String LOCAL_FILEURI_PREFIX = "file://";
    private static final String NEW_LINE_DELIMETER = "\n";
    public static final String TOPIC = "TOPIC";
    public static final String BIN_NAME = "ip";

    private static KafkaUnitAdmin admin;
    private static String baseDir;
    private static String outputDir;

    private AerospikeSingleNodeCluster aerospikeCluster;
    private AerospikeRunTimeConfig runtimConfig;
    private AerospikeClient client;
    private WritePolicy policy;

    //Aerospike unit related params
    private String memorySize = "64M";
    private String setName = "BIGTABLE";
    private String binName = "BIGBIN";
    private String nameSpace = "RandomNameSpace";

    // KafkaUnit(1) = number of brokers in the embedded Kafka cluster
    @ClassRule
    public static KafkaUnit kafkaUnitCluster = new KafkaUnit(1);

    public class MyThread extends Thread {
        public void run() {
            System.out.println("MyThread running");
            //--topic test --bootstrap.servers localhost:9092
            //--zookeeper.connect localhost:2181 --group.id myGroup
            String[] args = new String[]{"--topic", TOPIC,
                    "--bootstrap.servers", kafkaUnitCluster.getConfig().getKafkaBrokerString(),
                    "--zookeeper.connect", kafkaUnitCluster.getConfig().getZkString(),
                    "--group.id", "myGroup",
                    "--auto.offset.reset", "earliest",
                    "--" + ReadFromKafkaWriteToAerospike.OUTPUT_PATH, LOCAL_FILEURI_PREFIX + outputDir,
                    "--" + ReadFromKafkaWriteToAerospike.PARALLELISM, "1",
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_NAMESPACE, runtimConfig.getNameSpaceName(),
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_HOSTNAME, "127.0.0.1",
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_PORT, String.valueOf(runtimConfig.getServicePort()),
                    "--" + ReadFromKafkaWriteToAerospike.AEROSPIKE_SETNAME, setName,
                    "--" + ReadFromKafkaWriteToAerospike.VALUE_BIN_NAME, BIN_NAME};
            try {
                ReadFromKafkaWriteToAerospike.main(args);
            } catch (Exception e) {
                log.info("Exception occurred while launching the Flink Kafka consumer");
            }
        }
    }

    @Before
    public void startup() throws Exception {
        // Create the topic in the embedded Kafka cluster
        admin = new KafkaUnitAdmin(kafkaUnitCluster);
        admin.createTopic(TOPIC, 1, 1, new Properties());

        //Input directory
        baseDir = "/tmp/mapreduce/wordcount/" + UUID.randomUUID().toString();
        //Output directory
        outputDir = baseDir + "/output";

        // Start the Aerospike mini cluster.
        // Instantiate the cluster with the namespace name and memory size.
        // One can also use the default constructor and retrieve the namespace
        // and memory info from cluster.getRunTimeConfiguration();
        aerospikeCluster = new AerospikeSingleNodeCluster(nameSpace, memorySize);
        aerospikeCluster.start();
        // Get the runtime configuration of the cluster
        runtimConfig = aerospikeCluster.getRunTimeConfiguration();
        client = new AerospikeClient("127.0.0.1", runtimConfig.getServicePort());
        AerospikeUtils.printNodes(client);
        policy = new WritePolicy();

        // Produce data into the embedded Kafka (Kafka Unit)
        producerDatainKafka();
    }

    public static void producerDatainKafka() {
        long events = 10;
        Properties props = new Properties();
        log.info("Broker list is : " + kafkaUnitCluster.getConfig().getKafkaBrokerString());
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUnitCluster.getConfig().getKafkaBrokerString());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            String ip = "192.168.2." + nEvents;
            String msg = nEvents + "," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);
            producer.send(data);
            producer.flush();
        }
        producer.close();
    }

    private String getValueFromAerospike(String key) {
        // Fetch the record for the given key from Aerospike
        Key key1 = new Key(runtimConfig.getNameSpaceName(), setName, key);
        Record result1 = client.get(policy, key1);
        return result1.getValue(BIN_NAME).toString();
    }

    @Test
    public void ReadFromKafkaWriteToAerospike() throws Exception {
        log.info("OutPutDir is {} ", outputDir);

        // Start Flink from a different thread,
        // else this test would block, as the job keeps waiting for messages in Kafka
        MyThread flinkThread = new MyThread();
        flinkThread.start();

        // Sleep for 10 seconds so that Flink can consume the messages and write to outputDir
        Thread.sleep(10000);

        StringBuilder fileToString = new StringBuilder();
        // Read the data from the output folder;
        // the output folder contains multiple output files named 1 2 3 4
        Iterator<File> files = FileUtils.iterateFiles(new File(outputDir), null, true);
        while (files.hasNext()) {
            fileToString.append(FileUtils.readFileToString(files.next(), "UTF-8"));
        }

        Map<String, String> wordToCount = new HashMap<>();
        // One "eventNumber,ip" record per line in the output files
        Arrays.stream(fileToString.toString().split(NEW_LINE_DELIMETER)).forEach(e -> {
            String[] wordCount = e.split(",");
            wordToCount.put(wordCount[0], wordCount[1]);
            log.info("Event number {} => ip {}", wordCount[0], wordCount[1]);
        });

        // 10 messages were produced into Kafka and should have been consumed
        Assert.assertEquals(10, wordToCount.size());
        Assert.assertTrue(wordToCount.containsKey("0"));
        Assert.assertTrue(wordToCount.get("1").equals("192.168.2.1"));
        Assert.assertTrue(wordToCount.get("0").equals("192.168.2.0"));
        Assert.assertTrue(wordToCount.get("2").equals("192.168.2.2"));
        Assert.assertTrue(wordToCount.get("3").equals("192.168.2.3"));
        Assert.assertTrue(wordToCount.get("4").equals("192.168.2.4"));
        Assert.assertTrue(wordToCount.get("5").equals("192.168.2.5"));

        // Asserts against Aerospike
        Assert.assertEquals("192.168.2.0", getValueFromAerospike("0"));
        Assert.assertEquals("192.168.2.1", getValueFromAerospike("1"));
        Assert.assertEquals("192.168.2.2", getValueFromAerospike("2"));
        Assert.assertEquals("192.168.2.3", getValueFromAerospike("3"));
        Assert.assertEquals("192.168.2.4", getValueFromAerospike("4"));
        Assert.assertEquals("192.168.2.5", getValueFromAerospike("5"));
    }

    @After
    public void deleteTopic() {
        admin.deleteTopic(TOPIC);
    }
}