16 package org.openkilda.wfm.topology;
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.ConsumerRecords;
25 import org.apache.kafka.clients.consumer.KafkaConsumer;
26 import org.apache.kafka.common.errors.WakeupException;
28 import java.io.IOException;
29 import java.util.Collections;
30 import java.util.Optional;
31 import java.util.Properties;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.TimeUnit;
37 private static final long CONSUMER_QUEUE_OFFER_TIMEOUT = 1000;
38 private static final long KAFKA_MESSAGE_POLL_TIMEOUT = 30000;
39 private static final long KAFKA_CONSUMER_POLL_TIMEOUT = 100;
40 private final KafkaConsumer<String, String> consumer;
41 private final String topic;
43 private boolean checkDestination =
true;
44 private volatile BlockingQueue<ConsumerRecord<String, String>> records =
new ArrayBlockingQueue<>(100);
48 this.consumer =
new KafkaConsumer<>(properties);
50 this.destination = destination;
54 this(topic, null, properties);
55 checkDestination =
false;
59 System.out.println(
"Starting Kafka Consumer for " + topic);
60 consumer.subscribe(Collections.singletonList(topic));
63 ConsumerRecords<String, String> records = consumer.poll(KAFKA_CONSUMER_POLL_TIMEOUT);
64 for (ConsumerRecord<String, String> record : records) {
65 if (checkDestination(record.value())) {
66 this.records.offer(record, CONSUMER_QUEUE_OFFER_TIMEOUT, TimeUnit.MILLISECONDS);
67 consumer.commitSync();
68 System.out.println(String.format(
"Received message with destination %s: %s",
69 destination, record.value()));
73 }
catch (WakeupException e) {
74 System.out.println(
"Stopping Kafka Consumer for " + topic);
75 }
catch (InterruptedException e) {
76 System.out.println(
"Interrupting Kafka Consumer for " + topic);
78 consumer.unsubscribe();
83 public ConsumerRecord<String, String>
pollMessage() throws InterruptedException {
87 public ConsumerRecord<String, String>
pollMessage(
final long timeout)
throws InterruptedException {
88 return records.poll(timeout, TimeUnit.MILLISECONDS);
103 private boolean checkDestination(
final String recordValue) {
105 if (!checkDestination) {
111 if (destination != null) {
117 Message message = MAPPER.readValue(recordValue, Message.class);
118 if (message != null) {
122 }
catch (IOException exception) {
123 System.out.println(String.format(
"Can not deserialize %s with destination %s ", recordValue, destination));
124 exception.printStackTrace();
static final ObjectMapper MAPPER
TestKafkaConsumer(final String topic, final Destination destination, final Properties properties)
TestKafkaConsumer(final String topic, final Properties properties)
Destination getDestination()
String pollMessageValue()
ConsumerRecord< String, String > pollMessage(final long timeout)
ConsumerRecord< String, String > pollMessage()