16 package org.openkilda.floodlight.kafka.producer;
18 import org.apache.kafka.clients.producer.Callback;
19 import org.apache.kafka.clients.producer.Producer;
20 import org.apache.kafka.clients.producer.ProducerRecord;
21 import org.apache.kafka.clients.producer.RecordMetadata;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 import java.util.concurrent.Future;
28 private static final Logger log = LoggerFactory.getLogger(
OrderAwareWorker.class);
31 private long expireAt = 0;
32 private Integer partition;
39 this.deep += other.deep;
40 this.partition = other.partition;
45 super(kafkaProducer, topic);
50 ProducerRecord<String, String> record;
51 if (partition == null) {
52 record =
new ProducerRecord<>(
getTopic(), payload);
54 record =
new ProducerRecord<>(
getTopic(), partition, null, payload);
58 if (partition == null) {
60 partition = promise.get().partition();
61 }
catch (Exception e) {
62 log.error(
"Can't determine kafka topic partition for order aware writing, due to send error " 63 +
"(will retry on next send attempt). Error: {}", e.toString());
71 void deactivate(
long transitionPeriod) {
73 throw new IllegalStateException(
"Number of .diable() calls have overcome number of .enable() calls");
78 expireAt = System.currentTimeMillis() + transitionPeriod;
87 return System.currentTimeMillis() < expireAt;
synchronized SendStatus send(String payload, Callback callback)
OrderAwareWorker(Producer< String, String > kafkaProducer, String topic)
Producer< String, String > getKafkaProducer()
OrderAwareWorker(AbstractWorker worker)