16 package org.openkilda.northbound.config;
23 import org.apache.kafka.clients.consumer.ConsumerConfig;
24 import org.apache.kafka.common.serialization.StringDeserializer;
25 import org.springframework.beans.factory.annotation.Value;
26 import org.springframework.context.annotation.Bean;
27 import org.springframework.context.annotation.Configuration;
28 import org.springframework.context.annotation.PropertySource;
29 import org.springframework.kafka.annotation.EnableKafka;
30 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
31 import org.springframework.kafka.core.ConsumerFactory;
32 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
34 import java.util.HashMap;
42 @PropertySource(
"classpath:northbound.properties")
47 private static final int POLL_TIMEOUT = 3000;
52 @Value(
"${kafka.hosts}")
53 private String kafkaHosts;
58 @Value(
"#{kafkaGroupConfig.getGroupId()}")
59 private String groupId;
68 Map<String, Object> props =
new HashMap<>();
69 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
70 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
71 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
72 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
73 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
74 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"30000");
91 return new DefaultKafkaConsumerFactory<>(consumerConfigs());
103 ConcurrentKafkaListenerContainerFactory<String, String> factory =
104 new ConcurrentKafkaListenerContainerFactory<>();
105 factory.setConsumerFactory(consumerFactory());
106 factory.getContainerProperties().setPollTimeout(POLL_TIMEOUT);
HealthCheckMessageConsumer healthCheckMessageConsumer()
ConcurrentKafkaListenerContainerFactory< String, String > kafkaListenerContainerFactory()
ConsumerFactory< String, String > consumerFactory()
MessageConsumer messageConsumer()
Map< String, Object > consumerConfigs()