Open Kilda Java Documentation
KafkaBreaker.java
Go to the documentation of this file.
1 package org.openkilda.atdd.floodlight;
2 
5 
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 
11 import java.util.Properties;
12 import java.util.concurrent.ExecutionException;
13 
14 public class KafkaBreaker {
15  private static final Logger logger = LoggerFactory.getLogger(KafkaBreaker.class);
16 
17  private int MAX_ATTEMPTS = 3;
18 
19  private KafkaProducer<String, String> producer;
20 
21  public KafkaBreaker(Properties kafkaConfig) {
22  producer = new KafkaProducer<>(kafkaConfig);
23  }
24 
27  }
28 
31  }
32 
33  private void setState(KafkaBreakTarget target, KafkaBreakerAction action) throws KafkaBreakException {
34  String topic;
35 
36  switch (target) {
37  case FLOODLIGHT_CONSUMER:
38  case FLOODLIGHT_PRODUCER:
39  topic = "kilda.speaker"; // FIXME(surabujin) - wait till @nmarchenko push kafka related stuff for ATDD
40  break;
41  default:
42  throw new KafkaBreakException(String.format("Unsupported target: %s", target.toString()));
43  }
44 
45  ProducerRecord<String, String> record = new ProducerRecord<>(
46  topic, target.toString(), action.toString());
47  try {
48  for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt += 1) try {
49  logger.debug(
50  "Send {} to {} ({} of {}})",
51  record.value(), target.toString(), attempt + 1, MAX_ATTEMPTS);
52  producer.send(record).get();
53  break;
54  } catch (InterruptedException e) {
55  logger.warn("producer was interrupted (attempts {} of {})", attempt, MAX_ATTEMPTS);
56  }
57  } catch (ExecutionException e) {
58  throw new KafkaBreakException("Unable to publish control message", e);
59  }
60  }
61 }
void shutoff(KafkaBreakTarget target)
void restore(KafkaBreakTarget target)
target
Definition: nodes.py:50