16 package org.openkilda.wfm.isl;
22 import com.google.common.annotations.VisibleForTesting;
23 import org.apache.commons.collections4.CollectionUtils;
24 import org.apache.commons.collections4.map.PassiveExpiringMap;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.LinkedList;
32 import java.util.List;
34 import java.util.Optional;
36 import java.util.concurrent.TimeUnit;
37 import java.util.function.Function;
38 import java.util.stream.Collectors;
60 private final Logger logger = LoggerFactory.getLogger(
DiscoveryManager.class);
65 private final int islHealthCheckInterval;
66 private final int islConsecutiveFailureLimit;
67 private final int maxAttempts;
69 private final Map<SwitchId, Set<DiscoveryLink>> linksBySwitch;
75 private final Map<NetworkEndpoint, DiscoveryLink> removedFromDiscovery;
86 int islHealthCheckInterval,
int islConsecutiveFailureLimit,
87 int maxAttempts, Integer minutesKeepRemovedIsl) {
88 this.islHealthCheckInterval = islHealthCheckInterval;
89 this.islConsecutiveFailureLimit = islConsecutiveFailureLimit;
90 this.maxAttempts = maxAttempts;
91 this.linksBySwitch = linksBySwitch;
92 this.removedFromDiscovery =
new PassiveExpiringMap<>(minutesKeepRemovedIsl, TimeUnit.MINUTES,
new HashMap<>());
110 int unsentDiscoPackets = 0;
112 List<DiscoveryLink> links = linksBySwitch.values()
114 .flatMap(Set::stream)
115 .collect(Collectors.toList());
118 if (!link.isNewAttemptAllowed()) {
119 logger.trace(
"Disco packet from {} is not sent due to exceeded limit of consecutive failures: {}",
120 link.getSource(), link.getConsecutiveFailure());
131 if (link.isAckAttemptsLimitExceeded(islConsecutiveFailureLimit)) {
134 if (link.isActive() && link.getConsecutiveFailure() == 0) {
137 logger.info(
"ISL IS DOWN (NO RESPONSE): {}", link);
140 logger.trace(
"No response to the disco packet from {}", link.getSource());
145 if (link.isAttemptsLimitExceeded(islConsecutiveFailureLimit) && link.isActive()) {
146 logger.info(
"Speaker doesn't send disco packet for {}", link);
147 unsentDiscoPackets++;
157 if (link.timeToCheck()) {
159 link.resetTickCounter();
162 logger.trace(
"Added to discovery plan: {}", link);
169 if (unsentDiscoPackets > 0) {
170 logger.warn(
"Speaker does not send discovery packets. Affected links amount: {}", unsentDiscoPackets);
182 boolean stateChanged =
false;
184 Optional<DiscoveryLink> matchedLink = findBySourceEndpoint(
node);
186 if (!matchedLink.isPresent()) {
187 logger.warn(
"Ignore \"AVAIL\" request for {}: node not found",
node);
195 logger.info(
"FOUND ISL: {}", link);
196 }
else if (link.getConsecutiveFailure() > 0) {
201 logger.info(
"ISL IS UP: {}", link);
225 boolean stateChanged =
false;
227 Optional<DiscoveryLink> matchedLink = findBySourceEndpoint(endpoint);
229 if (!matchedLink.isPresent()) {
230 logger.warn(
"Ignoring \"FAILED\" request. There is no link found from {}", endpoint);
233 if (link.isActive() && link.getConsecutiveFailure() == 0) {
237 logger.info(
"ISL IS DOWN (GOT RESPONSE): {}", link);
252 findBySourceEndpoint(endpoint)
255 logger.debug(
"Received acknowledge of sending disco from {}", endpoint);
264 logger.info(
"Register switch {} into ISL discovery manager", switchId);
273 Set<DiscoveryLink> subjectList = findAllBySwitch(switchId);
275 if (!subjectList.isEmpty()) {
276 logger.info(
"Received SWITCH UP (id:{}) with EXISTING NODES. Clearing isFoundISL flags", switchId);
278 subject.deactivate();
289 logger.info(
"Deregister switch {} from ISL discovery manager", switchId);
304 logger.info(
"Port UP on existing node {}; clear failures and ISLFound", link);
307 logger.info(
"Port UP on new node: {}", link);
320 return findBySourceEndpoint(
node)
321 .orElseGet(() -> registerDiscoveryLink(
node));
326 this.islHealthCheckInterval, this.maxAttempts);
327 linksBySwitch.computeIfAbsent(
node.getDatapath(), key ->
new HashSet<>())
330 logger.info(
"The link has been registered for discovery: {}", link);
340 Optional<DiscoveryLink> discoveryLink = findBySourceEndpoint(
node);
342 if (!discoveryLink.isPresent()) {
343 logger.warn(
"Can't update discovery {} -> node not found",
node);
347 removeFromDiscovery(
node);
357 Set<DiscoveryLink> findAllBySwitch(
SwitchId dpid) {
358 return linksBySwitch.getOrDefault(dpid, Collections.emptySet());
368 Optional<DiscoveryLink> findBySourceEndpoint(NetworkEndpoint endpoint) {
369 Set<DiscoveryLink> links = linksBySwitch.getOrDefault(endpoint.getDatapath(), Collections.emptySet());
371 return links.stream()
372 .filter(link -> endpoint.equals(link.getSource()))
382 private void removeFromDiscovery(NetworkEndpoint endpoint) {
383 if (endpoint.getPortNumber() == 0) {
384 Set<DiscoveryLink> links = linksBySwitch.remove(endpoint.getDatapath());
385 if (!CollectionUtils.isEmpty(links)) {
386 Map<NetworkEndpoint, DiscoveryLink> removedLinks = links.stream()
387 .collect(Collectors.toMap(DiscoveryLink::getSource, Function.identity()));
388 removedFromDiscovery.putAll(removedLinks);
390 logger.info(
"Removed switch {} from discovery", endpoint.getDatapath());
393 Set<DiscoveryLink> links = linksBySwitch.get(endpoint.getDatapath());
395 Optional<DiscoveryLink> matchedLink = links.stream()
396 .filter(discoveryLink -> endpoint.equals(discoveryLink.getSource()))
399 matchedLink.ifPresent(link -> {
401 removedFromDiscovery.put(endpoint, link);
403 logger.info(
"The link has been removed from discovery: {}", link);
414 .orElseGet(() -> removedFromDiscovery.get(endpoint));
417 logger.info(
"ISL Event: the link has been moved: {} to {}_{}", link, dstSwitch, dstPort);
430 .orElseGet(() -> removedFromDiscovery.get(srcEndpoint));
432 if (link != null && link.getDestination() != null) {
433 return link.getDestination();
436 throw new IllegalStateException(String.format(
"Not found link from %s_%s", srcSwitch, srcPort));
445 .orElseGet(() -> removedFromDiscovery.remove(endpoint));
449 logger.info(
"The link has been deactivated: {}", link);
461 Optional<DiscoveryLink> link = findBySourceEndpoint(
new NetworkEndpoint(switchId, portId));
463 return link.isPresent() && link.get().isNewAttemptAllowed();
471 this.needDiscovery =
new LinkedList<>();
472 this.discoveryFailure =
new LinkedList<>();
boolean isIslMoved(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)
boolean isInDiscoveryPlan(SwitchId switchId, int portId)
boolean isDestinationChanged(SwitchId dstSwitch, int dstPort)
void handlePortUp(SwitchId switchId, int portId)
void handlePortDown(SwitchId switchId, int portId)
DiscoveryLink registerPort(SwitchId switchId, int portId)
void handleSentDiscoPacket(NetworkEndpoint endpoint)
void deactivateLinkFromEndpoint(NetworkEndpoint endpoint)
boolean handleFailed(SwitchId switchId, int portId)
final List< NetworkEndpoint > needDiscovery
DiscoveryManager(Map< SwitchId, Set< DiscoveryLink >> linksBySwitch, int islHealthCheckInterval, int islConsecutiveFailureLimit, int maxAttempts, Integer minutesKeepRemovedIsl)
boolean isNewAttemptAllowed()
NetworkEndpoint getLinkDestination(SwitchId srcSwitch, int srcPort)
void activate(NetworkEndpoint destination)
final List< NetworkEndpoint > discoveryFailure
void incAcknowledgedAttempts()
void handleSwitchUp(SwitchId switchId)
void handleSwitchDown(SwitchId switchId)
void clearConsecutiveFailure()
boolean handleDiscovered(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)