Open Kilda Java Documentation
AbstractWorker.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.kafka.producer;
17 
20 
21 import com.fasterxml.jackson.core.JsonProcessingException;
22 import org.apache.kafka.clients.producer.Callback;
23 import org.apache.kafka.clients.producer.Producer;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 
27 public abstract class AbstractWorker {
28  private static final Logger log = LoggerFactory.getLogger(AbstractWorker.class);
29 
30  private final Producer<String, String> kafkaProducer;
31  private final String topic;
32 
33  public AbstractWorker(Producer<String, String> kafkaProducer, String topic) {
34  this.kafkaProducer = kafkaProducer;
35  this.topic = topic;
36  }
37 
41  public SendStatus sendMessage(Message payload, Callback callback) {
42  log.debug("Send kafka message: {} <== {}", getTopic(), payload);
43  String json = encode(payload);
44  return send(json, callback);
45  }
46 
47  protected abstract SendStatus send(String payload, Callback callback);
48 
49  protected String encode(Message message) {
50  String encoded;
51  try {
52  encoded = Utils.MAPPER.writeValueAsString(message);
53  } catch (JsonProcessingException e) {
54  throw new IllegalArgumentException(String.format("Can not serialize message: %s", e.toString()), e);
55  }
56 
57  return encoded;
58  }
59 
61  return kafkaProducer;
62  }
63 
64  protected String getTopic() {
65  return topic;
66  }
67 
68  void deactivate(long transitionPeriod) {}
69 
70  boolean isActive() {
71  return true;
72  }
73 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
AbstractWorker(Producer< String, String > kafkaProducer, String topic)
abstract SendStatus send(String payload, Callback callback)
SendStatus sendMessage(Message payload, Callback callback)