package org.openkilda.floodlight.kafka;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import org.openkilda.floodlight.switchmanager.ISwitchManager;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
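
/**
 * A runnable Kafka consumer: polls the subscribed topics in an endless loop and passes
 * every received record to a handler produced by {@link RecordHandler.Factory}.
 */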
public class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    private final List<String> topics;
    private final KafkaConsumerConfig kafkaConfig;
    private final ExecutorService handlersPool;
    private final RecordHandler.Factory handlerFactory;
    private final ISwitchManager switchManager;

    public Consumer(KafkaConsumerConfig kafkaConfig, ExecutorService handlersPool,
                    RecordHandler.Factory handlerFactory, ISwitchManager switchManager,
                    String topic, String... moreTopics) {
        this.topics = new ArrayList<>(moreTopics.length + 1);
        this.topics.add(requireNonNull(topic));
        this.topics.addAll(Arrays.asList(moreTopics));

        this.kafkaConfig = requireNonNull(kafkaConfig);
        this.handlersPool = requireNonNull(handlersPool);
        this.handlerFactory = requireNonNull(handlerFactory);
        this.switchManager = requireNonNull(switchManager);
    }
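
    /**
     * Main consumer loop: polls Kafka, dispatches records to the handlers pool and
     * commits offsets through {@link KafkaOffsetRegistry}. Re-creates the consumer
     * after unexpected failures.
     */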
    @Override
    public void run() {
        // Outer loop: if the consumer fails, re-create it and keep processing.
        while (true) {
            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(kafkaConfig.createKafkaConsumerProperties())) {
                consumer.subscribe(topics);
                // Assumes KafkaConsumerConfig exposes the auto-commit interval (ms).
                KafkaOffsetRegistry offsetRegistry =
                        new KafkaOffsetRegistry(consumer, kafkaConfig.getAutoCommitInterval());

                while (true) {
                    ConsumerRecords<String, String> batch = consumer.poll(100);
                    if (batch.isEmpty()) {
                        continue;
                    }
                    logger.debug("Received records batch contains {} messages", batch.count());

                    for (ConsumerRecord<String, String> record : batch) {
                        handle(record);
                        offsetRegistry.addAndCommit(record);
                    }
                    // Force a commit once the whole batch has been dispatched.
                    offsetRegistry.commitOffsets();
                }
            } catch (InterruptException ex) {
                break;  // gracefully terminate on thread interruption
            } catch (Exception e) {
                // Pass the throwable as the last argument so the stack trace is logged.
                logger.error("Exception received during main kafka consumer loop", e);
            }
        }
    }
    protected void handle(ConsumerRecord<String, String> record) {
        logger.trace("received message: {} - {}", record.offset(), record.value());
        handlersPool.execute(handlerFactory.produce(record));
    }
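
    /**
     * Holds offsets of records that have been read but not yet committed, and performs
     * synchronous commits of them.
     */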
    @VisibleForTesting
    static class KafkaOffsetRegistry {
        private final KafkaConsumer<String, String> consumer;
        private final long autoCommitInterval;
        private final Map<TopicPartition, Long> partitionToUncommittedOffset = new HashMap<>();
        private long lastCommitTime;

        KafkaOffsetRegistry(KafkaConsumer<String, String> consumer, long autoCommitInterval) {
            this.consumer = requireNonNull(consumer);
            checkArgument(autoCommitInterval > 0, "autoCommitInterval must be positive");
            this.autoCommitInterval = autoCommitInterval;

            lastCommitTime = System.currentTimeMillis();
        }
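
        /**
         * Registers the record's offset for its partition and triggers a commit if more
         * than autoCommitInterval ms have passed since the last one.
         */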
        void addAndCommit(ConsumerRecord<String, String> record) {
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());

            Long previousOffset = partitionToUncommittedOffset.get(partition);
            if (previousOffset != null && previousOffset > record.offset()) {
                throw new IllegalArgumentException(
                        format("The record has offset %d which is less than the previously added %d.",
                                record.offset(), previousOffset));
            }

            partitionToUncommittedOffset.put(partition, record.offset());

            if ((System.currentTimeMillis() - lastCommitTime) >= autoCommitInterval) {
                commitOffsets();
            }
        }
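
        /**
         * Commits all collected offsets via {@link KafkaConsumer#commitSync(Map)}. Kafka
         * expects the offset of the next record to read, hence the +1 on each stored offset.
         */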
        void commitOffsets() {
            if (!partitionToUncommittedOffset.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> partitionToMetadata = new HashMap<>();
                for (Entry<TopicPartition, Long> e : partitionToUncommittedOffset.entrySet()) {
                    partitionToMetadata.put(e.getKey(), new OffsetAndMetadata(e.getValue() + 1));
                }

                consumer.commitSync(partitionToMetadata);
                partitionToUncommittedOffset.clear();
            }

            lastCommitTime = System.currentTimeMillis();
        }

        long getAutoCommitInterval() {
            return autoCommitInterval;
        }
    }
}
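
// Usage sketch (illustrative only; pool sizes and the topic name below are assumptions,
// not part of this file). The consumer blocks in poll(), so it runs on its own thread:
//
//   ExecutorService handlers = Executors.newFixedThreadPool(4);
//   Executors.newSingleThreadExecutor().execute(
//           new Consumer(kafkaConfig, handlers, handlerFactory, switchManager, "speaker"));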