Open Kilda Java Documentation
KafkaMessageConsumer.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.northbound.messaging.kafka;
17 
19 import static org.openkilda.messaging.Utils.MAPPER;
22 
26 
27 import org.apache.commons.collections4.map.PassiveExpiringMap;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.slf4j.MDC;
31 import org.slf4j.MDC.MDCCloseable;
32 import org.springframework.beans.factory.annotation.Value;
33 import org.springframework.context.annotation.PropertySource;
34 import org.springframework.kafka.annotation.KafkaListener;
35 import org.springframework.stereotype.Component;
36 
37 import java.io.IOException;
38 import java.util.Map;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.TimeUnit;
41 import javax.annotation.PostConstruct;
42 
46 @Component
47 @PropertySource("classpath:northbound.properties")
48 public class KafkaMessageConsumer implements MessageConsumer<Message> {
52  public static final String INTERRUPTED_ERROR_MESSAGE = "Unable to poll message";
53 
57  public static final String TIMEOUT_ERROR_MESSAGE = "Timeout for message poll";
58 
62  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumer.class);
63 
64  @Value("${northbound.messages.expiration.minutes}")
65  private int expiredTime;
66 
67  @Value("#{kafkaTopicsConfig.getNorthboundTopic()}")
68  private String northboundTopic;
69 
73  private Map<String, Message> messages;
74 
75  @PostConstruct
76  public void setUp() {
77  messages = new PassiveExpiringMap<>(expiredTime, TimeUnit.MINUTES, new ConcurrentHashMap<String, Message>());
78  }
79 
85  @KafkaListener(id = "northbound-listener", topics = "#{kafkaTopicsConfig.getNorthboundTopic()}")
86  public void receive(final String record) {
87  Message message;
88 
89  try {
90  logger.trace("message received: {}", record);
91  message = MAPPER.readValue(record, Message.class);
92  } catch (IOException exception) {
93  logger.error("Could not deserialize message: {}", record, exception);
94  return;
95  }
96 
97  try (MDCCloseable closable = MDC.putCloseable(CORRELATION_ID, message.getCorrelationId())) {
98  logger.debug("message received: {}", message);
99  messages.put(message.getCorrelationId(), message);
100  }
101  }
102 
106  @Override
107  public Message poll(final String correlationId) {
108  try {
109  for (int i = POLL_TIMEOUT / POLL_PAUSE; i < POLL_TIMEOUT; i += POLL_PAUSE) {
110  if (messages.containsKey(correlationId)) {
111  return messages.remove(correlationId);
112  }
113  Thread.sleep(POLL_PAUSE);
114  }
115  } catch (InterruptedException exception) {
116  logger.error("{}: {}={}", INTERRUPTED_ERROR_MESSAGE, CORRELATION_ID, correlationId);
117  throw new MessageException(correlationId, System.currentTimeMillis(),
118  INTERNAL_ERROR, INTERRUPTED_ERROR_MESSAGE, northboundTopic);
119  }
120  logger.error("{}: {}={}", TIMEOUT_ERROR_MESSAGE, CORRELATION_ID, correlationId);
121  throw new MessageException(correlationId, System.currentTimeMillis(),
122  OPERATION_TIMED_OUT, TIMEOUT_ERROR_MESSAGE, northboundTopic);
123  }
124 
125  //todo(Nikita C): rewrite current poll method using async way.
126  /*
127  @Async
128  public CompletableFuture<Message> asyncPoll(final String correlationId) {
129  try {
130  for (int i = POLL_TIMEOUT / POLL_PAUSE; i < POLL_TIMEOUT; i += POLL_PAUSE) {
131  if (messages.containsKey(correlationId)) {
132  return CompletableFuture.completedFuture(messages.remove(correlationId));
133  } else if (messages.containsKey(SYSTEM_CORRELATION_ID)) {
134  return CompletableFuture.completedFuture(messages.remove(SYSTEM_CORRELATION_ID));
135  }
136  Thread.sleep(POLL_PAUSE);
137  }
138  } catch (InterruptedException exception) {
139  logger.error("{}: {}={}", INTERRUPTED_ERROR_MESSAGE, CORRELATION_ID, correlationId);
140  throw new MessageException(correlationId, System.currentTimeMillis(),
141  INTERNAL_ERROR, INTERRUPTED_ERROR_MESSAGE, Topic.NORTHBOUND);
142  }
143  logger.error("{}: {}={}", TIMEOUT_ERROR_MESSAGE, CORRELATION_ID, correlationId);
144  throw new MessageException(correlationId, System.currentTimeMillis(),
145  OPERATION_TIMED_OUT, TIMEOUT_ERROR_MESSAGE, Topic.NORTHBOUND);
146  }
147  */
148 
152  @Override
153  public void clear() {
154  //we shouldn't clear up collection, outdated messages are removing by default.
155  //messages.clear();
156  }
157 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
def receive(bootstrap_servers, topic, callback, offset)
Definition: messaging.py:80
static final String CORRELATION_ID
Definition: Utils.java:43