Open Kilda Java Documentation
KafkaMessageCollector.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.kafka;
17 
21 
22 import net.floodlightcontroller.core.module.FloodlightModuleContext;
23 import net.floodlightcontroller.core.module.FloodlightModuleException;
24 import net.floodlightcontroller.core.module.IFloodlightModule;
25 import net.floodlightcontroller.core.module.IFloodlightService;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Map;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionHandler;
35 import java.util.concurrent.SynchronousQueue;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38 
39 public class KafkaMessageCollector implements IFloodlightModule {
40  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageCollector.class);
41 
45  @Override
46  public Collection<Class<? extends IFloodlightService>> getModuleServices() {
47  return null;
48  }
49 
50  @Override
51  public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
52  return null;
53  }
54 
55  @Override
56  public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
57  Collection<Class<? extends IFloodlightService>> services = new ArrayList<>();
59  return services;
60  }
61 
62  @Override
63  public void init(FloodlightModuleContext context) {
64  }
65 
66  @Override
67  public void startUp(FloodlightModuleContext moduleContext) throws FloodlightModuleException {
68  logger.info("Starting {}", this.getClass().getCanonicalName());
69 
70  ConfigurationProvider provider = ConfigurationProvider.of(moduleContext, this);
71  KafkaConsumerConfig consumerConfig = provider.getConfiguration(KafkaConsumerConfig.class);
72  KafkaTopicsConfig topicsConfig = provider.getConfiguration(KafkaTopicsConfig.class);
73 
74  logger.info("Consumer executor threads count is {} (fixed)", consumerConfig.getExecutorCount());
75 
76  // A thread pool of fixed sized and no work queue.
77  ExecutorService parseRecordExecutor = new ThreadPoolExecutor(consumerConfig.getExecutorCount(),
78  consumerConfig.getExecutorCount(), 0L, TimeUnit.MILLISECONDS,
79  new SynchronousQueue<>(), new RetryableExecutionHandler());
80 
81  ConsumerContext context = new ConsumerContext(moduleContext, topicsConfig);
82  RecordHandler.Factory handlerFactory = new RecordHandler.Factory(context);
83 
84  ISwitchManager switchManager = context.getSwitchManager();
85  String inputTopic = topicsConfig.getSpeakerTopic();
86 
87  try {
89  if (!consumerConfig.isTestingMode()) {
90  consumer = new Consumer(consumerConfig, parseRecordExecutor, handlerFactory, switchManager,
91  inputTopic);
92  } else {
93  consumer = new TestAwareConsumer(context,
94  consumerConfig, parseRecordExecutor, handlerFactory, switchManager, inputTopic);
95  }
96  Executors.newSingleThreadExecutor().execute(consumer);
97  } catch (Exception exception) {
98  logger.error("error", exception);
99  }
100  }
101 
106  private class RetryableExecutionHandler implements RejectedExecutionHandler {
107  @Override
108  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
109  if (!executor.isShutdown()) {
110  try {
111  executor.getQueue().put(r);
112  } catch (InterruptedException e) {
113  logger.error("Couldn't retry to process message", e);
114  }
115  }
116  }
117  }
118 }
static void fillDependencies(Collection< Class<? extends IFloodlightService >> dependencies)
Collection< Class<? extends IFloodlightService > > getModuleServices()
void startUp(FloodlightModuleContext moduleContext)
Map< Class<? extends IFloodlightService >, IFloodlightService > getServiceImpls()
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
Collection< Class<? extends IFloodlightService > > getModuleDependencies()
net
Definition: plan-b.py:46