Open Kilda Java Documentation
ParsePortInfoBolt.java
package org.openkilda.wfm.topology.portstate.bolt;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.openkilda.messaging.Utils;
import org.openkilda.messaging.info.Datapoint;
import org.openkilda.messaging.info.event.PortChangeType;
import org.openkilda.messaging.info.event.PortInfoData;
import org.openkilda.wfm.topology.AbstractTopology;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Converts {@link PortInfoData} port state events into OpenTSDB datapoints for the
 * "pen.switch.state" metric (1 for UP/ADD, 0 for DOWN/DELETE) and emits them as JSON.
 */
public class ParsePortInfoBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(ParsePortInfoBolt.class);
    private static final String METRIC_NAME = "pen.switch.state";
    private OutputCollector collector;
    private Table<String, Integer, Map<String, String>> tagsTable;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        tagsTable = HashBasedTable.create();
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            if (tuple.getValueByField(TopoDiscoParseBolt.FIELD_NAME) instanceof PortInfoData) {
                try {
                    PortInfoData port = (PortInfoData) tuple
                            .getValueByField(TopoDiscoParseBolt.FIELD_NAME);

                    if (getStateAsInt(port) >= 0) {
                        List<Object> result = makeTsdbDatapoint(port);
                        logger.debug("Emitting: {}", result);
                        collector.emit(result);
                    } else if (logger.isDebugEnabled()) {
                        List<Object> result = makeTsdbDatapoint(port);
                        logger.debug("Skip: {}", result);
                    }
                } catch (IOException e) {
                    logger.error("Error creating tsdbDatapoint for: {}", tuple.toString(), e);
                }
            }
        } finally {
            collector.ack(tuple);
        }
    }

    private List<Object> makeTsdbDatapoint(PortInfoData data) throws IOException {
        return tsdbTuple(METRIC_NAME,
                data.getTimestamp(),
                getStateAsInt(data),
                getTags(data));
    }

    private int getStateAsInt(PortInfoData data) {
        PortChangeType state = data.getState();

        if (state.equals(PortChangeType.UP) || state.equals(PortChangeType.ADD)) {
            return 1;
        }

        if (state.equals(PortChangeType.DOWN) || state.equals(PortChangeType.DELETE)) {
            return 0;
        }

        return -1; // Other states are skipped.
    }

    private static List<Object> tsdbTuple(String metric, long timestamp, Number value, Map<String, String> tag)
            throws IOException {
        Datapoint datapoint = new Datapoint(metric, timestamp, tag, value);
        return Collections.singletonList(Utils.MAPPER.writeValueAsString(datapoint));
    }

    private Map<String, String> getTags(PortInfoData data) {
        // Tags are cached per (switch, port) pair; use the same String row key
        // for lookup and insert so the cache actually hits.
        Map<String, String> tag = tagsTable.get(data.getSwitchId().toString(), data.getPortNo());
        if (tag == null) {
            tag = new HashMap<>();
            tag.put("switchid", data.getSwitchId().toOtsdFormat());
            tag.put("port", String.valueOf(data.getPortNo()));
            tagsTable.put(data.getSwitchId().toString(), data.getPortNo(), tag);
        }
        return tag;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(AbstractTopology.fieldMessage);
    }
}
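
For context, a minimal sketch of how a bolt like this might be wired into a Storm topology is shown below. The component ids ("topo.disco.parse.bolt", "parse.port.info.bolt") and the wrapper class name are illustrative assumptions, not the ids used by the actual OpenKilda port-state topology; upstream spouts/bolts and the OpenTSDB writer are omitted.

import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

class PortStateWiringSketch {
    // Hypothetical wiring: ParsePortInfoBolt consumes tuples from the
    // TopoDiscoParseBolt component and emits JSON datapoints downstream.
    static StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setBolt("parse.port.info.bolt", new ParsePortInfoBolt(), 1)
                .shuffleGrouping("topo.disco.parse.bolt");
        return builder.createTopology();
    }
}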