Open Kilda Java Documentation
CacheBolt.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats.bolts;
17 
22 
36 
37 import org.apache.storm.task.OutputCollector;
38 import org.apache.storm.task.TopologyContext;
39 import org.apache.storm.topology.OutputFieldsDeclarer;
40 import org.apache.storm.topology.base.BaseRichBolt;
41 import org.apache.storm.tuple.Fields;
42 import org.apache.storm.tuple.Tuple;
43 import org.apache.storm.tuple.Values;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 
47 import java.util.HashMap;
48 import java.util.Map;
49 
50 public class CacheBolt extends BaseRichBolt {
51 
52  public static final String CACHE_FIELD = "cache";
53 
54  public static final Fields fieldsMessageFlowStats =
55  new Fields(
56  MESSAGE_FIELD,
57  CACHE_FIELD);
61  private static final Logger logger = LoggerFactory.getLogger(
62  CacheBolt.class);
63 
67  private final Auth pathComputerAuth;
68 
69  private TopologyContext context;
70  private OutputCollector outputCollector;
71 
75  private Map<Long, CacheFlowEntry> cookieToFlow = new HashMap<>();
76 
82  public CacheBolt(Auth pathComputerAuth) {
83  this.pathComputerAuth = pathComputerAuth;
84  }
85 
86  private void initFlowCache() {
87  try {
88  PathComputer pathComputer = new NeoDriver(pathComputerAuth.getDriver());
89  pathComputer.getAllFlows().forEach(
90  flow -> cookieToFlow.put(flow.getCookie(), new CacheFlowEntry(
91  flow.getFlowId(),
92  flow.getSourceSwitch().toOtsdFormat(),
93  flow.getDestinationSwitch().toOtsdFormat()))
94  );
95  logger.debug("initFlowCache: {}", cookieToFlow);
96  logger.info("Stats Cache: Initialized");
97  } catch (Exception ex) {
98  logger.error("Error on initFlowCache", ex);
99  }
100  }
101 
105  @Override
106  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
107  this.context = topologyContext;
108  this.outputCollector = outputCollector;
109  initFlowCache();
110  }
111 
115  @Override
116  public void execute(Tuple tuple) {
117 
118  try {
119 
120  StatsComponentType componentId = StatsComponentType.valueOf(tuple.getSourceComponent());
121 
122  if (componentId == STATS_CACHE_FILTER_BOLT) {
123 
124  Long cookie = tuple.getLongByField(FieldsNames.COOKIE.name());
125  String flow = tuple.getStringByField(FieldsNames.FLOW.name());
126  String sw = new SwitchId(tuple.getValueByField(FieldsNames.SWITCH.name()).toString()).toOtsdFormat();
127 
128  Commands command = (Commands) tuple.getValueByField(FieldsNames.COMMAND.name());
129  MeasurePoint measurePoint = (MeasurePoint) tuple.getValueByField(FieldsNames.MEASURE_POINT.name());
130 
131  switch (command) {
132  case UPDATE:
133  updateCacheEntry(cookie, flow, sw, measurePoint);
134  break;
135  case REMOVE:
136  cookieToFlow.remove(cookie);
137  break;
138  default:
139  logger.error("invalid command");
140  break;
141  }
142 
143  logger.debug("updated cookieToFlow: {}", cookieToFlow);
144  } else if (componentId == STATS_OFS_BOLT) {
145  InfoMessage message = (InfoMessage) tuple.getValueByField(MESSAGE_FIELD);
146 
147  FlowStatsData data = (FlowStatsData) message.getData();
148 
149  Map<Long, CacheFlowEntry> dataCache = new HashMap<>();
150  for (FlowStatsReply reply : data.getStats()) {
151  for (FlowStatsEntry entry : reply.getEntries()) {
152  if (cookieToFlow.containsKey(entry.getCookie())) {
153  CacheFlowEntry cacheFlowEntry = cookieToFlow.get(entry.getCookie());
154  dataCache.put(entry.getCookie(), cacheFlowEntry);
155  }
156  }
157  }
158  logger.debug("execute:dataCache: {}", dataCache);
159  Values values = new Values(message, dataCache);
160  outputCollector.emit(FLOW_STATS.name(), tuple, values);
161  }
162  } finally {
163  outputCollector.ack(tuple);
164  }
165  }
166 
170  @Override
171  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
172  outputFieldsDeclarer.declareStream(FLOW_STATS.name(),
174  }
175 
176  private void updateCacheEntry(Long cookie, String flowId, String sw, MeasurePoint measurePoint) {
177  CacheFlowEntry current = cookieToFlow.get(cookie);
178  CacheFlowEntry replacement;
179  if (current != null) {
180  replacement = current.replace(sw, measurePoint);
181  } else {
182  replacement = new CacheFlowEntry(flowId).replace(sw, measurePoint);
183  }
184  cookieToFlow.put(cookie, replacement);
185  }
186 }
CacheFlowEntry replace(String sw, MeasurePoint point)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
Definition: CacheBolt.java:106
def command(payload, fields)
Definition: share.py:102
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
Definition: CacheBolt.java:171