Open Kilda Java Documentation
CacheBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.cache;
17 
18 import static java.lang.String.format;
19 import static org.openkilda.messaging.Utils.MAPPER;
20 
45 import org.openkilda.pce.cache.Cache;
55 
56 import com.fasterxml.jackson.core.JsonProcessingException;
57 import com.google.common.annotations.VisibleForTesting;
58 import org.apache.storm.state.InMemoryKeyValueState;
59 import org.apache.storm.task.OutputCollector;
60 import org.apache.storm.task.TopologyContext;
61 import org.apache.storm.topology.OutputFieldsDeclarer;
62 import org.apache.storm.topology.base.BaseStatefulBolt;
63 import org.apache.storm.tuple.Tuple;
64 import org.apache.storm.tuple.Values;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67 
68 import java.io.IOException;
69 import java.util.HashSet;
70 import java.util.Map;
71 import java.util.Optional;
72 import java.util.Set;
73 import java.util.stream.Collectors;
74 
75 public class CacheBolt
76  extends BaseStatefulBolt<InMemoryKeyValueState<String, Cache>>
77  implements ICtrlBolt {
/** Stream id used for control-channel output; returned by {@link #getCtrlStreamId()}. */
public static final String STREAM_ID_CTRL = "ctrl";

/** Key under which the {@link NetworkCache} is kept in the bolt state. */
private static final String NETWORK_CACHE = "network";

/** Key under which the {@link FlowCache} is kept in the bolt state. */
private static final String FLOW_CACHE = "flow";

private static final Logger logger = LoggerFactory.getLogger(CacheBolt.class);

/** Credentials whose driver backs the {@code NeoDriver} path computer built in {@link #initState}. */
private final Auth pathComputerAuth;

/** Key-value state that holds both caches. */
private InMemoryKeyValueState<String, Cache> state;

/** Cached switches and ISLs. */
private NetworkCache networkCache;

/** Cached flows (forward/reverse pairs). */
private FlowCache flowCache;

/** Values captured in {@link #prepare}. */
private TopologyContext context;
private OutputCollector outputCollector;

/**
 * Creates the cache bolt.
 *
 * @param pathComputerAuth credentials used to build the path computer that seeds the caches
 */
CacheBolt(Auth pathComputerAuth) {
    this.pathComputerAuth = pathComputerAuth;
}
118 
122  @Override
123  public void initState(InMemoryKeyValueState<String, Cache> state) {
124  this.state = state;
125 
126  networkCache = (NetworkCache) state.get(NETWORK_CACHE);
127  if (networkCache == null) {
128  networkCache = new NetworkCache();
129  this.state.put(NETWORK_CACHE, networkCache);
130  }
131 
132  flowCache = (FlowCache) state.get(FLOW_CACHE);
133  if (flowCache == null) {
134  flowCache = new FlowCache();
135  this.state.put(FLOW_CACHE, flowCache);
136  }
137 
138  logger.info("Request initial network state");
139 
140  final PathComputer pathComputer = new NeoDriver(pathComputerAuth.getDriver());
141  initFlowCache(pathComputer);
142  initNetwork(pathComputer);
143  }
144 
148  @Override
149  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
150  this.context = topologyContext;
151  this.outputCollector = outputCollector;
152  }
153 
157  @Override
158  public void execute(Tuple tuple) {
159  if (CtrlAction.boltHandlerEntrance(this, tuple)) {
160  return;
161  }
162  logger.trace("State before: {}", state);
163 
164  String json = tuple.getString(0);
165  String source = tuple.getSourceComponent();
166 
167  /*
168  (carmine) Hack Alert
169  1) merged two kafka topics into one;
170  2) previous logic used topic source to determine how to parse the message
171  3) new logic tries to parse it one way, then the next. Slightly inefficient.
172  */
173  // TODO: Eliminate the inefficiency introduced through the hack
174  try {
175  BaseMessage bm = MAPPER.readValue(json, BaseMessage.class);
176  if (bm instanceof InfoMessage) {
177  InfoMessage message = (InfoMessage) bm;
178  InfoData data = message.getData();
179 
180  if (data instanceof SwitchInfoData) {
181  logger.info("Cache update switch info data: {}", data);
182  handleSwitchEvent((SwitchInfoData) data, tuple, message.getCorrelationId());
183 
184  } else if (data instanceof IslInfoData) {
185  handleIslEvent((IslInfoData) data, tuple, message.getCorrelationId());
186 
187  } else if (data instanceof PortInfoData) {
188  handlePortEvent((PortInfoData) data, tuple, message.getCorrelationId());
189 
190  } else if (data instanceof FlowInfoData) {
191 
192  FlowInfoData flowData = (FlowInfoData) data;
193  handleFlowEvent(flowData, tuple, message.getCorrelationId());
194  } else if (data instanceof NetworkTopologyChange) {
195  logger.debug("Switch flows reroute request");
196 
198  handleNetworkTopologyChange(topologyChange, tuple, message.getCorrelationId());
199  } else {
200  logger.warn("Skip undefined info data type {}", json);
201  }
202  } else {
203  logger.warn("Skip undefined message type {}", json);
204  }
205 
206  } catch (CacheException exception) {
207  logger.error("Could not process message {}", tuple, exception);
208  } catch (IOException exception) {
209  logger.error("Could not deserialize message {}", tuple, exception);
210  } catch (Exception e) {
211  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
212  } finally {
213  outputCollector.ack(tuple);
214  }
215 
216  logger.trace("State after: {}", state);
217  }
218 
222  @Override
223  public void declareOutputFields(OutputFieldsDeclarer output) {
224  output.declareStream(StreamType.TPE.toString(), AbstractTopology.fieldMessage);
225  output.declareStream(StreamType.WFM_DUMP.toString(), AbstractTopology.fieldMessage);
226  output.declareStream(StreamType.OFE.toString(), AbstractTopology.fieldMessage);
227  // FIXME(dbogun): use proper tuple format
228  output.declareStream(STREAM_ID_CTRL, AbstractTopology.fieldMessage);
229  }
230 
231  private void handleSwitchEvent(SwitchInfoData sw, Tuple tuple, String correlationId) throws IOException {
232  logger.debug("State update switch {} message {}", sw.getSwitchId(), sw.getState());
233  Set<ImmutablePair<Flow, Flow>> affectedFlows;
234 
235  switch (sw.getState()) {
236 
237  case ADDED:
238  case ACTIVATED:
239  onSwitchUp(sw);
240  break;
241 
242  case REMOVED:
243  case DEACTIVATED:
244  if (networkCache.cacheContainsSwitch(sw.getSwitchId())) {
245  networkCache.updateSwitch(sw);
246  }
247 
248  // (crimi - 2018.04.17) - eliminating taking action on Switch down events ..
249  // primarily because floodlight can regularly drop a connection to the switch (or
250  // vice versa) and a new connection is made almost immediately. Essentially, a flap.
251  // Rather than reroute here .. what to see if an ISL goes down. This introduces a
252  // longer delay .. but a necessary dampening affect. The better solution
253  // is to kick of an immediate probe if we get such an event .. and the probe
254  // should confirm what is really happening.
255  //affectedFlows = flowCache.getActiveFlowsWithAffectedPath(sw.getSwitchId());
256  //String reason = String.format("switch %s is %s", sw.getSwitchId(), sw.getState());
257  //emitRerouteCommands(affectedFlows, tuple, correlationId, FlowOperation.UPDATE, reason);
258  break;
259 
260  case CACHED:
261  break;
262  case CHANGED:
263  break;
264 
265  default:
266  logger.warn("Unknown state update switch info message");
267  break;
268  }
269  }
270 
271  private void handleIslEvent(IslInfoData isl, Tuple tuple, String correlationId) {
272  logger.debug("State update isl {} message cached {}", isl.getId(), isl.getState());
273  Set<ImmutablePair<Flow, Flow>> affectedFlows;
274 
275  switch (isl.getState()) {
276  case DISCOVERED:
277  if (networkCache.cacheContainsIsl(isl.getId())) {
278  networkCache.updateIsl(isl);
279  } else {
280  if (isl.isSelfLooped()) {
281  logger.warn("Skipped self-looped ISL: {}", isl);
282  } else {
283  networkCache.createIsl(isl);
284  }
285  }
286  break;
287 
288  case FAILED:
289  case MOVED:
290  try {
291  networkCache.deleteIsl(isl.getId());
292  } catch (CacheException exception) {
293  logger.warn("{}:{}", exception.getErrorMessage(), exception.getErrorDescription());
294  }
295 
296  affectedFlows = flowCache.getActiveFlowsWithAffectedPath(isl);
297  String reason = String.format("isl %s FAILED", isl.getId());
298  emitRerouteCommands(tuple, affectedFlows, correlationId, reason);
299  break;
300 
301  case OTHER_UPDATE:
302  break;
303 
304  case CACHED:
305  break;
306 
307  default:
308  logger.warn("Unknown state update isl info message");
309  break;
310  }
311  }
312 
313  private void handlePortEvent(PortInfoData port, Tuple tuple, String correlationId) {
314  logger.debug("State update port {}_{} message cached {}",
315  port.getSwitchId(), port.getPortNo(), port.getState());
316 
317  switch (port.getState()) {
318  case DOWN:
319  case DELETE:
320  Set<ImmutablePair<Flow, Flow>> affectedFlows = flowCache.getActiveFlowsWithAffectedPath(port);
321  String reason = String.format("port %s_%s is %s",
322  port.getSwitchId(), port.getPortNo(), port.getState());
323  emitRerouteCommands(tuple, affectedFlows, correlationId, reason);
324  break;
325 
326  case UP:
327  case ADD:
328  break;
329 
330  case OTHER_UPDATE:
331  case CACHED:
332  break;
333 
334  default:
335  logger.warn("Unknown state update isl info message");
336  break;
337  }
338  }
339 
340  private void handleNetworkTopologyChange(NetworkTopologyChange topologyChange, Tuple tuple, String correlationId) {
341  Set<ImmutablePair<Flow, Flow>> affectedFlows;
342 
343  switch (topologyChange.getType()) {
344  case ENDPOINT_DROP:
345  // TODO(surabujin): need implementation
346  return;
347 
348  case ENDPOINT_ADD:
349  affectedFlows = getFlowsForRerouting();
350  break;
351 
352  default:
353  logger.error("Unhandled reroute type: {}", topologyChange.getType());
354  return;
355  }
356  String reason = String.format("network topology change %s_%s is %s",
357  topologyChange.getSwitchId(), topologyChange.getPortNumber(),
358  topologyChange.getType());
359  emitRerouteCommands(tuple, affectedFlows, correlationId, reason);
360  }
361 
362  private void emitFlowMessage(InfoData data, Tuple tuple, String correlationId) throws IOException {
363  Message message = new InfoMessage(data, System.currentTimeMillis(),
364  correlationId, Destination.TOPOLOGY_ENGINE);
365  outputCollector.emit(StreamType.TPE.toString(), tuple, new Values(MAPPER.writeValueAsString(message)));
366  logger.debug("Flow command message sent");
367  }
368 
369  private void emitFlowCrudMessage(InfoData data, Tuple tuple, String correlationId) throws IOException {
370  Message message = new InfoMessage(data, System.currentTimeMillis(),
371  correlationId, Destination.WFM);
372  outputCollector.emit(StreamType.WFM_DUMP.toString(), tuple, new Values(MAPPER.writeValueAsString(message)));
373  logger.debug("Flow command message sent");
374  }
375 
376  private void emitRerouteCommands(Tuple input, Set<ImmutablePair<Flow, Flow>> flows,
377  String initialCorrelationId, String reason) {
378  for (ImmutablePair<Flow, Flow> flow : flows) {
379  final String flowId = flow.getLeft().getFlowId();
380  FlowRerouteRequest request = new FlowRerouteRequest(flowId);
381 
382  String correlationId = format("%s-%s", initialCorrelationId, flowId);
383 
384  String json;
385  try {
386  json = Utils.MAPPER.writeValueAsString(new CommandMessage(
387  request, System.currentTimeMillis(), correlationId, Destination.WFM));
388  } catch (JsonProcessingException exception) {
389  logger.error("Could not format flow reroute request by flow={}", flow, exception);
390  return;
391  }
392 
393  Values values = new Values(json);
394  outputCollector.emit(StreamType.WFM_DUMP.toString(), input, values);
395 
396  flow.getLeft().setState(FlowState.DOWN);
397  flow.getRight().setState(FlowState.DOWN);
398 
399  logger.warn("Flow {} reroute command message sent with correlationId {}, reason {}",
400  flowId, correlationId, reason);
401  }
402  }
403 
404  private void onSwitchUp(SwitchInfoData sw) throws IOException {
405  logger.info("Switch {} is {}", sw.getSwitchId(), sw.getState().getType());
406  if (networkCache.cacheContainsSwitch(sw.getSwitchId())) {
407  networkCache.updateSwitch(sw);
408  } else {
409  networkCache.createSwitch(sw);
410  }
411  }
412 
413  private void handleFlowEvent(FlowInfoData flowData, Tuple tuple, String correlationId) throws IOException {
414  switch (flowData.getOperation()) {
415  case PUSH:
416  case PUSH_PROPAGATE:
417  logger.debug("Flow {} message received: {}, correlationId: {}", flowData.getOperation(), flowData,
418  correlationId);
419  flowCache.putFlow(flowData.getPayload());
420  // do not emit to TPE .. NB will send directly
421  break;
422 
423  case UNPUSH:
424  case UNPUSH_PROPAGATE:
425  logger.trace("Flow {} message received: {}, correlationId: {}", flowData.getOperation(), flowData,
426  correlationId);
427  String flowsId2 = flowData.getPayload().getLeft().getFlowId();
428  flowCache.removeFlow(flowsId2);
429  logger.debug("Flow {} message processed: {}, correlationId: {}", flowData.getOperation(), flowData,
430  correlationId);
431  break;
432 
433 
434  case CREATE:
435  // TODO: This should be more lenient .. in case of retries
436  logger.trace("Flow create message received: {}, correlationId: {}", flowData, correlationId);
437  flowCache.putFlow(flowData.getPayload());
438  emitFlowMessage(flowData, tuple, flowData.getCorrelationId());
439  logger.debug("Flow create message sent: {}, correlationId: {}", flowData, correlationId);
440  break;
441 
442  case DELETE:
443  // TODO: This should be more lenient .. in case of retries
444  logger.trace("Flow remove message received: {}, correlationId: {}", flowData, correlationId);
445  String flowsId = flowData.getPayload().getLeft().getFlowId();
446  flowCache.removeFlow(flowsId);
447  emitFlowMessage(flowData, tuple, flowData.getCorrelationId());
448  logger.debug("Flow remove message sent: {}, correlationId: {} ", flowData, correlationId);
449  break;
450 
451  case UPDATE:
452  logger.trace("Flow update message received: {}, correlationId: {}", flowData, correlationId);
453  // TODO: This should be more lenient .. in case of retries
454  flowCache.putFlow(flowData.getPayload());
455  emitFlowMessage(flowData, tuple, flowData.getCorrelationId());
456  logger.debug("Flow update message sent: {}, correlationId: {}", flowData, correlationId);
457  break;
458 
459  case STATE:
460  flowCache.putFlow(flowData.getPayload());
461  logger.debug("Flow state changed: {}, correlationId: {}", flowData, correlationId);
462  break;
463 
464  case CACHE:
465  logger.debug("Sync flow cache message received: {}, correlationId: {}", flowData, correlationId);
466  if (flowData.getPayload() != null) {
467  flowCache.putFlow(flowData.getPayload());
468  } else {
469  flowCache.removeFlow(flowData.getFlowId());
470  }
471  break;
472 
473  default:
474  logger.warn("Skip undefined flow operation {}", flowData);
475  break;
476  }
477  }
478 
479  private Set<ImmutablePair<Flow, Flow>> getFlowsForRerouting() {
480  Set<ImmutablePair<Flow, Flow>> inactiveFlows = flowCache.dumpFlows().stream()
481  .filter(flow -> FlowState.DOWN.equals(flow.getLeft().getState()))
482  .collect(Collectors.toSet());
483 
484  return inactiveFlows;
485  }
486 
487  private void initNetwork(PathComputer pathComputer) {
488  logger.info("Network Cache: Initializing");
489  Set<SwitchInfoData> switches = new HashSet<>(pathComputer.getSwitches());
490  Set<IslInfoData> links = new HashSet<>(pathComputer.getIsls());
491 
492  logger.info("Network Cache: Initializing - {} Switches (size)", switches.size());
493  logger.info("Network Cache: Initializing - {} ISLs (size)", links.size());
494 
495  //
496  // We will call createOrUpdate to ensure we can ignore duplicates.
497  //
498  // The alternative is to call networkCache::createOrUpdateSwitch / networkCache::createOrUpdateIsl
499  // networkCache.load(switches, links);
500 
501  switches.forEach(networkCache::createOrUpdateSwitch);
502 
503  for (IslInfoData isl : links) {
504  try {
505  if (isl.isSelfLooped()) {
506  logger.warn("Skipped self-looped ISL: {}", isl);
507  } else {
508  networkCache.createOrUpdateIsl(isl);
509  }
510  } catch (Exception e) {
511  logger.error("CacheBolt :: initNetwork :: add ISL caused error --> isl = {} ; Exception = {}", isl, e);
512  }
513  }
514 
515  logger.info("Network Cache: Initialized");
516  }
517 
518  private void initFlowCache(PathComputer pathComputer) {
519  logger.info("Flow Cache: Initializing");
520  PathComputerFlowFetcher flowFetcher = new PathComputerFlowFetcher(pathComputer);
521 
522  for (BidirectionalFlow bidirectionalFlow : flowFetcher.getFlows()) {
523  ImmutablePair<Flow, Flow> flowPair = new ImmutablePair<>(
524  bidirectionalFlow.getForward(), bidirectionalFlow.getReverse());
525  flowCache.pushFlow(flowPair);
526  }
527  logger.info("Flow Cache: Initialized");
528  }
529 
// Snapshots the whole bolt state: all cached switches and ISLs (NetworkDump) plus all cached
// flows (FlowDump), wrapped in a CacheBoltState.
// NOTE(review): the declaration line of this method (source line 531) is missing from this
// rendering; presumably `public AbstractDumpState dumpState() {` — confirm against CacheBolt.java.
530  @Override
532  NetworkDump networkDump = new NetworkDump(
533  networkCache.dumpSwitches(), networkCache.dumpIsls());
534  FlowDump flowDump = new FlowDump(flowCache.dumpFlows());
535  return new CacheBoltState(networkDump, flowDump);
536  }
537 
538  @VisibleForTesting
539  @Override
540  public void clearState() {
541  logger.info("State clear request from test");
542  initState(new InMemoryKeyValueState<>());
543  }
544 
// Per-switch state dump. Not implemented: ignores its argument and always returns a
// CacheBoltState with empty switch, ISL and flow sets.
// NOTE(review): the declaration line (source line 546) is missing from this rendering; the
// cross-reference index lists it as `AbstractDumpState dumpStateBySwitchId(SwitchId switchId)`.
545  @Override
547  // Not implemented
548  NetworkDump networkDump = new NetworkDump(
549  new HashSet<>(),
550  new HashSet<>());
551  FlowDump flowDump = new FlowDump(new HashSet<>());
552  return new CacheBoltState(networkDump, flowDump);
553  }
554 
555  @Override
556  public String getCtrlStreamId() {
557  return STREAM_ID_CTRL;
558  }
559 
560  @Override
561  public TopologyContext getContext() {
562  return context;
563  }
564 
565  @Override
566  public OutputCollector getOutput() {
567  return outputCollector;
568  }
569 
570  @Override
571  public Optional<AbstractDumpState> dumpResorceCacheState() {
572  return Optional.of(new ResorceCacheBoltState(
573  flowCache.getAllocatedMeters(),
574  flowCache.getAllocatedVlans(),
575  flowCache.getAllocatedCookies()));
576  }
577 }
void pushFlow(ImmutablePair< Flow, Flow > flow)
Definition: FlowCache.java:106
IslInfoData deleteIsl(String islId)
IslInfoData updateIsl(IslInfoData isl)
boolean cacheContainsIsl(String islId)
ImmutablePair< Flow, Flow > removeFlow(String flowId)
Definition: FlowCache.java:97
static final ObjectMapper MAPPER
Definition: Utils.java:31
SwitchInfoData updateSwitch(SwitchInfoData newSwitch)
Optional< AbstractDumpState > dumpResorceCacheState()
Definition: CacheBolt.java:571
IslInfoData createOrUpdateIsl(IslInfoData isl)
IslInfoData createIsl(IslInfoData isl)
void declareOutputFields(OutputFieldsDeclarer output)
Definition: CacheBolt.java:223
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
Definition: CacheBolt.java:149
SwitchInfoData createSwitch(SwitchInfoData newSwitch)
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
Definition: CacheBolt.java:546
Map< SwitchId, Set< Integer > > getAllocatedMeters()
Definition: FlowCache.java:549
Set< Integer > getAllocatedVlans()
Definition: FlowCache.java:541
Set< ImmutablePair< Flow, Flow > > dumpFlows()
Definition: FlowCache.java:342
source
Definition: nodes.py:53
Set< SwitchInfoData > dumpSwitches()
void initState(InMemoryKeyValueState< String, Cache > state)
Definition: CacheBolt.java:123
ImmutablePair< Flow, Flow > putFlow(ImmutablePair< Flow, Flow > flow)
Definition: FlowCache.java:87
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
Definition: CtrlAction.java:78
Set< ImmutablePair< Flow, Flow > > getActiveFlowsWithAffectedPath(SwitchId switchId)
Definition: FlowCache.java:129
Set< Integer > getAllocatedCookies()
Definition: FlowCache.java:545
boolean cacheContainsSwitch(SwitchId switchId)