16 package org.openkilda.floodlight.kafka.producer;
21 import org.apache.kafka.clients.producer.Callback;
22 import org.apache.kafka.clients.producer.KafkaProducer;
23 import org.apache.kafka.clients.producer.RecordMetadata;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import java.util.HashMap;
32 private static final Logger logger = LoggerFactory.getLogger(
Producer.class);
34 private final org.apache.kafka.clients.producer.Producer producer;
35 private final Map<String, AbstractWorker> workersMap =
new HashMap<>();
41 Producer(
org.apache.kafka.clients.producer.Producer producer) {
42 this.producer = producer;
49 logger.debug(
"Enable predictable order for topic {}",
topic);
59 "Disable predictable order for topic {} (due to effect of transition period some future messages will " 60 +
"be forced to have predictable order)",
topic);
61 getWorker(
topic).deactivate(1000);
69 "Disable predictable order for topic {} (transition period {} ms)",
topic, transitionPeriod);
70 getWorker(
topic).deactivate(transitionPeriod);
84 if (!worker.isActive()) {
86 workersMap.put(
topic, worker);
91 private void reportError(String
topic,
Message message, Exception exception) {
93 "Fail to send message(correlationId=\"{}\") in kafka topic={}: {}",
97 private static class SendStatusCallback
implements Callback {
99 private final String
topic;
103 this.producer = producer;
105 this.message = message;
109 public void onCompletion(RecordMetadata metadata, Exception exception) {
110 String error = exception == null ? null : exception.
toString();
111 logger.debug(
"{}: {}, {}", this.getClass().getCanonicalName(), metadata, error);
113 if (exception == null) {
116 producer.reportError(
topic, message, exception);
synchronized void disableGuaranteedOrder(String topic, long transitionPeriod)
synchronized void disableGuaranteedOrder(String topic)
SendStatus sendMessage(String topic, Message message)
Producer(KafkaProducerConfig kafkaConfig)
SendStatus sendMessage(Message payload, Callback callback)
void sendMessageAndTrack(String topic, Message message)
synchronized void enableGuaranteedOrder(String topic)
String getCorrelationId()
default Properties createKafkaProducerProperties()