16 package org.openkilda.wfm;
18 import static org.hamcrest.core.Is.is;
19 import static org.junit.Assert.assertThat;
26 import kafka.consumer.Consumer;
27 import kafka.consumer.ConsumerConfig;
28 import kafka.consumer.ConsumerIterator;
29 import kafka.consumer.KafkaStream;
30 import kafka.javaapi.consumer.ConsumerConnector;
31 import kafka.message.MessageAndMetadata;
32 import kafka.serializer.StringDecoder;
33 import org.apache.kafka.clients.producer.KafkaProducer;
34 import org.apache.kafka.clients.producer.Producer;
35 import org.apache.kafka.clients.producer.ProducerRecord;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.kohsuke.args4j.CmdLineException;
42 import java.util.HashMap;
43 import java.util.List;
45 import java.util.Properties;
52 public static final String
topic =
"simple-kafka";
53 private static final String[] NO_ARGS = {};
59 private Producer<String, String> producer;
60 private ConsumerConnector consumerConnector;
65 zooKeeperConfig = configurationProvider.getConfiguration(
ZookeeperConfig.class);
66 kafkaConfig = configurationProvider.getConfiguration(
KafkaConfig.class);
70 public void setup() throws Exception {
78 consumerConnector.shutdown();
89 producer =
new KafkaProducer<>(producerProps());
95 MessageAndMetadata<String, String> messageAndMetadata = it.next();
96 String
value = messageAndMetadata.message();
97 assertThat(
value, is(
"message"));
100 private ConsumerIterator<String, String> buildConsumer(String
topic) {
101 Properties props = consumerProperties();
103 Map<String, Integer> topicCountMap =
new HashMap<>();
104 topicCountMap.put(
topic, 1);
105 ConsumerConfig consumerConfig =
new ConsumerConfig(props);
106 consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
107 Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(
108 topicCountMap,
new StringDecoder(null),
new StringDecoder(null));
109 KafkaStream<String, String> stream = consumers.get(
topic).get(0);
110 return stream.iterator();
113 private Properties consumerProperties() {
114 Properties props =
new Properties();
115 props.put(
"zookeeper.connect", zooKeeperConfig.
getHosts());
116 props.put(
"group.id",
"group1");
117 props.put(
"auto.offset.reset",
"smallest");
121 private Properties producerProps() {
122 Properties props =
new Properties();
123 props.put(
"bootstrap.servers", kafkaConfig.
getHosts());
124 props.put(
"key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
125 props.put(
"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
126 props.put(
"request.required.acks",
"1");
ConfigurationProvider getConfigurationProvider(String... prefixes)
static final String topic
void shouldWriteThenRead()
static void allocateConfig()