16 package org.openkilda.wfm.topology.cache;
18 import static java.lang.String.format;
56 import com.fasterxml.jackson.core.JsonProcessingException;
57 import com.google.common.annotations.VisibleForTesting;
58 import org.apache.storm.state.InMemoryKeyValueState;
59 import org.apache.storm.task.OutputCollector;
60 import org.apache.storm.task.TopologyContext;
61 import org.apache.storm.topology.OutputFieldsDeclarer;
62 import org.apache.storm.topology.base.BaseStatefulBolt;
63 import org.apache.storm.tuple.Tuple;
64 import org.apache.storm.tuple.Values;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
68 import java.io.IOException;
69 import java.util.HashSet;
71 import java.util.Optional;
73 import java.util.stream.Collectors;
76 extends BaseStatefulBolt<InMemoryKeyValueState<String, Cache>>
/** Key under which the network cache is kept in the bolt's key-value state. */
83 private static final String NETWORK_CACHE =
"network";
/** Key under which the flow cache is kept in the bolt's key-value state. */
88 private static final String FLOW_CACHE =
"flow";
/** Class logger. */
93 private static final Logger logger = LoggerFactory.getLogger(
CacheBolt.class);
/** Key-value state that stores the cache instances under the keys above. */
108 private InMemoryKeyValueState<String, Cache> state;
/** Path computer auth settings, supplied at construction. */
110 private final Auth pathComputerAuth;
/** Topology context saved by prepare(). */
112 private TopologyContext context;
/** Collector saved by prepare(); used to emit and ack tuples. */
113 private OutputCollector outputCollector;
116 this.pathComputerAuth = pathComputerAuth;
/**
 * Receives the bolt's key-value state and sets up the network and flow caches from it.
 *
 * <p>The flow cache is read from the state under {@code FLOW_CACHE}. For each cache that
 * turns out to be {@code null}, an instance is stored back into the state under its key.
 * The flow cache is then initialised before the network cache.
 *
 * @param state key-value state holding the cache instances
 */
123 public void initState(InMemoryKeyValueState<String, Cache> state) {
// No network cache yet: register one in the state under NETWORK_CACHE.
127 if (networkCache == null) {
129 this.state.put(NETWORK_CACHE, networkCache);
132 flowCache = (
FlowCache) state.get(FLOW_CACHE);
// Same treatment for the flow cache.
133 if (flowCache == null) {
135 this.state.put(FLOW_CACHE, flowCache);
138 logger.info(
"Request initial network state");
// Order matters only in that flows are loaded first, then switches and ISLs.
141 initFlowCache(pathComputer);
142 initNetwork(pathComputer);
/**
 * Saves the topology context and the output collector in fields for later use.
 *
 * @param map configuration map; not read in the statements shown here
 * @param topologyContext context stored in {@code context}
 * @param outputCollector collector stored in {@code outputCollector}
 */
149 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
150 this.context = topologyContext;
151 this.outputCollector = outputCollector;
162 logger.trace(
"State before: {}", state);
164 String json = tuple.getString(0);
165 String
source = tuple.getSourceComponent();
181 logger.info(
"Cache update switch info data: {}",
data);
195 logger.debug(
"Switch flows reroute request");
198 handleNetworkTopologyChange(topologyChange, tuple, message.
getCorrelationId());
200 logger.warn(
"Skip undefined info data type {}", json);
203 logger.warn(
"Skip undefined message type {}", json);
207 logger.error(
"Could not process message {}", tuple, exception);
208 }
catch (IOException exception) {
209 logger.error(
"Could not deserialize message {}", tuple, exception);
210 }
catch (Exception e) {
211 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
213 outputCollector.ack(tuple);
216 logger.trace(
"State after: {}", state);
/**
 * Handles a switch state update by dispatching on {@code sw.getState()}.
 *
 * @param sw switch update; its id and state are logged at debug level
 * @param tuple tuple that carried the update
 * @param correlationId correlation id of the originating message
 * @throws IOException declared by the signature
 */
231 private void handleSwitchEvent(
SwitchInfoData sw, Tuple tuple, String correlationId)
throws IOException {
232 logger.debug(
"State update switch {} message {}", sw.getSwitchId(), sw.getState());
// Filled in by the branch that handles the state.
233 Set<ImmutablePair<Flow, Flow>> affectedFlows;
235 switch (sw.getState()) {
// Presumably the fall-through for a state no branch handles.
266 logger.warn(
"Unknown state update switch info message");
/**
 * Handles an ISL state update by dispatching on {@code isl.getState()}.
 *
 * <p>Self-looped ISLs are reported with a warning. A {@code CacheException} is caught and
 * logged as a warning rather than propagated. In one branch, reroute commands are emitted
 * for the affected flows with the reason {@code "isl <id> FAILED"}.
 *
 * @param isl ISL update
 * @param tuple tuple that carried the update; passed on to emitRerouteCommands
 * @param correlationId correlation id passed on to emitRerouteCommands
 */
271 private void handleIslEvent(IslInfoData isl, Tuple tuple, String correlationId) {
272 logger.debug(
"State update isl {} message cached {}", isl.getId(), isl.getState());
273 Set<ImmutablePair<Flow, Flow>> affectedFlows;
275 switch (isl.getState()) {
280 if (isl.isSelfLooped()) {
281 logger.warn(
"Skipped self-looped ISL: {}", isl);
292 }
catch (CacheException exception) {
// Cache errors are logged only; they do not abort the handler.
293 logger.warn(
"{}:{}", exception.getErrorMessage(), exception.getErrorDescription());
297 String reason = String.format(
"isl %s FAILED", isl.getId());
298 emitRerouteCommands(tuple, affectedFlows, correlationId, reason);
// Presumably the fall-through for a state no branch handles.
308 logger.warn(
"Unknown state update isl info message");
/**
 * Handles a port state update by dispatching on {@code port.getState()}.
 *
 * <p>In one branch, reroute commands are emitted for the affected flows with a reason
 * of the form {@code "port <a>_<b> is <c>"}.
 *
 * @param port port update
 * @param tuple tuple that carried the update; passed on to emitRerouteCommands
 * @param correlationId correlation id passed on to emitRerouteCommands
 */
313 private void handlePortEvent(PortInfoData
port, Tuple tuple, String correlationId) {
314 logger.debug(
"State update port {}_{} message cached {}",
317 switch (
port.getState()) {
321 String reason = String.format(
"port %s_%s is %s",
323 emitRerouteCommands(tuple, affectedFlows, correlationId, reason);
// NOTE(review): the text below says "isl" although this is the port handler;
// it matches the message in handleIslEvent and looks copy-pasted - confirm and fix.
335 logger.warn(
"Unknown state update isl info message");
/**
 * Handles a network topology change by dispatching on {@code topologyChange.getType()}.
 *
 * <p>One branch selects the flows returned by getFlowsForRerouting(); a type that is not
 * handled is logged as an error. Reroute commands are emitted with a reason built from
 * the change's switch id, port number and type.
 *
 * @param topologyChange change description
 * @param tuple tuple that carried the change; passed on to emitRerouteCommands
 * @param correlationId correlation id passed on to emitRerouteCommands
 */
340 private void handleNetworkTopologyChange(NetworkTopologyChange topologyChange, Tuple tuple, String correlationId) {
341 Set<ImmutablePair<Flow, Flow>> affectedFlows;
343 switch (topologyChange.getType()) {
// Flows currently marked DOWN become the reroute candidates.
349 affectedFlows = getFlowsForRerouting();
353 logger.error(
"Unhandled reroute type: {}", topologyChange.getType());
356 String reason = String.format(
"network topology change %s_%s is %s",
357 topologyChange.getSwitchId(), topologyChange.getPortNumber(),
358 topologyChange.getType());
359 emitRerouteCommands(tuple, affectedFlows, correlationId, reason);
/**
 * Wraps {@code data} in an {@code InfoMessage} addressed to
 * {@code Destination.TOPOLOGY_ENGINE}, serialises it with {@code MAPPER} and emits the
 * JSON on the {@code StreamType.TPE} stream, passing {@code tuple} to the collector.
 *
 * @param data payload to send
 * @param tuple input tuple handed to {@code outputCollector.emit}
 * @param correlationId correlation id placed in the message
 * @throws IOException declared by the signature; serialisation is the only visible source
 */
362 private void emitFlowMessage(InfoData
data, Tuple tuple, String correlationId)
throws IOException {
// The message timestamp is the current wall-clock time.
363 Message message =
new InfoMessage(
data, System.currentTimeMillis(),
364 correlationId, Destination.TOPOLOGY_ENGINE);
365 outputCollector.emit(StreamType.TPE.toString(), tuple,
new Values(MAPPER.writeValueAsString(message)));
366 logger.debug(
"Flow command message sent");
/**
 * Wraps {@code data} in an {@code InfoMessage} addressed to {@code Destination.WFM},
 * serialises it with {@code MAPPER} and emits the JSON on the
 * {@code StreamType.WFM_DUMP} stream, passing {@code tuple} to the collector.
 *
 * <p>Differs from emitFlowMessage only in destination and stream; the debug text is the same.
 *
 * @param data payload to send
 * @param tuple input tuple handed to {@code outputCollector.emit}
 * @param correlationId correlation id placed in the message
 * @throws IOException declared by the signature; serialisation is the only visible source
 */
369 private void emitFlowCrudMessage(InfoData
data, Tuple tuple, String correlationId)
throws IOException {
// The message timestamp is the current wall-clock time.
370 Message message =
new InfoMessage(
data, System.currentTimeMillis(),
371 correlationId, Destination.WFM);
372 outputCollector.emit(StreamType.WFM_DUMP.toString(), tuple,
new Values(MAPPER.writeValueAsString(message)));
373 logger.debug(
"Flow command message sent");
/**
 * Emits one reroute command per flow pair in {@code flows}.
 *
 * <p>For each pair, a {@code FlowRerouteRequest} is built from the left flow's id, wrapped
 * in a {@code CommandMessage} addressed to {@code Destination.WFM}, serialised to JSON and
 * emitted on the {@code StreamType.WFM_DUMP} stream. Both flows of the pair are then set
 * to {@code FlowState.DOWN} and the emission is logged at warn level.
 *
 * @param input tuple handed to {@code outputCollector.emit}
 * @param flows flow pairs to reroute
 * @param initialCorrelationId prefix for the per-flow correlation ids
 * @param reason human-readable cause; only used in the log line
 */
376 private void emitRerouteCommands(Tuple input, Set<ImmutablePair<Flow, Flow>> flows,
377 String initialCorrelationId, String reason) {
378 for (ImmutablePair<Flow, Flow> flow : flows) {
379 final String flowId = flow.getLeft().getFlowId();
380 FlowRerouteRequest request =
new FlowRerouteRequest(flowId);
// Each command gets its own correlation id: "<initial>-<flowId>".
382 String correlationId =
format(
"%s-%s", initialCorrelationId, flowId);
386 json = Utils.MAPPER.writeValueAsString(
new CommandMessage(
387 request, System.currentTimeMillis(), correlationId, Destination.WFM));
388 }
catch (JsonProcessingException exception) {
// A serialisation failure is logged and not rethrown.
389 logger.error(
"Could not format flow reroute request by flow={}", flow, exception);
393 Values values =
new Values(json);
394 outputCollector.emit(StreamType.WFM_DUMP.toString(), input, values);
// DOWN is the state getFlowsForRerouting() filters on (it checks the left flow).
396 flow.getLeft().setState(FlowState.DOWN);
397 flow.getRight().setState(FlowState.DOWN);
399 logger.warn(
"Flow {} reroute command message sent with correlationId {}, reason {}",
400 flowId, correlationId, reason);
/**
 * Reacts to a switch update; the statement shown logs the switch id and the type of
 * its state at info level.
 *
 * @param sw switch update
 * @throws IOException declared by the signature
 */
404 private void onSwitchUp(SwitchInfoData sw)
throws IOException {
405 logger.info(
"Switch {} is {}", sw.getSwitchId(), sw.getState().getType());
/**
 * Handles a flow event by dispatching on {@code flowData.getOperation()}.
 *
 * <p>In the create and update branches the payload is put into {@code flowCache} before
 * the event is forwarded with emitFlowMessage. The state-change and cache-sync branches
 * shown only update the cache, and the sync branch does so only for a non-null payload.
 * An operation no branch handles is logged as a warning.
 *
 * @param flowData flow event
 * @param tuple tuple that carried the event; passed on to emitFlowMessage
 * @param correlationId correlation id used in the log lines
 * @throws IOException declared by the signature; emitFlowMessage declares it as well
 */
413 private void handleFlowEvent(FlowInfoData flowData, Tuple tuple, String correlationId)
throws IOException {
414 switch (flowData.getOperation()) {
417 logger.debug(
"Flow {} message received: {}, correlationId: {}", flowData.getOperation(), flowData,
419 flowCache.
putFlow(flowData.getPayload());
424 case UNPUSH_PROPAGATE:
425 logger.trace(
"Flow {} message received: {}, correlationId: {}", flowData.getOperation(), flowData,
427 String flowsId2 = flowData.getPayload().getLeft().getFlowId();
429 logger.debug(
"Flow {} message processed: {}, correlationId: {}", flowData.getOperation(), flowData,
436 logger.trace(
"Flow create message received: {}, correlationId: {}", flowData, correlationId);
437 flowCache.
putFlow(flowData.getPayload());
// NOTE(review): the forwarded id is flowData.getCorrelationId(), while the log lines
// use the correlationId parameter; same in the remove and update branches - confirm
// the two are meant to differ.
438 emitFlowMessage(flowData, tuple, flowData.getCorrelationId());
439 logger.debug(
"Flow create message sent: {}, correlationId: {}", flowData, correlationId);
444 logger.trace(
"Flow remove message received: {}, correlationId: {}", flowData, correlationId);
445 String flowsId = flowData.getPayload().getLeft().getFlowId();
447 emitFlowMessage(flowData, tuple, flowData.getCorrelationId());
448 logger.debug(
"Flow remove message sent: {}, correlationId: {} ", flowData, correlationId);
452 logger.trace(
"Flow update message received: {}, correlationId: {}", flowData, correlationId);
454 flowCache.
putFlow(flowData.getPayload());
455 emitFlowMessage(flowData, tuple, flowData.getCorrelationId());
456 logger.debug(
"Flow update message sent: {}, correlationId: {}", flowData, correlationId);
460 flowCache.
putFlow(flowData.getPayload());
461 logger.debug(
"Flow state changed: {}, correlationId: {}", flowData, correlationId);
465 logger.debug(
"Sync flow cache message received: {}, correlationId: {}", flowData, correlationId);
// A sync event without a payload leaves the cache untouched here.
466 if (flowData.getPayload() != null) {
467 flowCache.
putFlow(flowData.getPayload());
474 logger.warn(
"Skip undefined flow operation {}", flowData);
/**
 * Selects the flow pairs from {@code flowCache.dumpFlows()} whose left flow is in
 * {@code FlowState.DOWN}.
 *
 * @return the matching pairs as a set; empty when none match
 */
479 private Set<ImmutablePair<Flow, Flow>> getFlowsForRerouting() {
480 Set<ImmutablePair<Flow, Flow>> inactiveFlows = flowCache.
dumpFlows().stream()
// Only the left flow's state is checked; the right flow is not consulted.
481 .filter(flow -> FlowState.DOWN.equals(flow.getLeft().getState()))
482 .collect(Collectors.toSet());
484 return inactiveFlows;
/**
 * Loads switches and ISLs from the path computer into the network cache.
 *
 * <p>Both collections are copied into sets and their sizes logged. Every switch is
 * passed to {@code networkCache.createOrUpdateSwitch}, then the ISLs are processed one
 * by one. A self-looped ISL is reported with a warning, and an exception raised while
 * handling an ISL is caught and logged as an error.
 *
 * @param pathComputer source of the switches and ISLs
 */
487 private void initNetwork(PathComputer pathComputer) {
488 logger.info(
"Network Cache: Initializing");
// Copying into HashSet drops duplicates returned by the path computer, if any.
489 Set<SwitchInfoData> switches =
new HashSet<>(pathComputer.getSwitches());
490 Set<IslInfoData> links =
new HashSet<>(pathComputer.getIsls());
492 logger.info(
"Network Cache: Initializing - {} Switches (size)", switches.size());
493 logger.info(
"Network Cache: Initializing - {} ISLs (size)", links.size());
// Switches are registered before the ISL loop starts.
501 switches.forEach(networkCache::createOrUpdateSwitch);
503 for (IslInfoData isl : links) {
505 if (isl.isSelfLooped()) {
506 logger.warn(
"Skipped self-looped ISL: {}", isl);
510 }
catch (Exception e) {
// NOTE(review): the message has two {} but the last argument is a Throwable; SLF4J
// treats a trailing Throwable as the exception to log, so the second {} likely stays
// unfilled while the stack trace is still printed - confirm against the logging backend.
511 logger.error(
"CacheBolt :: initNetwork :: add ISL caused error --> isl = {} ; Exception = {}", isl, e);
515 logger.info(
"Network Cache: Initialized");
/**
 * Loads flows from the path computer through a {@code PathComputerFlowFetcher} and
 * builds a (forward, reverse) pair for each bidirectional flow.
 *
 * @param pathComputer source handed to the flow fetcher
 */
518 private void initFlowCache(PathComputer pathComputer) {
519 logger.info(
"Flow Cache: Initializing");
520 PathComputerFlowFetcher flowFetcher =
new PathComputerFlowFetcher(pathComputer);
522 for (BidirectionalFlow bidirectionalFlow : flowFetcher.getFlows()) {
// Left element is the forward flow, right element the reverse flow.
523 ImmutablePair<Flow, Flow> flowPair =
new ImmutablePair<>(
524 bidirectionalFlow.getForward(), bidirectionalFlow.getReverse());
527 logger.info(
"Flow Cache: Initialized");
541 logger.info(
"State clear request from test");
542 initState(
new InMemoryKeyValueState<>());
567 return outputCollector;
void pushFlow(ImmutablePair< Flow, Flow > flow)
IslInfoData deleteIsl(String islId)
IslInfoData updateIsl(IslInfoData isl)
boolean cacheContainsIsl(String islId)
ImmutablePair< Flow, Flow > removeFlow(String flowId)
static final ObjectMapper MAPPER
SwitchInfoData updateSwitch(SwitchInfoData newSwitch)
Optional< AbstractDumpState > dumpResorceCacheState()
void execute(Tuple tuple)
IslInfoData createOrUpdateIsl(IslInfoData isl)
IslInfoData createIsl(IslInfoData isl)
void declareOutputFields(OutputFieldsDeclarer output)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final Fields fieldMessage
SwitchInfoData createSwitch(SwitchInfoData newSwitch)
OutputCollector getOutput()
static final String STREAM_ID_CTRL
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
Map< SwitchId, Set< Integer > > getAllocatedMeters()
Set< Integer > getAllocatedVlans()
Set< ImmutablePair< Flow, Flow > > dumpFlows()
AbstractDumpState dumpState()
Set< SwitchInfoData > dumpSwitches()
void initState(InMemoryKeyValueState< String, Cache > state)
ImmutablePair< Flow, Flow > putFlow(ImmutablePair< Flow, Flow > flow)
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
Set< ImmutablePair< Flow, Flow > > getActiveFlowsWithAffectedPath(SwitchId switchId)
TopologyContext getContext()
Set< Integer > getAllocatedCookies()
String getCorrelationId()
boolean cacheContainsSwitch(SwitchId switchId)
Set< IslInfoData > dumpIsls()