Open Kilda Java Documentation
WfmStatsParseBolt.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.portstate.bolt;
2 
3 import org.apache.storm.topology.OutputFieldsDeclarer;
4 import org.apache.storm.tuple.Fields;
5 import org.apache.storm.tuple.Tuple;
6 import org.apache.storm.tuple.Values;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 
15 import java.io.IOException;
16 
// Class-wide SLF4J logger; execute() uses it for debug tracing and for error reports.
18  private static final Logger logger = LoggerFactory.getLogger(WfmStatsParseBolt.class);
// Name of the output stream this bolt declares and emits on: one PortInfoData value per parsed port.
19  public static final String WFM_TO_PARSE_PORT_INFO_STREAM = "wfm.to.parse.port.info.stream";
20 
21  @Override
22  public void execute(Tuple tuple) {
23  logger.debug("Ingoing tuple: {}", tuple);
24  String request = tuple.getString(0);
25  try {
26  InfoData data = getInfoData(tuple);
27  if (data instanceof SwitchPortStatusData) {
28  doParseSwitchPortsData((SwitchPortStatusData) data);
29  }
30  } catch (MessageException e) {
31  logger.error("Not an InfoMessage in queue message={}", request);
32  } catch (IOException exception) {
33  logger.error("Could not deserialize message={} exception={}", request,
34  exception.getMessage());
35  } finally {
36  collector.ack(tuple);
37  logger.debug("Message ack: {}", request);
38  }
39  }
40 
41  private void doParseSwitchPortsData(SwitchPortStatusData data) {
42  data.getPorts()
43  .stream()
44  .forEach(port -> collector.emit(WFM_TO_PARSE_PORT_INFO_STREAM, new Values(
45  new PortInfoData(data.getSwitchId(), port.getId(), port.getStatus()))));
46  }
47 
48  @Override
49  public void declareOutputFields(OutputFieldsDeclarer declarer) {
50  declarer.declareStream(WFM_TO_PARSE_PORT_INFO_STREAM, new Fields(TopoDiscoParseBolt.FIELD_NAME));
51  }
52 }