Open Kilda Java Documentation
TestKafkaConsumer.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
19 
22 
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.ConsumerRecords;
25 import org.apache.kafka.clients.consumer.KafkaConsumer;
26 import org.apache.kafka.common.errors.WakeupException;
27 
28 import java.io.IOException;
29 import java.util.Collections;
30 import java.util.Optional;
31 import java.util.Properties;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 
36 public class TestKafkaConsumer extends Thread {
37  private static final long CONSUMER_QUEUE_OFFER_TIMEOUT = 1000;
38  private static final long KAFKA_MESSAGE_POLL_TIMEOUT = 30000;
39  private static final long KAFKA_CONSUMER_POLL_TIMEOUT = 100;
40  private final KafkaConsumer<String, String> consumer;
41  private final String topic;
42  private final Destination destination;
43  private boolean checkDestination = true;
44  private volatile BlockingQueue<ConsumerRecord<String, String>> records = new ArrayBlockingQueue<>(100);
45 
46 
47  public TestKafkaConsumer(final String topic, final Destination destination, final Properties properties) {
48  this.consumer = new KafkaConsumer<>(properties);
49  this.topic = topic;
50  this.destination = destination;
51  }
52 
53  public TestKafkaConsumer(final String topic, final Properties properties) {
54  this(topic, null, properties);
55  checkDestination = false;
56  }
57 
58  public void run() {
59  System.out.println("Starting Kafka Consumer for " + topic);
60  consumer.subscribe(Collections.singletonList(topic));
61  try {
62  while (true) {
63  ConsumerRecords<String, String> records = consumer.poll(KAFKA_CONSUMER_POLL_TIMEOUT);
64  for (ConsumerRecord<String, String> record : records) {
65  if (checkDestination(record.value())) {
66  this.records.offer(record, CONSUMER_QUEUE_OFFER_TIMEOUT, TimeUnit.MILLISECONDS);
67  consumer.commitSync();
68  System.out.println(String.format("Received message with destination %s: %s",
69  destination, record.value()));
70  }
71  }
72  }
73  } catch (WakeupException e) {
74  System.out.println("Stopping Kafka Consumer for " + topic);
75  } catch (InterruptedException e) {
76  System.out.println("Interrupting Kafka Consumer for " + topic);
77  } finally {
78  consumer.unsubscribe();
79  consumer.close();
80  }
81  }
82 
83  public ConsumerRecord<String, String> pollMessage() throws InterruptedException {
84  return pollMessage(KAFKA_MESSAGE_POLL_TIMEOUT);
85  }
86 
87  public ConsumerRecord<String, String> pollMessage(final long timeout) throws InterruptedException {
88  return records.poll(timeout, TimeUnit.MILLISECONDS);
89  }
90 
91  public String pollMessageValue() throws InterruptedException {
92  return Optional.ofNullable(pollMessage()).map(ConsumerRecord::value).orElse(null);
93  }
94 
95  public void clear() {
96  records.clear();
97  }
98 
99  public void wakeup() {
100  consumer.wakeup();
101  }
102 
103  private boolean checkDestination(final String recordValue) {
104 
105  if (!checkDestination) {
106  return true;
107  }
108 
109  boolean result = false;
110  try {
111  if (destination != null) {
112  Message message = MAPPER.readValue(recordValue, Message.class);
113  if (destination.equals(message.getDestination())) {
114  result = true;
115  }
116  } else {
117  Message message = MAPPER.readValue(recordValue, Message.class);
118  if (message != null) {
119  result = true;
120  }
121  }
122  } catch (IOException exception) {
123  System.out.println(String.format("Can not deserialize %s with destination %s ", recordValue, destination));
124  exception.printStackTrace();
125  }
126  return result;
127  }
128 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
value
Definition: nodes.py:62
TestKafkaConsumer(final String topic, final Destination destination, final Properties properties)
TestKafkaConsumer(final String topic, final Properties properties)
list result
Definition: plan-d.py:72
ConsumerRecord< String, String > pollMessage(final long timeout)
ConsumerRecord< String, String > pollMessage()