16 package org.openkilda.floodlight.kafka;
22 import net.floodlightcontroller.core.module.FloodlightModuleContext;
23 import net.floodlightcontroller.core.module.FloodlightModuleException;
24 import net.floodlightcontroller.core.module.IFloodlightModule;
25 import net.floodlightcontroller.core.module.IFloodlightService;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 import java.util.ArrayList;
30 import java.util.Collection;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionHandler;
35 import java.util.concurrent.SynchronousQueue;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
// Per its declared return type, yields a map keyed by service class
// (Class<? extends IFloodlightService>) with IFloodlightService instances as values.
// The method body is not visible in this excerpt.
16 public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
// Collection of service classes, created as an empty ArrayList.
// NOTE(review): the method that owns this local is elided here — presumably one of the
// module service/dependency getters; confirm against the full file.
57 Collection<Class<? extends IFloodlightService>> services =
new ArrayList<>();
// Receives the FloodlightModuleContext for this module; the body is not visible in this excerpt.
// NOTE(review): presumably the IFloodlightModule initialization callback — confirm.
63 public void init(FloodlightModuleContext context) {
// Starts the module: logs the start, builds the executor used to parse records and the
// RecordHandler factory, creates a Consumer and submits it to a dedicated single-thread executor.
// NOTE(review): several original lines are missing from this excerpt (where consumerConfig,
// context, switchManager and inputTopic come from, the remaining ThreadPoolExecutor
// arguments, and the opening "try {"), so the notes below cover only what is visible.
67 public void startUp(FloodlightModuleContext moduleContext)
throws FloodlightModuleException {
68 logger.info(
"Starting {}", this.getClass().getCanonicalName());
// The worker count is read from the consumer configuration and reported as fixed.
74 logger.info(
"Consumer executor threads count is {} (fixed)", consumerConfig.
getExecutorCount());
// A SynchronousQueue holds no tasks, so a submission that cannot be handed straight to a
// worker is passed to RetryableExecutionHandler once the pool is at its maximum size
// (the maximum-size and keep-alive arguments are on lines not shown here).
77 ExecutorService parseRecordExecutor =
new ThreadPoolExecutor(consumerConfig.
getExecutorCount(),
79 new SynchronousQueue<>(),
new RetryableExecutionHandler());
82 RecordHandler.Factory handlerFactory =
new RecordHandler.Factory(context);
// NOTE(review): the argument list appears twice in this excerpt (original lines 90 and 94);
// likely two alternative constructor calls whose surrounding lines were dropped — confirm
// against the full file.
90 consumer =
new Consumer(consumerConfig, parseRecordExecutor, handlerFactory, switchManager,
94 consumerConfig, parseRecordExecutor, handlerFactory, switchManager, inputTopic);
// The consumer runs on its own single-thread executor. The executor reference is not kept,
// so it cannot be shut down from this method.
96 Executors.newSingleThreadExecutor().execute(
consumer);
97 }
catch (Exception exception) {
// Any exception from the guarded section is logged and not rethrown here.
// NOTE(review): the message "error" gives no context beyond the stack trace.
98 logger.error(
"error", exception);
106 private class RetryableExecutionHandler
implements RejectedExecutionHandler {
108 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
109 if (!executor.isShutdown()) {
111 executor.getQueue().put(r);
112 }
catch (InterruptedException e) {
113 logger.error(
"Couldn't retry to process message", e);
void init(FloodlightModuleContext context)
static void fillDependencies(Collection< Class<? extends IFloodlightService >> dependencies)
Collection< Class<? extends IFloodlightService > > getModuleServices()
void startUp(FloodlightModuleContext moduleContext)
Map< Class<? extends IFloodlightService >, IFloodlightService > getServiceImpls()
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
default boolean isTestingMode()
Collection< Class<? extends IFloodlightService > > getModuleDependencies()