1 package org.openkilda.atdd.floodlight;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
11 import java.util.Properties;
12 import java.util.concurrent.ExecutionException;
// Class-wide SLF4J logger, obtained for KafkaBreaker.class.
15 private static final Logger logger = LoggerFactory.getLogger(
KafkaBreaker.class);
// Upper bound of the send loop (`attempt < MAX_ATTEMPTS`); also printed in its log messages.
// NOTE(review): constant-style name, but declared as a mutable per-instance field.
// If nothing reassigns it, `private static final` would match the naming - confirm against the full file.
17 private int MAX_ATTEMPTS = 3;
// Kafka producer with String keys and String values, used to publish the control record.
19 private KafkaProducer<String, String> producer;
// Builds the producer from the caller-supplied `kafkaConfig` properties and stores it in the `producer` field.
// NOTE(review): no close() of this producer is visible in this view - confirm it is released elsewhere.
22 producer =
new KafkaProducer<>(kafkaConfig);
// Both FLOODLIGHT_CONSUMER and FLOODLIGHT_PRODUCER fall through to the same assignment:
// the control message for either target goes to the "kilda.speaker" topic.
// NOTE(review): the enclosing switch header and any other cases are outside this view.
37 case FLOODLIGHT_CONSUMER:
38 case FLOODLIGHT_PRODUCER:
39 topic =
"kilda.speaker";
// Record (String key, String value) that the loop below publishes.
// NOTE(review): the constructor arguments are outside this view.
45 ProducerRecord<String, String> record =
new ProducerRecord<>(
// Retry loop: `attempt` counts from 0 and the loop runs while attempt < MAX_ATTEMPTS.
// NOTE(review): the statement that leaves the loop after a successful send is not visible here - confirm one exists.
48 for (
int attempt = 0; attempt < MAX_ATTEMPTS; attempt += 1)
try {
// Log-message arguments: record value, target, 1-based attempt number, MAX_ATTEMPTS.
// NOTE(review): the format string ends with `{}})` - one closing brace too many.
// SLF4J substitutes only the `{}` pairs, so the extra `}` is printed literally; probably meant `({} of {})`.
50 "Send {} to {} ({} of {}})",
51 record.value(),
target.toString(), attempt + 1, MAX_ATTEMPTS);
// send() returns a Future; get() blocks until the send completes, or throws
// InterruptedException / ExecutionException (handled below).
52 producer.send(record).get();
54 }
catch (InterruptedException e) {
// An interrupted wait is only logged at WARN level in the visible lines.
// NOTE(review): this logs the zero-based `attempt`, while the message above logs `attempt + 1`,
// so the two messages disagree by one for the same iteration.
// NOTE(review): the interrupt status is not restored here (no Thread.currentThread().interrupt()
// is visible) - confirm whether that is intentional.
55 logger.warn(
"producer was interrupted (attempts {} of {})", attempt, MAX_ATTEMPTS);
57 }
catch (ExecutionException e) {
// A failed send is not retried: it is rethrown immediately as KafkaBreakException,
// with the ExecutionException kept as the cause.
58 throw new KafkaBreakException(
"Unable to publish control message", e);
KafkaBreaker(Properties kafkaConfig)
void shutoff(KafkaBreakTarget target)
void restore(KafkaBreakTarget target)