Open Kilda Java Documentation
Consumer.java
/* Copyright 2018 Telstra Open Source
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openkilda.floodlight.kafka;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import org.openkilda.floodlight.switchmanager.ISwitchManager;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;

public class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    private final List<String> topics;
    private final KafkaConsumerConfig kafkaConfig;
    private final ExecutorService handlersPool;
    private final RecordHandler.Factory handlerFactory;
    private final ISwitchManager switchManager; // HACK alert.. adding to facilitate safeModeTick()

    public Consumer(KafkaConsumerConfig kafkaConfig, ExecutorService handlersPool,
                    RecordHandler.Factory handlerFactory, ISwitchManager switchManager,
                    String topic, String... moreTopics) {
        this.topics = new ArrayList<>(moreTopics.length + 1);
        this.topics.add(requireNonNull(topic));
        this.topics.addAll(Arrays.asList(moreTopics));

        this.kafkaConfig = requireNonNull(kafkaConfig);
        this.handlersPool = requireNonNull(handlersPool);
        this.handlerFactory = requireNonNull(handlerFactory);
        this.switchManager = requireNonNull(switchManager);
    }

    @Override
    public void run() {
        while (true) {
            /*
             * Ensure we keep processing messages. It is possible that the consumer needs
             * to be re-created, either due to an internal error, or because it failed to
             * poll within max.poll.interval.ms.
             *
             * From the Kafka source code, the defaults for the relevant settings are:
             * - max.poll.interval.ms = 300000 (i.e. 300 seconds)
             * - max.poll.records = 500 (so we must be able to process about 2 records per second)
             */
            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(kafkaConfig.createKafkaConsumerProperties())) {
                consumer.subscribe(topics);

                KafkaOffsetRegistry offsetRegistry =
                        new KafkaOffsetRegistry(consumer, kafkaConfig.getAutoCommitInterval());

                while (true) {
                    try {
                        ConsumerRecords<String, String> batch = consumer.poll(100);
                        if (batch.isEmpty()) {
                            continue;
                        }

                        logger.debug("Received a batch of {} messages", batch.count());

                        for (ConsumerRecord<String, String> record : batch) {
                            handle(record);

                            offsetRegistry.addAndCommit(record);
                        }
                    } finally {
                        // Force a commit after each completed batch, and in the case of an exception / error.
                        offsetRegistry.commitOffsets();
                    }

                    switchManager.safeModeTick(); // HACK alert .. should go in its own timer loop
                }
            } catch (InterruptException ex) {
                // Leave if the thread has been interrupted.
                throw ex;
            } catch (Exception e) {
                // Just log the exception, and start processing again with a new consumer.
                // The exception is passed as the last argument (with no placeholder) so that
                // SLF4J logs the full stack trace.
                logger.error("Exception in the main kafka consumer loop", e);
            }
        }
    }

    protected void handle(ConsumerRecord<String, String> record) {
        logger.trace("received message: {} - {}", record.offset(), record.value());
        handlersPool.execute(handlerFactory.produce(record));
    }

    /**
     * Accumulates the offsets of processed records and commits them synchronously,
     * at most once per autoCommitInterval. Not thread-safe.
     */
    @VisibleForTesting
    static class KafkaOffsetRegistry {
        private final KafkaConsumer<String, String> consumer;
        private final long autoCommitInterval;
        private final Map<TopicPartition, Long> partitionToUncommittedOffset = new HashMap<>();
        private long lastCommitTime;

        KafkaOffsetRegistry(KafkaConsumer<String, String> consumer, long autoCommitInterval) {
            this.consumer = requireNonNull(consumer);
            checkArgument(autoCommitInterval > 0, "autoCommitInterval must be positive");
            this.autoCommitInterval = autoCommitInterval;

            lastCommitTime = System.currentTimeMillis();
        }

        /**
         * Records the offset of a processed record and, if autoCommitInterval has elapsed
         * since the last commit, commits all accumulated offsets.
         */
        void addAndCommit(ConsumerRecord<String, String> record) {
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());

            Long previousOffset = partitionToUncommittedOffset.get(partition);
            if (previousOffset != null && previousOffset > record.offset()) {
                throw new IllegalArgumentException(
                        format("The record has offset %d, which is less than the previously added %d.",
                                record.offset(), previousOffset));
            }

            partitionToUncommittedOffset.put(partition, record.offset());

            // Commit the offsets of processed messages once the interval has elapsed.
            if ((System.currentTimeMillis() - lastCommitTime) >= autoCommitInterval) {
                commitOffsets();
            }
        }

        /**
         * Synchronously commits all accumulated offsets and resets the commit timer.
         */
        void commitOffsets() {
            if (!partitionToUncommittedOffset.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> partitionToMetadata = new HashMap<>();
                for (Entry<TopicPartition, Long> e : partitionToUncommittedOffset.entrySet()) {
                    // Kafka expects the committed offset to be that of the *next* record
                    // to consume, hence the +1.
                    partitionToMetadata.put(e.getKey(), new OffsetAndMetadata(e.getValue() + 1));
                }

                consumer.commitSync(partitionToMetadata);

                partitionToUncommittedOffset.clear();
            }

            lastCommitTime = System.currentTimeMillis();
        }
    }
}
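
As a usage note, here is a minimal sketch of how this consumer might be wired up at module start. In practice the collaborators (KafkaConsumerConfig, RecordHandler.Factory, ISwitchManager) are supplied by the Floodlight module context; the pool size, thread name, and topic names below are hypothetical placeholders, not values taken from this module:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: assumes the collaborators are already available.
static void startConsumer(KafkaConsumerConfig kafkaConfig,
                          RecordHandler.Factory handlerFactory,
                          ISwitchManager switchManager) {
    // Pool that executes one RecordHandler per received record.
    ExecutorService handlersPool = Executors.newFixedThreadPool(4);

    Consumer consumer = new Consumer(
            kafkaConfig, handlersPool, handlerFactory, switchManager,
            "kilda.speaker", "kilda.speaker.flow");  // hypothetical topic names

    // Consumer.run() loops forever, so it needs a dedicated thread rather than
    // a slot in handlersPool.
    new Thread(consumer, "kafka-consumer-main").start();
}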
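
The loop commits offsets itself through KafkaOffsetRegistry, so the properties returned by kafkaConfig.createKafkaConsumerProperties() are presumably expected to disable Kafka's built-in auto-commit; otherwise the client could commit offsets for records whose handlers have not yet run. A minimal sketch of such a property set, using only standard kafka-clients keys (the broker address and group id are placeholder assumptions):

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
props.put("group.id", "floodlight-consumer");      // assumption: consumer group name
props.put("enable.auto.commit", "false");          // offsets are committed manually via commitSync()
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");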