16 package org.openkilda.wfm.topology.islstats.bolts;
18 import org.apache.storm.task.OutputCollector;
19 import org.apache.storm.task.TopologyContext;
20 import org.apache.storm.topology.OutputFieldsDeclarer;
21 import org.apache.storm.topology.base.BaseRichBolt;
22 import org.apache.storm.tuple.Tuple;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import java.io.IOException;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
// Collector handle for this bolt; assigned in prepare() and used further down
// to emit results and ack the incoming tuple.
43 private OutputCollector collector;
// SLF4J logger shared by all instances, named after IslStatsBolt.
44 private static final Logger logger = LoggerFactory.getLogger(
IslStatsBolt.class);
// Wraps one serialized datapoint into a single-element list (the tuple payload).
// NOTE(review): the line that builds `datapoint` is not visible in this excerpt;
// presumably it is constructed from metric/timestamp/value/tag — confirm in the full file.
46 private static List<Object> tsdbTuple(String metric,
long timestamp, Number
value, Map<String, String> tag)
// The list holds exactly one element: the String produced by
// Utils.MAPPER.writeValueAsString(datapoint).
49 return Collections.singletonList(
Utils.
MAPPER.writeValueAsString(datapoint));
// Stores the supplied OutputCollector in the instance field for later emit/ack calls.
// `map` and `topologyContext` are not used in the visible lines.
// NOTE(review): presumably the Storm bolt initialization hook — the class
// declaration is not visible here, confirm.
53 public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
54 this.collector = collector;
// Builds the latency datapoint tuple for one ISL record.
// The first path element is treated as the source endpoint and the second as
// the destination endpoint.
// NOTE(review): path.get(0)/path.get(1) are called with no size or null check in
// the visible lines — assumes the path always has at least two nodes; TODO confirm.
58 List<PathNode>
path =
data.getPath();
// Tag map attached to the datapoint: switch id and port number for each endpoint.
59 Map<String, String> tags =
new HashMap<>();
60 tags.put(
"src_switch",
path.get(0).getSwitchId().toOtsdFormat());
// Port numbers are stored as their String representation.
61 tags.put(
"src_port", String.valueOf(
path.get(0).getPortNo()));
62 tags.put(
"dst_switch",
path.get(1).getSwitchId().toOtsdFormat());
63 tags.put(
"dst_port", String.valueOf(
path.get(1).getPortNo()));
// Metric name is fixed; the value is the latency read from the ISL data,
// stamped with the caller-supplied timestamp.
65 return tsdbTuple(
"pen.isl.latency", timestamp,
data.getLatency(), tags);
// Returns the first field (index 0) of the tuple as a String.
// NOTE(review): presumably this field carries the JSON message text — confirm
// against the upstream component's declared fields.
69 return tuple.getString(0);
// Rejects a message of an unexpected type; the exception text is the runtime
// class name of `message` followed by " is not an InfoMessage".
// NOTE(review): the condition guarding this throw is on a line not shown in
// this excerpt — presumably an instanceof check; confirm.
78 throw new Exception(message.getClass().getName() +
" is not an InfoMessage");
// Rejects payload data of an unexpected type; the exception text is the runtime
// class name of `data` followed by " is not an IslInfoData".
// NOTE(review): the condition guarding this throw is on a line not shown in
// this excerpt — presumably an instanceof check; confirm.
86 throw new Exception(
data.getClass().getName() +
" is not an IslInfoData");
// Debug trace of the incoming tuple.
93 logger.debug(
"tuple: " + tuple);
// NOTE(review): the lines that parse the tuple and build `results` are not
// visible in this excerpt.
// Debug trace of the outgoing values, then hand them to the collector.
99 logger.debug(
"emit: " + results);
100 collector.emit(results);
101 }
// An IOException is reported as a deserialization failure: logged at error
// level with the offending `json` and the exception, and not rethrown here.
catch(IOException e) {
102 logger.error(
"Could not deserialize message={}", json, e);
103 }
// Catch-all for every other exception.
// NOTE(review): the handler body is not visible in this excerpt.
catch(Exception e) {
// The tuple is acknowledged.
// NOTE(review): whether this ack sits in a finally block (and so runs on
// every path) is not visible in this excerpt — confirm in the full file.
106 collector.ack(tuple);
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer declarer)
static final Fields fieldMessage
String getJson(Tuple tuple)
Message getMessage(String json)
IslInfoData getIslInfoData(InfoData data)
List< Object > buildTsdbTuple(IslInfoData data, long timestamp)
InfoData getInfoData(Message message)
void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
void execute(Tuple tuple)