16 package org.openkilda.wfm;
22 import static org.hamcrest.core.Is.is;
23 import static org.junit.Assert.assertThat;
25 import kafka.consumer.Consumer;
26 import kafka.consumer.ConsumerConfig;
27 import kafka.consumer.ConsumerIterator;
28 import kafka.consumer.KafkaStream;
29 import kafka.javaapi.consumer.ConsumerConnector;
30 import kafka.message.MessageAndMetadata;
31 import kafka.serializer.StringDecoder;
32 import kafka.server.KafkaConfig;
33 import kafka.server.KafkaServerStartable;
34 import org.apache.curator.test.TestingServer;
35 import org.apache.kafka.clients.producer.KafkaProducer;
36 import org.apache.kafka.clients.producer.Producer;
37 import org.apache.kafka.clients.producer.ProducerRecord;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
42 import java.io.IOException;
43 import java.util.HashMap;
44 import java.util.List;
46 import java.util.Properties;
50 public static final String
topic =
"topic1-" + System.currentTimeMillis();
52 private KafkaTestFixture server;
53 private Producer<String, String> producer;
54 private ConsumerConnector consumerConnector;
57 public void setup() throws Exception {
58 server =
new KafkaTestFixture();
59 server.start(serverProperties());
65 consumerConnector.shutdown();
73 ConsumerIterator<String, String> it = buildConsumer(
Original.
topic);
76 producer =
new KafkaProducer<>(producerProps());
79 producer.send(
new ProducerRecord<>(
Original.
topic,
"message")).get();
82 MessageAndMetadata<String, String> messageAndMetadata = it.next();
83 String
value = messageAndMetadata.message();
84 assertThat(
value, is(
"message"));
87 private ConsumerIterator<String, String> buildConsumer(String
topic) {
88 Properties props = consumerProperties();
90 Map<String, Integer> topicCountMap =
new HashMap<>();
91 topicCountMap.put(
topic, 1);
92 ConsumerConfig consumerConfig =
new ConsumerConfig(props);
93 consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
94 Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(topicCountMap,
new StringDecoder(null),
new StringDecoder(null));
95 KafkaStream<String, String> stream = consumers.get(
topic).get(0);
96 return stream.iterator();
99 private Properties consumerProperties() {
100 Properties props =
new Properties();
101 props.put(
"zookeeper.connect", serverProperties().
get(
"zookeeper.connect"));
102 props.put(
"group.id",
"group1");
103 props.put(
"auto.offset.reset",
"smallest");
107 private Properties producerProps() {
108 Properties props =
new Properties();
109 props.put(
"bootstrap.servers",
"localhost:9092");
110 props.put(
"key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
111 props.put(
"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
112 props.put(
"request.required.acks",
"1");
116 private Properties serverProperties() {
117 Properties props =
new Properties();
118 props.put(
"zookeeper.connect",
"localhost:2181");
119 props.put(
"broker.id",
"1");
123 private static class KafkaTestFixture {
124 private TestingServer zk;
125 private KafkaServerStartable kafka;
127 public void start(Properties properties)
throws Exception {
128 Integer
port = getZkPort(properties);
129 zk =
new TestingServer(
port);
132 KafkaConfig kafkaConfig =
new KafkaConfig(properties);
133 kafka =
new KafkaServerStartable(kafkaConfig);
137 public void stop() throws IOException {
143 private int getZkPort(Properties properties) {
144 String
url = (String) properties.get(
"zookeeper.connect");
145 String
port =
url.split(
":")[1];
146 return Integer.valueOf(
port);
void shouldWriteThenRead()
static final String topic