16 package org.openkilda.wfm.topology;
20 import org.apache.kafka.clients.producer.KafkaProducer;
21 import org.apache.kafka.clients.producer.ProducerRecord;
22 import org.apache.storm.utils.Utils;
24 import java.util.Properties;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
// Timeout, in milliseconds, used when waiting on the Future returned by a
// send (it is passed to get() together with TimeUnit.MILLISECONDS). The same
// value is also handed to Utils.sleep after a successful blocking send.
30 private static final long SEND_TIMEOUT = 1000;
// Kafka producer with String keys and String values; all records are sent through it.
31 private final KafkaProducer<String, String> producer;
// Build the underlying KafkaProducer from the supplied configuration properties.
34 this.producer =
new KafkaProducer<>(properties);
// Blocking send path (presumably the body of pushMessage — the method header
// is not attached to these statements in this view; confirm).
// Build a record addressed to `topic`, with PAYLOAD as the record key and
// `data` as the record value.
38 ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(
topic, PAYLOAD,
data);
// Send the record and wait on the returned Future for at most SEND_TIMEOUT
// milliseconds; get() throws TimeoutException if it has not completed by then.
40 producer.send(producerRecord).get(SEND_TIMEOUT, TimeUnit.MILLISECONDS);
// Echo the topic and the data that was sent to standard output.
42 System.out.println(String.format(
"send to %s: %s",
topic,
data));
// Pause after a successful send; this line is skipped when get() throws.
// NOTE(review): the reason for the pause is not visible here — presumably it
// gives downstream consumers time to process the message; confirm.
43 Utils.sleep(SEND_TIMEOUT);
44 }
catch (InterruptedException | TimeoutException | ExecutionException e) {
// In the visible part of this handler the failure is only reported by
// printing the exception message to standard output.
// NOTE(review): no Thread.currentThread().interrupt() is visible for the
// InterruptedException case — confirm whether the interrupt status should be restored.
45 System.out.println(e.getMessage());
// Non-blocking send path (presumably the body of pushMessageAsync — the
// method header is not attached to these statements in this view; confirm).
// Build a record addressed to `topic`, with PAYLOAD as the record key and
// `data` as the record value.
50 ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(
topic, PAYLOAD,
data);
// Hand the record to the producer and discard the returned Future, so the
// outcome of the send is not waited for or checked here.
51 producer.send(producerRecord);
static final String PAYLOAD
void pushMessage(final String topic, final String data)
void pushMessageAsync(final String topic, final String data)
TestKafkaProducer(final Properties properties)