package org.openkilda;

import static java.lang.String.format;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
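/**
 * Kafka helpers for the ATDD suite: posting control messages and collecting
 * bolt state dumps from the WFM control topic.
 *
 * Reconstructed from a partially extracted source. The class name, the
 * imports of project-local types (Message, CtrlResponse, DumpStateManager,
 * Destination), and the WFM_CTRL constant (presumably a statically imported
 * Destination value) did not survive extraction and are assumptions.
 */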
public class KafkaUtils {
    // Listed in the original as "static final ObjectMapper MAPPER"; the
    // initializer was lost in extraction, so a plain ObjectMapper is assumed.
    static final ObjectMapper MAPPER = new ObjectMapper();

    // Supplies the bootstrap servers and the control topic name. The field's
    // declaration was lost in extraction; KafkaParameters is an assumed name.
    private final KafkaParameters settings;

    private final Properties connectDefaults;

    // Constructor signature reconstructed; injection of settings is assumed.
    public KafkaUtils(KafkaParameters settings) {
        this.settings = settings;
        connectDefaults = new Properties();
        connectDefaults.put("bootstrap.servers", settings.getBootstrapServers());
        connectDefaults.put("client.id", "ATDD");
        connectDefaults.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        connectDefaults.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
    }
    RecordMetadata postMessage(String topic, Message message)
            throws IOException, ExecutionException, InterruptedException {
        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(connectDefaults)) {
            String messageString = MAPPER.writeValueAsString(message);
            // send() is asynchronous; get() blocks until the record is acked
            // and yields its metadata (partition and offset).
            return producer.send(new ProducerRecord<>(topic, messageString)).get();
        } catch (JsonProcessingException exception) {
            System.out.println(format("Error during json serialization: %s",
                    exception.getMessage()));
            exception.printStackTrace();
        } catch (InterruptedException | ExecutionException exception) {
            System.out.println(format("Error during KafkaProducer::send: %s",
                    exception.getMessage()));
            exception.printStackTrace();
        }
        // The original fragment ended inside the catch blocks; a null return
        // is added so the failure path compiles.
        return null;
    }
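    /**
     * Drains whatever is currently readable from the topic and prints it;
     * useful for debugging. Only the poll call survived extraction, so the
     * subscription and record handling around it are assumptions.
     */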
    void pollMessage(String topic) {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            consumer.subscribe(Collections.singletonList(topic)); // assumed
            ConsumerRecords<String, String> records = consumer.poll(5000);
            records.forEach(record -> System.out.println(record.value())); // assumed
        }
    }
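    /**
     * Builds a string-deserializing consumer in the shared "ATDD" consumer
     * group, with auto-commit every second and a 30s session timeout.
     */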
    KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ATDD");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }
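    /**
     * Requests a state dump from every bolt in the topology: posts a ctrl
     * dump request, seeks the consumer back to the request's own offset, and
     * polls until BOLT_COUNT responses with the matching correlation id
     * arrive or the attempt budget runs out.
     */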
    DumpStateManager getStateDumpsFromBolts() {
        long timestamp = System.currentTimeMillis();
        String correlationId = format("atdd-%d", timestamp);
        // The construction of the dump request was truncated in the source;
        // only its trailing arguments (correlationId, WFM_CTRL) survived.
        // makeDumpRequest is a hypothetical stand-in for the lost call.
        Message dumpRequest = makeDumpRequest(timestamp, correlationId, WFM_CTRL);

        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            RecordMetadata postedMessage = postMessage(settings.getControlTopic(), dumpRequest);

            // Rewind every assigned partition to the offset of the request we
            // just posted, so only responses that follow it are read.
            consumer.subscribe(Collections.singletonList(settings.getControlTopic()),
                    new NoOpConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            System.out.println("Seek to offset: " + postedMessage.offset());
                            for (TopicPartition topicPartition : partitions) {
                                consumer.seek(topicPartition, postedMessage.offset());
                            }
                        }
                    });

            List<CtrlResponse> buffer = new ArrayList<>();
            final int BOLT_COUNT = 4;
            final int NUMBER_OF_ATTEMPTS = 5;
            int attempt = 0;
            while (buffer.size() < BOLT_COUNT && attempt++ < NUMBER_OF_ATTEMPTS) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    System.out.println("Received message: (" + record.key() + ", "
                            + record.value() + ") at offset " + record.offset());
                    Message message = MAPPER.readValue(record.value(), Message.class);
                    // Keep only ctrl responses that echo our correlation id;
                    // the exact filter was truncated in the source.
                    if (message instanceof CtrlResponse
                            && message.getCorrelationId().equals(correlationId)) {
                        buffer.add((CtrlResponse) message);
                    }
                }
            }
            return DumpStateManager.fromResponsesList(buffer);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
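    /**
     * Posts a clear-state ctrl request for one component of a topology and
     * waits for that component's acknowledgement on the control topic.
     */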
    CtrlResponse clearTopologyComponentState(String topology, String componentId) {
        long timestamp = System.currentTimeMillis();
        String correlationId = format("atdd-%d", timestamp);
        // As above, the request construction was truncated in the source;
        // makeClearStateRequest is a hypothetical stand-in targeting the
        // given topology and component.
        Message clearStateRequest =
                makeClearStateRequest(topology, componentId, timestamp, correlationId, WFM_CTRL);

        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            RecordMetadata postedMessage = postMessage(settings.getControlTopic(), clearStateRequest);
            consumer.subscribe(Collections.singletonList(settings.getControlTopic()),
                    new NoOpConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            for (TopicPartition topicPartition : partitions) {
                                consumer.seek(topicPartition, postedMessage.offset());
                            }
                        }
                    });

            final int NUMBER_OF_ATTEMPTS = 5;
            int attempt = 0;
            while (attempt++ < NUMBER_OF_ATTEMPTS) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    Message message = MAPPER.readValue(record.value(), Message.class);
                    // Return the first ctrl response matching our correlation
                    // id; the exact filter was truncated in the source.
                    if (message instanceof CtrlResponse
                            && message.getCorrelationId().equals(correlationId)) {
                        return (CtrlResponse) message;
                    }
                }
            }
        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
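    /** Exposes the shared producer configuration for tests that build their own clients. */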
    Properties getConnectDefaults() {
        return connectDefaults;
    }
}