Open Kilda Java Documentation
KafkaUtils.java
/* Copyright 2017 Telstra Open Source
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openkilda;


import static java.lang.String.format;
import static org.openkilda.messaging.Destination.CTRL_CLIENT;
import static org.openkilda.messaging.Destination.WFM_CTRL;
import static org.openkilda.messaging.Utils.MAPPER;

// Several import lines were dropped from the extracted listing. The imports
// below are reconstructed from the identifiers used in this file; the package
// paths are assumed from the org.openkilda.messaging module layout.
// KafkaParameters and TopologyCtrlProcessingException are assumed to live in
// this package and therefore need no import.
import org.openkilda.messaging.Message;
import org.openkilda.messaging.ctrl.CtrlRequest;
import org.openkilda.messaging.ctrl.CtrlResponse;
import org.openkilda.messaging.ctrl.RequestData;
import org.openkilda.messaging.ctrl.state.visitor.DumpStateManager;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

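/**
 * Thin helper around a Kafka producer and consumer, used by the ATDD suite to
 * post control messages to Kafka topics and to read responses back.
 */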
public class KafkaUtils {

    private KafkaParameters settings = new KafkaParameters();
    private final Properties connectDefaults;

    public KafkaUtils() throws IOException {
        connectDefaults = new Properties();
        connectDefaults.put("bootstrap.servers", settings.getBootstrapServers());
        connectDefaults.put("client.id", "ATDD");
        connectDefaults.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        connectDefaults.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

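    /**
     * Serializes the given message to JSON and posts it to the given topic,
     * blocking until the broker acknowledges the write.
     *
     * @return metadata (topic, partition, offset) of the posted record
     */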
    public RecordMetadata postMessage(String topic, Message message)
            throws IOException, ExecutionException, InterruptedException {

        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(connectDefaults)) {
            String messageString = MAPPER.writeValueAsString(message);
            return producer.send(new ProducerRecord<>(topic, messageString)).get();
        } catch (JsonProcessingException exception) {
            System.out.println(format("Error during json serialization: %s",
                    exception.getMessage()));
            exception.printStackTrace();
            throw exception;
        } catch (InterruptedException | ExecutionException exception) {
            System.out.println(format("Error during KafkaProducer::send: %s",
                    exception.getMessage()));
            exception.printStackTrace();
            throw exception;
        }
    }

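    /**
     * Subscribes to the given topic and performs a single poll, discarding
     * any records received.
     */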
    public void pollMessage(String topic) {
        // close the consumer after the single poll so it does not leak
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            consumer.subscribe(Collections.singletonList(topic));
            consumer.poll(5000);
        }
    }

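    /**
     * Builds a string-keyed, string-valued consumer in the "ATDD" consumer
     * group with auto-commit enabled. Callers are responsible for closing
     * the returned consumer.
     */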
    public KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ATDD");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

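    /**
     * Broadcasts a "dump" control request to all bolts and collects their
     * responses from the control topic, starting from the offset at which
     * the request itself was posted.
     *
     * @return the aggregated dump state, or null if collection failed
     */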
    public DumpStateManager getStateDumpsFromBolts() {
        long timestamp = System.currentTimeMillis();
        String correlationId = format("atdd-%d", timestamp);
        CtrlRequest dumpRequest = new CtrlRequest("*", new RequestData("dump"), timestamp,
                correlationId, WFM_CTRL);
        try {
            RecordMetadata postedMessage = postMessage(settings.getControlTopic(), dumpRequest);
            KafkaConsumer<String, String> consumer = createConsumer();
            try {
                consumer.subscribe(Collections.singletonList(settings.getControlTopic()),
                        new NoOpConsumerRebalanceListener() {
                            @Override
                            public void onPartitionsAssigned(
                                    Collection<TopicPartition> partitions) {
                                // Start reading at the offset of the request we just
                                // posted, so only responses to it are considered.
                                System.out.println("Seek to offset: " + postedMessage.offset());
                                for (TopicPartition topicPartition : partitions) {
                                    consumer.seek(topicPartition, postedMessage.offset());
                                }
                            }
                        });

                List<CtrlResponse> buffer = new ArrayList<>();

                final int BOLT_COUNT = 4;
                final int NUMBER_OF_ATTEMPTS = 5;
                int attempt = 0;
                while (buffer.size() < BOLT_COUNT && attempt++ < NUMBER_OF_ATTEMPTS) {
                    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                        System.out.println(
                                "Received message: (" + record.key() + ", " + record.value()
                                        + ") at offset " + record.offset());

                        Message message = MAPPER.readValue(record.value(), Message.class);
                        if (message.getDestination() == CTRL_CLIENT && message.getCorrelationId()
                                .equals(correlationId)) {
                            buffer.add((CtrlResponse) message);
                        }
                    }
                }
                return DumpStateManager.fromResponsesList(buffer);
            } finally {
                consumer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

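    /**
     * Sends a "clearState" control request to one component of a topology and
     * waits for its acknowledgement on the control topic.
     *
     * @return the component's response, or null if none arrived in time
     */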
    public CtrlResponse clearTopologyComponentState(String topology, String componentId) {
        String ctrlRoute = format("%s/%s", topology, componentId);
        long timestamp = System.currentTimeMillis();
        String correlationId = format("atdd-%d", timestamp);

        CtrlRequest clearStateRequest = new CtrlRequest(ctrlRoute, new RequestData("clearState"), timestamp,
                correlationId, WFM_CTRL);
        try {
            RecordMetadata postedMessage = postMessage(settings.getControlTopic(), clearStateRequest);
            try (KafkaConsumer<String, String> consumer = createConsumer()) {
                consumer.subscribe(Collections.singletonList(settings.getControlTopic()),
                        new NoOpConsumerRebalanceListener() {
                            @Override
                            public void onPartitionsAssigned(
                                    Collection<TopicPartition> partitions) {
                                for (TopicPartition topicPartition : partitions) {
                                    consumer.seek(topicPartition, postedMessage.offset());
                                }
                            }
                        });

                final int NUMBER_OF_ATTEMPTS = 5;
                int attempt = 0;
                while (attempt++ < NUMBER_OF_ATTEMPTS) {
                    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                        Message message = MAPPER.readValue(record.value(), Message.class);
                        if (message.getDestination() == CTRL_CLIENT
                                && message.getCorrelationId().equals(correlationId)) {
                            return (CtrlResponse) message;
                        }
                    }
                }

                return null;
            }
        } catch (IOException | ExecutionException | InterruptedException e) {
            throw new TopologyCtrlProcessingException(format("Unable to clear state on '%s'.", ctrlRoute), e);
        }
    }

    public Properties getConnectDefaults() {
        return connectDefaults;
    }
}
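
A minimal usage sketch, assuming a Kafka broker reachable at the address
KafkaParameters resolves. The topic name "kilda.wfm.ctrl" and the topology and
component identifiers ("wfm", "crud-bolt") are hypothetical placeholders, and
the import paths follow the reconstructed imports above:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.openkilda.KafkaUtils;
import org.openkilda.messaging.Destination;
import org.openkilda.messaging.ctrl.CtrlRequest;
import org.openkilda.messaging.ctrl.CtrlResponse;
import org.openkilda.messaging.ctrl.RequestData;
import org.openkilda.messaging.ctrl.state.visitor.DumpStateManager;

public class KafkaUtilsExample {
    public static void main(String[] args) throws Exception {
        KafkaUtils kafkaUtils = new KafkaUtils();

        // Post a broadcast "dump" control request, mirroring the construction
        // used inside KafkaUtils, and block until the broker acknowledges it.
        long timestamp = System.currentTimeMillis();
        CtrlRequest request = new CtrlRequest("*", new RequestData("dump"), timestamp,
                String.format("atdd-%d", timestamp), Destination.WFM_CTRL);
        RecordMetadata metadata = kafkaUtils.postMessage("kilda.wfm.ctrl", request);
        System.out.println("Posted at offset " + metadata.offset());

        // Collect state dumps from the bolts, or clear one component's state.
        DumpStateManager dumps = kafkaUtils.getStateDumpsFromBolts();
        CtrlResponse response = kafkaUtils.clearTopologyComponentState("wfm", "crud-bolt");
    }
}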