Open Kilda Java Documentation
MessageConsumerConfig.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.northbound.config;
17 
22 
23 import org.apache.kafka.clients.consumer.ConsumerConfig;
24 import org.apache.kafka.common.serialization.StringDeserializer;
25 import org.springframework.beans.factory.annotation.Value;
26 import org.springframework.context.annotation.Bean;
27 import org.springframework.context.annotation.Configuration;
28 import org.springframework.context.annotation.PropertySource;
29 import org.springframework.kafka.annotation.EnableKafka;
30 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
31 import org.springframework.kafka.core.ConsumerFactory;
32 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
33 
34 import java.util.HashMap;
35 import java.util.Map;
36 
40 @Configuration
41 @EnableKafka
42 @PropertySource("classpath:northbound.properties")
43 public class MessageConsumerConfig {
47  private static final int POLL_TIMEOUT = 3000;
48 
52  @Value("${kafka.hosts}")
53  private String kafkaHosts;
54 
58  @Value("#{kafkaGroupConfig.getGroupId()}")
59  private String groupId;
60 
66  @Bean
67  public Map<String, Object> consumerConfigs() {
68  Map<String, Object> props = new HashMap<>();
69  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
70  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
71  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
72  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
73  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
74  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
75  //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
76  //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
77  //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
78  return props;
79  }
80 
89  @Bean
90  public ConsumerFactory<String, String> consumerFactory() {
91  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
92  }
93 
101  @Bean
102  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
103  ConcurrentKafkaListenerContainerFactory<String, String> factory =
104  new ConcurrentKafkaListenerContainerFactory<>();
105  factory.setConsumerFactory(consumerFactory());
106  factory.getContainerProperties().setPollTimeout(POLL_TIMEOUT);
107  //factory.setConcurrency(10);
108  return factory;
109  }
110 
118  @Bean
120  return new KafkaMessageConsumer();
121  }
122 
130  @Bean
132  return new KafkaHealthCheckMessageConsumer();
133  }
134 }
ConcurrentKafkaListenerContainerFactory< String, String > kafkaListenerContainerFactory()