1 package org.openkilda.wfm.topology.portstate.bolt;
3 import com.google.common.collect.HashBasedTable;
4 import com.google.common.collect.Table;
5 import org.apache.storm.task.OutputCollector;
6 import org.apache.storm.task.TopologyContext;
7 import org.apache.storm.topology.OutputFieldsDeclarer;
8 import org.apache.storm.topology.base.BaseRichBolt;
9 import org.apache.storm.tuple.Tuple;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
19 import java.io.IOException;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.List;
// Class-wide SLF4J logger for this bolt.
26 private static final Logger logger = LoggerFactory.getLogger(
ParsePortInfoBolt.class);
// Metric name passed as the first argument to tsdbTuple() by makeTsdbDatapoint().
27 private static final String METRIC_NAME =
"pen.switch.state";
// Output collector handed over in prepare().
28 private OutputCollector collector;
// Tag maps stored per (row key, port number); read and written by getTags().
29 private Table<String, Integer, Map<String, String>> tagsTable;
// Bolt initialisation hook: keeps the supplied OutputCollector and starts with an
// empty HashBasedTable for the tag maps. The map and topologyContext arguments are
// not used in the lines visible here.
32 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
33 this.collector = outputCollector;
34 tagsTable = HashBasedTable.create();
45 if (getStateAsInt(
port) >= 0) {
47 logger.debug(
"Emitting: {}",
result);
50 else if(logger.isDebugEnabled()) {
52 logger.debug(
"Skip: {}",
result);
55 }
catch (IOException e) {
56 logger.error(
"Error creating tsdbDatapoint for: {}", tuple.toString(), e);
// Builds the output values for one PortInfoData by delegating to tsdbTuple() with
// METRIC_NAME as the metric. Declares IOException.
// NOTE(review): the remaining tsdbTuple() arguments are cut off in this view.
64 private List<Object> makeTsdbDatapoint(
PortInfoData data)
throws IOException {
65 return tsdbTuple(METRIC_NAME,
// Wraps a single metric sample into tuple values: constructs a Datapoint from the
// metric name, timestamp, tag map and value, serialises it to a String with
// Utils.MAPPER.writeValueAsString(), and returns that String as the only element of
// an immutable singleton list.
// NOTE(review): the Datapoint constructor takes (metric, timestamp, tag, value), so
// tag and value are in the opposite order to this method's own parameter list.
86 private static List<Object> tsdbTuple(String metric,
long timestamp, Number
value, Map<String, String> tag)
88 Datapoint datapoint =
new Datapoint(metric, timestamp, tag,
value);
89 return Collections.singletonList(
Utils.
MAPPER.writeValueAsString(datapoint));
// Produces the tag map ("switchid" and "port") for the switch/port pair carried by
// data, consulting tagsTable first and storing the freshly built map back into it.
// NOTE(review): the lookup below passes data.getSwitchId() itself as the row key,
// while the put at the end stores under data.getSwitchId().toString(). tagsTable is
// declared with String row keys and getSwitchId() is evidently not a String (it has
// toOtsdFormat()), so the lookup probably never matches and the map is rebuilt on
// every call. Confirm, then use the same String key in both places.
// NOTE(review): the guard around the rebuild and the return statement are not
// visible in this view.
92 private Map<String, String> getTags(PortInfoData
data) {
93 Map<String, String> tag = tagsTable.get(
data.getSwitchId(),
data.getPortNo());
// Build a new tag map for this switch/port pair.
95 tag =
new HashMap<>();
96 tag.put(
"switchid",
data.getSwitchId().toOtsdFormat());
97 tag.put(
"port", String.valueOf(
data.getPortNo()));
// Store the map under the String form of the switch id plus the port number.
98 tagsTable.put(
data.getSwitchId().toString(),
data.getPortNo(), tag);
static final ObjectMapper MAPPER
void execute(Tuple tuple)
static final Fields fieldMessage
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
void declareOutputFields(OutputFieldsDeclarer declarer)
static final String FIELD_NAME