Open Kilda Java Documentation
Producer.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.kafka.producer;
17 
20 
21 import org.apache.kafka.clients.producer.Callback;
22 import org.apache.kafka.clients.producer.KafkaProducer;
23 import org.apache.kafka.clients.producer.RecordMetadata;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 
27 import java.util.HashMap;
28 import java.util.Map;
29 
30 public class Producer {
31 
32  private static final Logger logger = LoggerFactory.getLogger(Producer.class);
33 
34  private final org.apache.kafka.clients.producer.Producer producer;
35  private final Map<String, AbstractWorker> workersMap = new HashMap<>();
36 
37  public Producer(KafkaProducerConfig kafkaConfig) {
38  this(new KafkaProducer<>(kafkaConfig.createKafkaProducerProperties()));
39  }
40 
41  Producer(org.apache.kafka.clients.producer.Producer producer) {
42  this.producer = producer;
43  }
44 
48  public synchronized void enableGuaranteedOrder(String topic) {
49  logger.debug("Enable predictable order for topic {}", topic);
50  AbstractWorker worker = getWorker(topic);
51  workersMap.put(topic, new OrderAwareWorker(worker));
52  }
53 
57  public synchronized void disableGuaranteedOrder(String topic) {
58  logger.debug(
59  "Disable predictable order for topic {} (due to effect of transition period some future messages will "
60  + "be forced to have predictable order)", topic);
61  getWorker(topic).deactivate(1000);
62  }
63 
67  public synchronized void disableGuaranteedOrder(String topic, long transitionPeriod) {
68  logger.debug(
69  "Disable predictable order for topic {} (transition period {} ms)", topic, transitionPeriod);
70  getWorker(topic).deactivate(transitionPeriod);
71  }
72 
73  public void sendMessageAndTrack(String topic, Message message) {
74  getWorker(topic).sendMessage(message, new SendStatusCallback(this, topic, message));
75  }
76 
77  public SendStatus sendMessage(String topic, Message message) {
78  return getWorker(topic).sendMessage(message, null);
79  }
80 
81  private AbstractWorker getWorker(String topic) {
82  AbstractWorker worker = workersMap.computeIfAbsent(
83  topic, t -> new DefaultWorker(producer, t));
84  if (!worker.isActive()) {
85  worker = new DefaultWorker(producer, topic);
86  workersMap.put(topic, worker);
87  }
88  return worker;
89  }
90 
91  private void reportError(String topic, Message message, Exception exception) {
92  logger.error(
93  "Fail to send message(correlationId=\"{}\") in kafka topic={}: {}",
94  message.getCorrelationId(), topic, exception.toString());
95  }
96 
97  private static class SendStatusCallback implements Callback {
98  private final Producer producer;
99  private final String topic;
100  private final Message message;
101 
102  SendStatusCallback(Producer producer, String topic, Message message) {
103  this.producer = producer;
104  this.topic = topic;
105  this.message = message;
106  }
107 
108  @Override
109  public void onCompletion(RecordMetadata metadata, Exception exception) {
110  String error = exception == null ? null : exception.toString();
111  logger.debug("{}: {}, {}", this.getClass().getCanonicalName(), metadata, error);
112 
113  if (exception == null) {
114  return;
115  }
116  producer.reportError(topic, message, exception);
117  }
118  }
119 }
synchronized void disableGuaranteedOrder(String topic, long transitionPeriod)
Definition: Producer.java:67
synchronized void disableGuaranteedOrder(String topic)
Definition: Producer.java:57
SendStatus sendMessage(String topic, Message message)
Definition: Producer.java:77
Producer(KafkaProducerConfig kafkaConfig)
Definition: Producer.java:37
SendStatus sendMessage(Message payload, Callback callback)
void sendMessageAndTrack(String topic, Message message)
Definition: Producer.java:73
synchronized void enableGuaranteedOrder(String topic)
Definition: Producer.java:48