16 package org.openkilda.wfm.topology.event;
18 import static java.lang.String.format;
58 import com.fasterxml.jackson.core.JsonProcessingException;
59 import com.google.common.annotations.VisibleForTesting;
60 import com.google.common.base.Preconditions;
61 import com.google.common.collect.Lists;
62 import org.apache.storm.kafka.spout.internal.Timer;
63 import org.apache.storm.state.InMemoryKeyValueState;
64 import org.apache.storm.state.KeyValueState;
65 import org.apache.storm.task.OutputCollector;
66 import org.apache.storm.task.TopologyContext;
67 import org.apache.storm.topology.OutputFieldsDeclarer;
68 import org.apache.storm.tuple.Fields;
69 import org.apache.storm.tuple.Tuple;
70 import org.apache.storm.tuple.Values;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
74 import java.io.IOException;
75 import java.util.HashMap;
78 import java.util.UUID;
79 import java.util.concurrent.TimeUnit;
80 import java.util.stream.Collectors;
// Class-wide logger.
102 private static final Logger logger = LoggerFactory.getLogger(
OfeLinkBolt.class);
// Passed to the superclass constructor (unit is not visible here — TODO confirm).
103 private static final int BOLT_TICK_INTERVAL = 1;
// Control stream id; returned as-is by one of the bolt's accessors.
105 private static final String STREAM_ID_CTRL =
"ctrl";
// Key used to look up the links-by-switch map in the key/value state (see initState).
107 static final String STATE_ID_DISCOVERY =
"discovery-manager";
// Stream used for everything emitted towards the topology engine.
108 static final String TOPO_ENG_STREAM =
"topo.eng";
// Stream used for requests and discovery commands emitted towards the speaker.
109 static final String SPEAKER_STREAM =
"speaker";
// Assigned from config.getKafkaSpeakerTopic() in the constructor.
111 private final String islDiscoveryTopic;
// Checked to be > 0 in the constructor; handed to DiscoveryManager in initState.
113 private final int islHealthCheckInterval;
// Checked to be > 0 in the constructor; divided by the interval (rounded up) to get
// the consecutive-failure limit handed to DiscoveryManager.
114 private final int islHealthCheckTimeout;
// Handed to DiscoveryManager unchanged.
115 private final int islHealthFailureLimit;
// Handed to DiscoveryManager unchanged.
116 private final int islKeepRemovedTimeout;
// Constructor argument of the WatchDog created in initState.
117 private final float watchDogInterval;
// Storm context and collector captured in prepare().
119 private TopologyContext context;
120 private OutputCollector collector;
// Discovery links grouped by switch; the same map instance is given to DiscoveryManager.
124 private Map<SwitchId, Set<DiscoveryLink>> linksBySwitch;
// Correlation id of the most recently sent network dump request; null until one is sent.
126 private String dumpRequestCorrelationId = null;
// Multiplied by 1000 to obtain milliseconds when the dump request timer is created.
127 private float dumpRequestTimeout;
// Expiry of this timer triggers a repeated network dump request.
128 private Timer dumpRequestTimer;
// Current state of the bolt; starts in NEED_SYNC.
130 State state = State.NEED_SYNC;
136 super(BOLT_TICK_INTERVAL);
140 Preconditions.checkArgument(islHealthCheckInterval > 0,
141 "Invalid value for DiscoveryInterval: %s", islHealthCheckInterval);
143 Preconditions.checkArgument(islHealthCheckTimeout > 0,
144 "Invalid value for DiscoveryTimeout: %s", islHealthCheckTimeout);
151 islDiscoveryTopic =
config.getKafkaSpeakerTopic();
155 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
158 this.context = context;
159 this.collector = collector;
163 @SuppressWarnings(
"unchecked")
164 public
void initState(KeyValueState<String, Object> state) {
165 watchDog =
new WatchDog(watchDogInterval);
169 Object payload = state.get(STATE_ID_DISCOVERY);
170 if (payload == null) {
171 payload = linksBySwitch =
new HashMap<>();
172 state.put(islDiscoveryTopic, payload);
174 linksBySwitch = (Map<SwitchId, Set<DiscoveryLink>>) payload;
179 int islConsecutiveFailureLimit = (int) Math.ceil(islHealthCheckTimeout / (
float) islHealthCheckInterval);
181 discovery =
new DiscoveryManager(linksBySwitch, islHealthCheckInterval, islConsecutiveFailureLimit,
182 islHealthFailureLimit, islKeepRemovedTimeout);
190 boolean isSpeakerAvailable = watchDog.
isAvailable();
192 if (!isSpeakerAvailable) {
193 stateTransition(State.OFFLINE);
196 String correlationId = UUID.randomUUID().toString();
200 dumpRequestCorrelationId = correlationId;
201 sendNetworkRequest(tuple, correlationId);
202 enableDumpRequestTimer();
203 stateTransition(State.WAIT_SYNC);
207 case SYNC_IN_PROGRESS:
208 if (dumpRequestTimer.isExpiredResetOnTrue()) {
209 logger.error(
"Did not get network dump, send one more dump request");
210 dumpRequestCorrelationId = correlationId;
211 sendNetworkRequest(tuple, correlationId);
216 if (isSpeakerAvailable) {
217 logger.info(
"Switch into ONLINE mode");
218 stateTransition(State.NEED_SYNC);
223 processDiscoveryPlan(tuple, correlationId);
226 logger.error(
"Illegal state of OfeLinkBolt: {}", state);
233 private String sendNetworkRequest(Tuple tuple, String correlationId) {
235 System.currentTimeMillis(), correlationId,
239 "Send network dump request (correlation-id: {})",
244 collector.emit(SPEAKER_STREAM, tuple,
new Values(PAYLOAD, json));
245 }
catch (JsonProcessingException exception) {
246 logger.error(
"Could not serialize network cache request", exception);
249 return correlationId;
252 private void processDiscoveryPlan(Tuple tuple, String correlationId) {
255 for (NetworkEndpoint
node : discoveryPlan.needDiscovery) {
256 String msgCorrelationId =
format(
"%s-%s:%s", correlationId,
257 node.getSwitchDpId(),
node.getPortId());
258 sendDiscoveryMessage(tuple,
node, msgCorrelationId);
261 for (NetworkEndpoint
node : discoveryPlan.discoveryFailure) {
262 String msgCorrelationId =
format(
"%s-%s:%s-fail", correlationId,
263 node.getSwitchDpId(),
node.getPortId());
268 sendDiscoveryFailed(
node.getSwitchDpId(),
node.getPortId(), tuple, msgCorrelationId);
270 }
catch (IOException e) {
271 logger.error(
"Unable to encode message: {}", e);
278 private void sendDiscoveryMessage(Tuple tuple, NetworkEndpoint
node, String correlationId)
throws IOException {
279 DiscoverIslCommandData
data =
new DiscoverIslCommandData(
node.getDatapath(),
node.getPortNumber());
280 CommandMessage message =
new CommandMessage(
data, System.currentTimeMillis(),
281 correlationId, Destination.CONTROLLER);
282 logger.debug(
"LINK: Send ISL discovery command: {}", message);
283 collector.emit(SPEAKER_STREAM, tuple,
new Values(PAYLOAD, Utils.MAPPER.writeValueAsString(message)));
304 String json = tuple.getString(0);
308 message = MAPPER.readValue(json,
BaseMessage.class);
310 }
catch (IOException e) {
311 collector.ack(tuple);
312 logger.error(
"Unknown Message type={}", json);
319 }
else if (message instanceof
HeartBeat) {
320 logger.debug(
"Got speaker's heart beat");
321 stateTransition(State.NEED_SYNC, State.OFFLINE);
323 }
catch (Exception e) {
324 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
326 collector.ack(tuple);
330 private void dispatch(Tuple tuple,
InfoMessage infoMessage) {
333 dispatchNeedSync(tuple, infoMessage);
336 dispatchWaitSync(tuple, infoMessage);
338 case SYNC_IN_PROGRESS:
339 dispatchSyncInProgress(tuple, infoMessage);
342 dispatchOffline(tuple, infoMessage);
345 dispatchMain(tuple, infoMessage);
348 reportInvalidEvent(infoMessage.
getData());
352 private void dispatchNeedSync(Tuple tuple,
InfoMessage infoMessage) {
353 logger.warn(
"Bolt internal state is out of sync with FL, skip tuple");
356 private void dispatchWaitSync(Tuple tuple, InfoMessage infoMessage) {
357 InfoData
data = infoMessage.getData();
358 if (
data instanceof NetworkDumpBeginMarker) {
359 if (dumpRequestCorrelationId.equals(infoMessage.getCorrelationId())) {
360 logger.info(
"Got response on network sync request, start processing network events");
361 enableDumpRequestTimer();
362 stateTransition(State.SYNC_IN_PROGRESS);
365 "Got response on network sync request with invalid " 366 +
"correlation-id(expect: \"{}\", got: \"{}\")",
367 dumpRequestCorrelationId, infoMessage.getCorrelationId());
370 reportInvalidEvent(
data);
374 private void dispatchSyncInProgress(Tuple tuple, InfoMessage infoMessage) {
375 InfoData
data = infoMessage.getData();
376 if (
data instanceof NetworkDumpSwitchData) {
377 logger.info(
"Event/WFM Sync: switch {}",
data);
380 }
else if (
data instanceof NetworkDumpPortData) {
382 NetworkDumpPortData portData = (NetworkDumpPortData)
data;
383 discovery.
registerPort(portData.getSwitchId(), portData.getPortNo());
385 }
else if (
data instanceof NetworkDumpEndMarker) {
386 logger.info(
"End of network sync stream received");
387 stateTransition(State.MAIN);
389 reportInvalidEvent(
data);
393 private void dispatchOffline(Tuple tuple, InfoMessage infoMessage) {
394 logger.warn(
"Got input while in offline mode, it mean the possibility to try sync state");
396 stateTransition(State.NEED_SYNC);
399 private void dispatchMain(Tuple tuple, InfoMessage infoMessage) {
400 InfoData
data = infoMessage.getData();
401 if (
data instanceof SwitchInfoData) {
402 handleSwitchEvent(tuple, (SwitchInfoData)
data);
403 passToTopologyEngine(tuple);
404 }
else if (
data instanceof PortInfoData) {
405 handlePortEvent(tuple, (PortInfoData)
data);
406 passToTopologyEngine(tuple);
407 }
else if (
data instanceof IslInfoData) {
408 handleIslEvent(tuple, (IslInfoData)
data, infoMessage.getCorrelationId());
409 }
else if (
data instanceof DiscoPacketSendingConfirmation) {
410 handleSentDiscoPacket((DiscoPacketSendingConfirmation)
data);
412 reportInvalidEvent(
data);
416 private void stateTransition(State switchTo) {
417 logger.info(
"State transition to {} (current {})", switchTo, state);
421 private void stateTransition(State switchTo, State onlyInState) {
422 if (state == onlyInState) {
423 stateTransition(switchTo);
427 private void reportInvalidEvent(InfoData event) {
429 "Unhandled event: state={}, type={}", state,
430 event.getClass().getName());
433 private void handleSwitchEvent(Tuple tuple, SwitchInfoData switchData) {
434 SwitchId switchId = switchData.getSwitchId();
435 String state = switchData.getState().toString();
436 logger.info(
"DISCO: Switch Event: switch={} state={}", switchId, state);
438 if (SwitchState.DEACTIVATED.getType().equals(state)) {
444 }
else if (SwitchState.ACTIVATED.getType().equals(state)) {
451 logger.warn(
"SWITCH Event: ignoring state: {}", state);
458 private void passToTopologyEngine(Tuple tuple) {
459 String json = tuple.getString(0);
460 collector.emit(TOPO_ENG_STREAM, tuple,
new Values(PAYLOAD, json));
463 private void passToTopologyEngine(Tuple tuple, InfoMessage message) {
465 String json = Utils.MAPPER.writeValueAsString(message);
466 collector.emit(TOPO_ENG_STREAM, tuple,
new Values(PAYLOAD, json));
467 }
catch (JsonProcessingException e) {
468 logger.error(
"Error during json processing", e);
472 private void handlePortEvent(Tuple tuple, PortInfoData portData) {
473 final SwitchId switchId = portData.getSwitchId();
474 final int portId = portData.getPortNo();
475 String updown = portData.getState().toString();
476 logger.info(
"DISCO: Port Event: switch={} port={} state={}", switchId, portId, updown);
478 if (isPortUpOrCached(updown)) {
480 }
else if (updown.equals(OfeMessageUtils.PORT_DOWN)) {
484 logger.warn(
"PORT Event: ignoring state: {}", updown);
488 private void handleIslEvent(Tuple tuple, IslInfoData discoveredIsl, String correlationId) {
489 PathNode srcNode = discoveredIsl.getPath().get(0);
490 final SwitchId srcSwitch = srcNode.getSwitchId();
491 final int srcPort = srcNode.getPortNo();
493 PathNode dstNode = discoveredIsl.getPath().get(1);
494 final SwitchId dstSwitch = dstNode.getSwitchId();
495 final int dstPort = dstNode.getPortNo();
497 IslChangeType state = discoveredIsl.getState();
498 boolean stateChanged =
false;
505 if (IslChangeType.DISCOVERED.equals(state)) {
506 if (discovery.
isIslMoved(srcSwitch, srcPort, dstSwitch, dstPort)) {
507 handleMovedIsl(tuple, srcSwitch, srcPort, dstSwitch, dstPort, correlationId);
509 stateChanged = discovery.
handleDiscovered(srcSwitch, srcPort, dstSwitch, dstPort);
517 logger.warn(
"ISL Event: ignoring state: {}", state);
522 logger.info(
"DISCO: ISL Event: switch={} port={} state={}", srcSwitch, srcPort, state);
523 passToTopologyEngine(tuple);
536 private void sendDiscoveryFailed(SwitchId switchId,
int portId, Tuple tuple, String correlationId)
538 String discoFail = OfeMessageUtils.createIslFail(switchId, portId, correlationId);
541 collector.emit(TOPO_ENG_STREAM, tuple,
new Values(PAYLOAD, discoFail));
543 logger.warn(
"LINK: Send ISL discovery failure message={}", discoFail);
546 private boolean isPortUpOrCached(String state) {
547 return OfeMessageUtils.PORT_UP.equals(state) || OfeMessageUtils.PORT_ADD.equals(state)
548 || PortChangeType.CACHED.getType().equals(state);
551 private void enableDumpRequestTimer() {
552 long expireDelay = (int) (dumpRequestTimeout * 1000);
553 dumpRequestTimer =
new Timer(expireDelay, expireDelay, TimeUnit.MILLISECONDS);
556 private void handleMovedIsl(Tuple tuple, SwitchId srcSwitch,
int srcPort, SwitchId dstSwitch,
int dstPort,
557 String correlationId) {
559 logger.info(
"Link is moved from {}_{} - {}_{} to endpoint {}_{}", srcSwitch, srcPort,
560 dstEndpoint.getSwitchDpId(), dstEndpoint.getPortId(), dstSwitch, dstPort);
564 PathNode srcNode =
new PathNode(srcSwitch, srcPort, 0);
565 PathNode dstNode =
new PathNode(dstEndpoint.getSwitchDpId(), dstEndpoint.getPortId(), 1);
566 IslInfoData infoData =
new IslInfoData(Lists.newArrayList(srcNode, dstNode), IslChangeType.MOVED);
567 InfoMessage message =
new InfoMessage(infoData, System.currentTimeMillis(), correlationId);
568 passToTopologyEngine(tuple, message);
571 srcNode =
new PathNode(dstEndpoint.getSwitchDpId(), dstEndpoint.getPortId(), 0);
572 dstNode =
new PathNode(srcSwitch, srcPort, 1);
573 IslInfoData reverseLink =
new IslInfoData(Lists.newArrayList(srcNode, dstNode), IslChangeType.MOVED);
574 message =
new InfoMessage(reverseLink, System.currentTimeMillis(), correlationId);
575 passToTopologyEngine(tuple, message);
578 private void handleSentDiscoPacket(DiscoPacketSendingConfirmation confirmation) {
579 logger.debug(
"Discovery packet is sent from {}", confirmation);
585 declarer.declareStream(SPEAKER_STREAM,
new Fields(
"key",
"message"));
586 declarer.declareStream(TOPO_ENG_STREAM,
new Fields(
"key",
"message"));
593 Set<DiscoveryLink> links = linksBySwitch.values()
595 .flatMap(Set::stream)
596 .collect(Collectors.toSet());
603 return STREAM_ID_CTRL;
608 logger.info(
"ClearState request has been received.");
609 initState(
new InMemoryKeyValueState<>());
615 Set<DiscoveryLink> filteredDiscoveryList = linksBySwitch.get(switchId);
617 Set<DiscoveryLink> filterdIslFilter = islFilter.
getMatchSet().stream()
618 .filter(
node ->
node.getSource().getSwitchDpId().equals(switchId))
619 .collect(Collectors.toSet());
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static final String PAYLOAD
static final ObjectMapper MAPPER
boolean isIslMoved(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)
boolean isInDiscoveryPlan(SwitchId switchId, int portId)
int getDiscoverySpeakerFailureTimeout()
OfeLinkBolt(OFEventWfmTopologyConfig config)
void handlePortUp(SwitchId switchId, int portId)
void handlePortDown(SwitchId switchId, int portId)
AbstractDumpState dumpState()
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
DiscoveryLink registerPort(SwitchId switchId, int portId)
static final Fields fieldMessage
void declareOutputFields(OutputFieldsDeclarer declarer)
int getDiscoveryInterval()
void handleSentDiscoPacket(NetworkEndpoint endpoint)
void deactivateLinkFromEndpoint(NetworkEndpoint endpoint)
boolean handleFailed(SwitchId switchId, int portId)
def command(payload, fields)
int getDiscoveryDumpRequestTimeout()
int getDiscoveryTimeout()
void initState(KeyValueState< String, Object > state)
TopologyContext getContext()
OutputCollector getOutput()
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
int getKeepRemovedIslTimeout()
NetworkEndpoint getLinkDestination(SwitchId srcSwitch, int srcPort)
Set< DiscoveryLink > getMatchSet()
void handleSwitchUp(SwitchId switchId)
void handleSwitchDown(SwitchId switchId)
boolean handleDiscovered(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)