16 package org.openkilda.wfm.topology.stats.bolts;
37 import org.apache.storm.task.OutputCollector;
38 import org.apache.storm.task.TopologyContext;
39 import org.apache.storm.topology.OutputFieldsDeclarer;
40 import org.apache.storm.topology.base.BaseRichBolt;
41 import org.apache.storm.tuple.Fields;
42 import org.apache.storm.tuple.Tuple;
43 import org.apache.storm.tuple.Values;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import java.util.HashMap;
61 private static final Logger logger = LoggerFactory.getLogger(
67 private final Auth pathComputerAuth;
69 private TopologyContext context;
70 private OutputCollector outputCollector;
75 private Map<Long, CacheFlowEntry> cookieToFlow =
new HashMap<>();
83 this.pathComputerAuth = pathComputerAuth;
86 private void initFlowCache() {
92 flow.getSourceSwitch().toOtsdFormat(),
93 flow.getDestinationSwitch().toOtsdFormat()))
95 logger.debug(
"initFlowCache: {}", cookieToFlow);
96 logger.info(
"Stats Cache: Initialized");
97 }
catch (Exception ex) {
98 logger.error(
"Error on initFlowCache", ex);
106 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
107 this.context = topologyContext;
108 this.outputCollector = outputCollector;
122 if (componentId == STATS_CACHE_FILTER_BOLT) {
133 updateCacheEntry(cookie, flow, sw, measurePoint);
136 cookieToFlow.remove(cookie);
139 logger.error(
"invalid command");
143 logger.debug(
"updated cookieToFlow: {}", cookieToFlow);
144 }
else if (componentId == STATS_OFS_BOLT) {
149 Map<Long, CacheFlowEntry> dataCache =
new HashMap<>();
152 if (cookieToFlow.containsKey(entry.getCookie())) {
153 CacheFlowEntry cacheFlowEntry = cookieToFlow.get(entry.getCookie());
154 dataCache.put(entry.getCookie(), cacheFlowEntry);
158 logger.debug(
"execute:dataCache: {}", dataCache);
159 Values values =
new Values(message, dataCache);
160 outputCollector.emit(FLOW_STATS.name(), tuple, values);
163 outputCollector.ack(tuple);
172 outputFieldsDeclarer.declareStream(FLOW_STATS.name(),
176 private void updateCacheEntry(Long cookie, String flowId, String sw,
MeasurePoint measurePoint) {
179 if (current != null) {
180 replacement = current.
replace(sw, measurePoint);
184 cookieToFlow.put(cookie, replacement);
CacheFlowEntry replace(String sw, MeasurePoint point)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final String MESSAGE_FIELD
void execute(Tuple tuple)
CacheBolt(Auth pathComputerAuth)
def command(payload, fields)
static final Fields fieldsMessageFlowStats
default List< Flow > getAllFlows()
static final String CACHE_FIELD
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)