16 package org.openkilda.wfm.topology.stats.bolts;
30 import org.apache.storm.task.OutputCollector;
31 import org.apache.storm.task.TopologyContext;
32 import org.apache.storm.topology.OutputFieldsDeclarer;
33 import org.apache.storm.topology.base.BaseRichBolt;
34 import org.apache.storm.tuple.Tuple;
35 import org.apache.storm.tuple.Values;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import java.io.IOException;
// Shared SLF4J logger for this bolt, obtained for SpeakerBolt.class.
43 private static final Logger logger = LoggerFactory.getLogger(
SpeakerBolt.class);
// Storm collector used to emit and ack tuples; assigned in prepare().
48 private OutputCollector outputCollector;
// Trace the raw incoming tuple at debug level before any processing.
55 logger.debug(
"Ingoing tuple: {}", tuple);
// The payload is read from the tuple's first field (index 0) as a string.
56 String request = tuple.getString(0);
// Port stats: log, then emit on PORT_STATS_STREAM with the input tuple as anchor.
// NOTE(review): the debug line logs the raw request wrapped in a new Values,
// while the emit sends `message` (declared outside this view) - confirm the
// wrapping is intended; passing `request` directly would avoid the allocation.
66 logger.debug(
"Port stats message: {}",
new Values(request));
67 outputCollector.emit(PORT_STATS_STREAM, tuple,
new Values(message));
// Meter config stats: same log-then-emit pattern, on METER_CFG_STATS_STREAM.
69 logger.debug(
"Meter config stats message: {}",
new Values(request));
70 outputCollector.emit(METER_CFG_STATS_STREAM, tuple,
new Values(message));
// Flow stats: same log-then-emit pattern, on FLOW_STATS_STREAM.
72 logger.debug(
"Flow stats message: {}",
new Values(request));
73 outputCollector.emit(FLOW_STATS_STREAM, tuple,
new Values(message));
75 }
catch (IOException exception) {
// Log the offending request together with the exception; nothing is rethrown here.
76 logger.error(
"Could not deserialize message={}", request, exception);
// Ack the input tuple and trace the ack.
// NOTE(review): the enclosing block (catch vs. finally) is not visible in this
// view - confirm the ack runs on every path, including early returns.
78 outputCollector.ack(tuple);
79 logger.debug(
"Message ack: {}", request);
// Declare the three output streams this bolt emits on; all of them share the
// same field schema, fieldMessage.
88 outputFieldsDeclarer.declareStream(PORT_STATS_STREAM, fieldMessage);
89 outputFieldsDeclarer.declareStream(METER_CFG_STATS_STREAM, fieldMessage);
90 outputFieldsDeclarer.declareStream(FLOW_STATS_STREAM, fieldMessage);
/**
 * Stores the collector handed to this bolt so that later code can emit and ack through it.
 *
 * @param map             configuration map; not used in the visible body
 * @param topologyContext topology context; not used in the visible body
 * @param outputCollector collector saved into the {@code outputCollector} field
 */
97 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
98 this.outputCollector = outputCollector;
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
static final Fields fieldMessage
Destination getDestination()
void execute(Tuple tuple)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)