Open Kilda Java Documentation
AbstractKafkaParserBolt.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.utils;
2 
3 import org.apache.storm.task.OutputCollector;
4 import org.apache.storm.task.TopologyContext;
5 import org.apache.storm.topology.base.BaseRichBolt;
6 import org.apache.storm.tuple.Tuple;
12 
13 import java.io.IOException;
14 import java.util.Map;
15 
16 public abstract class AbstractKafkaParserBolt extends BaseRichBolt {
17  protected OutputCollector collector;
18 
19  protected String getJson(Tuple tuple) {
20  return tuple.getString(0);
21  }
22 
23  protected Message getMessage(String json) throws IOException {
24  return Utils.MAPPER.readValue(json, Message.class);
25  }
26 
27  protected InfoData getInfoData(Message message) throws MessageException {
28  if (!(message instanceof InfoMessage)) {
29  throw new MessageException(message.getClass().getName() + " is not an InfoMessage");
30  }
31  InfoData data = ((InfoMessage) message).getData();
32  return data;
33  }
34 
35  protected InfoData getInfoData(Tuple tuple) throws IOException, MessageException {
36  return getInfoData(getMessage(getJson(tuple)));
37  }
38 
39  @Override
40  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
41  this.collector = outputCollector;
42  }
43 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)