16 package org.openkilda.floodlight.kafka.producer;
21 import com.fasterxml.jackson.core.JsonProcessingException;
22 import org.apache.kafka.clients.producer.Callback;
23 import org.apache.kafka.clients.producer.Producer;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
/** SLF4J logger for this class, created through LoggerFactory with AbstractWorker.class. */
28 private static final Logger log = LoggerFactory.getLogger(
AbstractWorker.class);
/**
 * Topic name held by this worker. Declared final, so it is assigned exactly once.
 * NOTE(review): presumably the Kafka topic that send() implementations publish to - confirm in subclasses.
 */
31 private final String topic;
// Keep a reference to the producer supplied by the caller on this instance.
34 this.kafkaProducer = kafkaProducer;
// Trace the topic (via getTopic()) and the payload object at debug level before anything is sent.
42 log.debug(
"Send kafka message: {} <== {}",
getTopic(), payload);
// Convert the payload to its String form with encode().
// NOTE(review): the visible catch block suggests encode() raises IllegalArgumentException
// when serialization fails - confirm, since that would propagate to the caller from here.
43 String json =
encode(payload);
// Delegate delivery of the encoded text to the abstract send(), passing the callback through unchanged.
44 return send(json, callback);
/**
 * Sends an already encoded payload. Abstract: each concrete worker supplies the implementation.
 *
 * @param payload  message text to send; sendMessage passes the result of encode() here
 * @param callback Kafka producer callback; sendMessage forwards the one it received from its caller
 * @return the SendStatus produced by the implementation (its meaning is not defined in the visible code)
 */
47 protected abstract SendStatus send(String payload, Callback callback);
53 }
catch (JsonProcessingException e) {
// Report the serialization failure as an unchecked IllegalArgumentException.
// The original exception is kept as the cause and its toString() is embedded in the message.
54 throw new IllegalArgumentException(String.format(
"Can not serialize message: %s", e.toString()), e);
/**
 * Deactivation hook; this implementation has an empty body and does nothing.
 *
 * @param transitionPeriod ignored by this implementation.
 *        NOTE(review): unit and intended meaning are not visible here - presumably
 *        subclasses override this method; confirm against them.
 */
68 void deactivate(
long transitionPeriod) {}
static final ObjectMapper MAPPER
AbstractWorker(Producer< String, String > kafkaProducer, String topic)
String encode(Message message)
abstract SendStatus send(String payload, Callback callback)
SendStatus sendMessage(Message payload, Callback callback)
Producer< String, String > getKafkaProducer()