16 package org.openkilda.northbound.messaging.kafka;
27 import org.apache.commons.collections4.map.PassiveExpiringMap;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import org.slf4j.MDC.MDCCloseable;
32 import org.springframework.beans.factory.annotation.Value;
33 import org.springframework.context.annotation.PropertySource;
34 import org.springframework.kafka.annotation.KafkaListener;
35 import org.springframework.stereotype.Component;
37 import java.io.IOException;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.TimeUnit;
41 import javax.annotation.PostConstruct;
47 @PropertySource(
"classpath:northbound.properties")
52 public static final String INTERRUPTED_ERROR_MESSAGE =
"Unable to poll message";
57 public static final String TIMEOUT_ERROR_MESSAGE =
"Timeout for message poll";
64 @Value(
"${northbound.messages.expiration.minutes}")
65 private
int expiredTime;
67 @Value("
#{kafkaTopicsConfig.getNorthboundTopic()}") 68 private String northboundTopic;
73 private Map<String, Message> messages;
77 messages =
new PassiveExpiringMap<>(expiredTime, TimeUnit.MINUTES,
new ConcurrentHashMap<String, Message>());
85 @KafkaListener(
id =
"northbound-listener", topics =
"#{kafkaTopicsConfig.getNorthboundTopic()}")
86 public
void receive(final String record) {
90 logger.trace(
"message received: {}", record);
91 message = MAPPER.readValue(record,
Message.class);
92 }
catch (IOException exception) {
93 logger.error(
"Could not deserialize message: {}", record, exception);
97 try (MDCCloseable closable = MDC.putCloseable(CORRELATION_ID, message.
getCorrelationId())) {
98 logger.debug(
"message received: {}", message);
109 for (
int i = POLL_TIMEOUT / POLL_PAUSE;
i < POLL_TIMEOUT;
i += POLL_PAUSE) {
110 if (messages.containsKey(correlationId)) {
111 return messages.remove(correlationId);
113 Thread.sleep(POLL_PAUSE);
115 }
catch (InterruptedException exception) {
116 logger.error(
"{}: {}={}", INTERRUPTED_ERROR_MESSAGE, CORRELATION_ID, correlationId);
118 INTERNAL_ERROR, INTERRUPTED_ERROR_MESSAGE, northboundTopic);
120 logger.error(
"{}: {}={}", TIMEOUT_ERROR_MESSAGE, CORRELATION_ID, correlationId);
122 OPERATION_TIMED_OUT, TIMEOUT_ERROR_MESSAGE, northboundTopic);
static final ObjectMapper MAPPER
Message poll(final String correlationId)
def receive(bootstrap_servers, topic, callback, offset)
static final String CORRELATION_ID
String getCorrelationId()