16 package org.openkilda.northbound.messaging.kafka;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 import org.slf4j.MDC.MDCCloseable;
33 import org.springframework.kafka.annotation.KafkaListener;
35 import java.io.IOException;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.stream.Collectors;
47 private static final int HEALTH_CHECK_COMPONENTS_COUNT = 5;
57 private volatile Map<String, HealthCheckInfoData> messages =
new ConcurrentHashMap<>();
64 @KafkaListener(
id =
"northbound-listener-health-check", topics =
"#{kafkaTopicsConfig.getHealthCheckTopic()}")
65 public
void receive(final String record) {
69 logger.trace(
"message received: {}", record);
70 message = MAPPER.readValue(record,
Message.class);
71 }
catch (IOException exception) {
72 logger.error(
"Could not deserialize message: {}", record, exception);
76 try (MDCCloseable closable = MDC.putCloseable(CORRELATION_ID, message.
getCorrelationId())) {
78 logger.debug(
"message received: {}", message);
81 messages.put(healthCheck.
getId(), healthCheck);
83 logger.trace(
"Skip message: {}", message);
92 public Map<String, String>
poll(
final String correlationId) {
95 if (HEALTH_CHECK_COMPONENTS_COUNT == messages.size()) {
96 return messages.values().stream().collect(Collectors.toMap(
101 }
catch (InterruptedException exception) {
102 String errorMessage =
"Unable to poll message";
103 logger.error(
"{}: {}={}", errorMessage, CORRELATION_ID, correlationId);
105 INTERNAL_ERROR, errorMessage,
"kilda-test");
107 return messages.values().stream().collect(Collectors.toMap(
static final ObjectMapper MAPPER
Map< String, String > poll(final String correlationId)
void receive(final String record)
static final String CORRELATION_ID
Destination getDestination()
String getCorrelationId()