16 package org.openkilda.floodlight.kafka;
24 import org.apache.kafka.clients.consumer.ConsumerRecord;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.ExecutorService;
33 private static final Logger logger = LoggerFactory.getLogger(
TestAwareConsumer.class);
36 private List<KafkaBreakTrigger> expectedTriggers;
40 super(kafkaConfig, handlersPool, handlerFactory, switchManager,
topic, moreTopics);
44 expectedTriggers =
new ArrayList<>();
45 expectedTriggers.add(breakTrigger);
54 protected void handle(ConsumerRecord<String, String> record) {
55 boolean isHandled =
false;
57 if (!trigger.handle(record.key(), record.value())) {
69 logger.info(
"Suppress record - key: {}, value: {}", record.key(), record.value());
boolean isCommunicationEnabled()
void handle(ConsumerRecord< String, String > record)
TestAwareConsumer(ConsumerContext context, KafkaConsumerConfig kafkaConfig, ExecutorService handlersPool, Factory handlerFactory, ISwitchManager switchManager, String topic, String... moreTopics)
KafkaMessageProducer getKafkaProducer()