Open Kilda Java Documentation
TopoDiscoParseBolt.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.portstate.bolt;
2 
3 import org.apache.storm.topology.OutputFieldsDeclarer;
4 import org.apache.storm.tuple.Fields;
5 import org.apache.storm.tuple.Tuple;
6 import org.apache.storm.tuple.Values;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 
15 import java.io.IOException;
16 
// SLF4J logger scoped to this bolt class; used by doParseMessage to report processing errors.
18  private static final Logger logger = LoggerFactory.getLogger(TopoDiscoParseBolt.class);
// Id of the stream on which doParseMessage emits PortInfoData payloads; declared in declareOutputFields.
19  public static final String TOPO_TO_PORT_INFO_STREAM = "parse.port.info.stream";
// Name of the single field declared for that stream: the simple class name of PortInfoData.
20  public static final String FIELD_NAME = PortInfoData.class.getSimpleName();
21 
/**
 * Dispatches an incoming tuple according to the component that produced it.
 *
 * <p>Tuples selected by the switch are handed to doParseMessage, which acknowledges
 * them itself; tuples from any other source component are acknowledged immediately
 * without further processing.
 *
 * @param tuple the incoming tuple
 */
22  @Override
23  public void execute(Tuple tuple) {
24  switch (tuple.getSourceComponent()) {
// NOTE(review): the case label that selects doParseMessage (listing line 25) is absent
// from this extract — confirm the matched component id against the original file.
26  doParseMessage(tuple);
27  break;
28  default:
// Not a component this bolt parses: ack so the tuple is not left pending.
29  collector.ack(tuple);
30  }
31  }
32 
33  private void doParseMessage(Tuple tuple) {
34  try {
35  InfoData infoData = getInfoData(tuple);
36  if (infoData instanceof PortInfoData) {
37  collector.emit(TOPO_TO_PORT_INFO_STREAM, new Values((PortInfoData) infoData));
38  }
39  } catch (IOException e) {
40  logger.error("Error processing: {}", tuple.toString(), e);
41  } catch (MessageException e){
42  // don't really have to do anything but exception is thrown so catch it
43  } finally {
44  collector.ack(tuple);
45  }
46  }
47 
48  @Override
49  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
50  outputFieldsDeclarer.declareStream(TOPO_TO_PORT_INFO_STREAM, new Fields(FIELD_NAME));
51  }
52 }
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)