Kafka Unit framework
Kafka unit test Framework to help in writing Integration Test .
<dependencies> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.7</version> </dependency> </dependencies> [addToAppearHere]
Kafka unit is broadly divided in 5 different components
Component | Configuration | Usage | Composed within |
EmbeddedZookeeper | EmbeddedZookeeperConfig | Embedded Zookeeper | KafkaUnit |
KafkaBroker | BrokerConfig | Embedded Kafka | KafkaUnit |
KafkaUnit | kafkaUnitConfig | Kafka Unit Famework to spawn Brokers and zookeper | No |
KafkaUnitAdmin | No | Admin class to create and delete topic | No |
FreeRandomPort | No | get random port to spawn zookeper and Brokers | No |
Code :
EmbeddedZookeeper KafkaBroker KafkaUnit KafkaUnitAdmin FreeRandomPort
package com.big.data.kafka.unit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
public class KafkaUnitTest {
public static final Logger log = LoggerFactory.getLogger(KafkaUnitTest.class);
public static final String TOPIC = "TOPIC";
private KafkaUnitAdmin admin;
@ClassRule
public static KafkaUnit cluster = new KafkaUnit(1);
// KakaUnit(1) number of Broker in the cluster
@Before
public void testKafkaUnit() throws Throwable {
admin = new KafkaUnitAdmin(cluster);
admin.createTopic(TOPIC, 1, 1, new Properties());
}
public void producerTest() {
long events = 10;
Properties props = new Properties();
log.info("Broker list is : " + cluster.getConfig().getKafkaBrokerString());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.getConfig().getKafkaBrokerString());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 2);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (long nEvents = 0; nEvents < events; nEvents++) {
String ip = "192.168.2." + nEvents;
String msg = "www.example.com," + ip;
ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, ip, msg);
producer.send(data);
producer.flush();
}
producer.close();
}
@Test
public void consumerTest() {
producerTest();
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
KafkaGenericConsumer consumer = new KafkaGenericConsumer
(cluster.getConfig().getZkString(), "1", TOPIC);
consumer.run(1);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
Assert.assertEquals(consumer.getResultQueue().size(), 10);
Assert.assertEquals(consumer.getResultQueue()
.contains("www.example.com,192.168.2.0"), true);
Assert.assertEquals(consumer.getResultQueue()
.contains("www.example.com,192.168.2.9"), true);
Assert.assertEquals(consumer.getResultQueue()
.contains("www.example.com,192.168.2.5"), true);
consumer.shutdown();
}
//@Test
public void genericConsumerTest() {
producerTest();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.getConfig().getKafkaBrokerString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
producerTest();
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("OUTPUT = %d, key = %s, value = %s",
record.offset(), record.key(), record.value());
}
//Assert.assertEquals(consumer.r.size(), 10);
//Assert.assertEquals(consumer.resultQueue().contains("www.example.com,192.168.2.0"), true);
//Assert.assertEquals(consumer.resultQueue().contains("www.example.com,192.168.2.9"), true);
//Assert.assertEquals(consumer.resultQueue().contains("www.example.com,192.168.2.5"), true);
//consumer.close();
}
@After
public void deleteTopic() {
admin.deleteTopic(TOPIC);
}
}