Open Kilda Java Documentation
DiscoveryManager.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.isl;
17 
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import org.apache.commons.collections4.CollectionUtils;
24 import org.apache.commons.collections4.map.PassiveExpiringMap;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.LinkedList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Optional;
35 import java.util.Set;
36 import java.util.concurrent.TimeUnit;
37 import java.util.function.Function;
38 import java.util.stream.Collectors;
39 
59 public class DiscoveryManager {
60  private final Logger logger = LoggerFactory.getLogger(DiscoveryManager.class);
61 
65  private final int islHealthCheckInterval;
66  private final int islConsecutiveFailureLimit;
67  private final int maxAttempts;
68 
69  private final Map<SwitchId, Set<DiscoveryLink>> linksBySwitch;
70 
75  private final Map<NetworkEndpoint, DiscoveryLink> removedFromDiscovery;
76 
85  public DiscoveryManager(Map<SwitchId, Set<DiscoveryLink>> linksBySwitch,
86  int islHealthCheckInterval, int islConsecutiveFailureLimit,
87  int maxAttempts, Integer minutesKeepRemovedIsl) {
88  this.islHealthCheckInterval = islHealthCheckInterval;
89  this.islConsecutiveFailureLimit = islConsecutiveFailureLimit;
90  this.maxAttempts = maxAttempts;
91  this.linksBySwitch = linksBySwitch;
92  this.removedFromDiscovery = new PassiveExpiringMap<>(minutesKeepRemovedIsl, TimeUnit.MINUTES, new HashMap<>());
93  }
94 
// NOTE(review): the header of this method is missing from the listing (it jumps straight to the
// first statement). The body builds and returns a Plan and appears to use no parameters; restore
// the signature from the upstream source.
//
// Purpose of the body: walk every registered link once and decide, per link,
//  - whether a failure must be reported (added to Plan.discoveryFailure), and
//  - whether a discovery packet must be sent now (added to Plan.needDiscovery).
// The per-link counters (attempts, ticks, consecutive failures) are updated along the way, so the
// order of the checks below matters.
109  Plan result = new Plan();
110  int unsentDiscoPackets = 0;
111 
  // Flatten the per-switch sets into a separate list; the loop iterates over this copy.
112  List<DiscoveryLink> links = linksBySwitch.values()
113  .stream()
114  .flatMap(Set::stream)
115  .collect(Collectors.toList());
116 
117  for (DiscoveryLink link : links) {
  // Links that are not allowed a new attempt are skipped entirely: no counters change for them.
118  if (!link.isNewAttemptAllowed()) {
119  logger.trace("Disco packet from {} is not sent due to exceeded limit of consecutive failures: {}",
120  link.getSource(), link.getConsecutiveFailure());
121  continue;
122  }
123 
124  /*
125  * If we get a response from FL, we clear the attempts. Otherwise, no response, and
126  * number of attempts grows.
127  *
128  * Further, consecutivefailures = attempts - failure limit (we wait until attempt limit before increasing)
129  */
130  NetworkEndpoint node = link.getSource();
131  if (link.isAckAttemptsLimitExceeded(islConsecutiveFailureLimit)) {
132  // We've attempted to get the health multiple times, with no response.
133  // Time to mark it as a failure and send a failure notice ** if ** it was an ISL.
134  if (link.isActive() && link.getConsecutiveFailure() == 0) {
135  // It is a discovery failure if it was previously a success.
136  result.discoveryFailure.add(node);
137  logger.info("ISL IS DOWN (NO RESPONSE): {}", link);
138  }
139  // Increment Failure = 1 after isAttemptsLimitExceeded failure, then increases every attempt.
140  logger.trace("No response to the disco packet from {}", link.getSource());
141  link.fail();
142  // NB: this node can be in both discoveryFailure and needDiscovery
143  }
144 
  // Active links over the (non-acknowledged) attempts limit are only counted and logged here;
  // this check does not modify the link.
145  if (link.isAttemptsLimitExceeded(islConsecutiveFailureLimit) && link.isActive()) {
146  logger.info("Speaker doesn't send disco packet for {}", link);
147  unsentDiscoPackets++;
148  }
149 
150  /*
151  * If you get here, the following are true:
152  * - it isn't in some filter
153  * - it hasn't reached failure limit (forlorn)
154  * - it is either time to send discovery or not
155  * - NB: we'll keep trying to send discovery, even if we don't get a response.
156  */
  // Either schedule a discovery for this link (and restart its tick counter) or advance the
  // tick counter by one; exactly one of the two happens per link per call.
157  if (link.timeToCheck()) {
158  link.incAttempts();
159  link.resetTickCounter();
160  result.needDiscovery.add(node);
161 
162  logger.trace("Added to discovery plan: {}", link);
163  } else {
164  link.tick();
165  }
166 
167  }
168 
  // A single summary warning per call instead of one warning per affected link.
169  if (unsentDiscoPackets > 0) {
170  logger.warn("Speaker does not send discovery packets. Affected links amount: {}", unsentDiscoPackets);
171  }
172 
173  return result;
174  }
175 
181  public boolean handleDiscovered(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort) {
182  boolean stateChanged = false;
183  NetworkEndpoint node = new NetworkEndpoint(srcSwitch, srcPort);
184  Optional<DiscoveryLink> matchedLink = findBySourceEndpoint(node);
185 
186  if (!matchedLink.isPresent()) {
187  logger.warn("Ignore \"AVAIL\" request for {}: node not found", node);
188  } else {
189  DiscoveryLink link = matchedLink.get();
190  if (!link.isActive() || link.isDestinationChanged(dstSwitch, dstPort)) {
191  // we've found newly discovered or moved/replugged isl
192  link.activate(new NetworkEndpoint(dstSwitch, dstPort));
193 
194  stateChanged = true;
195  logger.info("FOUND ISL: {}", link);
196  } else if (link.getConsecutiveFailure() > 0) {
197  // We've found failures, but now we've had success, so that is a state change.
198  // To repeat, current model for state change is just 1 failure. If we change this
199  // policy, then change the test above.
200  stateChanged = true;
201  logger.info("ISL IS UP: {}", link);
202  }
203  link.renew();
204  link.success();
206  // If one of the logs above wasn't reachd, don't log anything .. ISL was up and is still up
207  }
208 
209  if (stateChanged) {
210  // Add logic to ensure we send a discovery packet for the opposite direction.
211  // TODO: in order to do this here, we need more information (ie the other end of the ISL)
212  // Since that isn't passed in and isn't available in our state, have to rely on the
213  // calling function.
214 
215  }
216  return stateChanged;
217  }
218 
224  public boolean handleFailed(SwitchId switchId, int portId) {
225  boolean stateChanged = false;
226  NetworkEndpoint endpoint = new NetworkEndpoint(switchId, portId);
227  Optional<DiscoveryLink> matchedLink = findBySourceEndpoint(endpoint);
228 
229  if (!matchedLink.isPresent()) {
230  logger.warn("Ignoring \"FAILED\" request. There is no link found from {}", endpoint);
231  } else {
232  DiscoveryLink link = matchedLink.get();
233  if (link.isActive() && link.getConsecutiveFailure() == 0) {
234  // This is the first failure for an ISL. That is a state change.
235  // IF this isn't an ISL and we receive a failure, that isn't a state change.
236  stateChanged = true;
237  logger.info("ISL IS DOWN (GOT RESPONSE): {}", link);
238  }
239  link.renew();
240  link.fail();
241  link.deactivate();
242  }
243  return stateChanged;
244  }
245 
// Handles the acknowledgement that a discovery packet was sent from the given endpoint.
//
// NOTE(review): the statement that starts with findBySourceEndpoint(endpoint) is truncated in this
// listing: the continuation of the call chain and its terminating ';' are missing. What is done
// with the matched link cannot be determined from here; restore the line from the upstream source
// before compiling.
251  public void handleSentDiscoPacket(NetworkEndpoint endpoint) {
252  findBySourceEndpoint(endpoint)
254 
255  logger.debug("Received acknowledge of sending disco from {}", endpoint);
256  }
257 
263  public void handleSwitchUp(SwitchId switchId) {
264  logger.info("Register switch {} into ISL discovery manager", switchId);
265  // TODO: this method *use to not* do anything .. but it should register the switch.
266  // At least, it seems like it should do something to register a switch, even
267  // though this can be lazily done when the first port event arrives.
268 
269  /*
270  * If a switch comes up, clear any "isFoundIsl" flags, in case something has changed,
271  * and/or if the TE has cleared it's state .. this will pass along the ISL.
272  */
273  Set<DiscoveryLink> subjectList = findAllBySwitch(switchId);
274 
275  if (!subjectList.isEmpty()) {
276  logger.info("Received SWITCH UP (id:{}) with EXISTING NODES. Clearing isFoundISL flags", switchId);
277  for (DiscoveryLink subject : subjectList) {
278  subject.deactivate();
279  }
280  }
281  }
282 
288  public void handleSwitchDown(SwitchId switchId) {
289  logger.info("Deregister switch {} from ISL discovery manager", switchId);
290  removeFromDiscovery(new NetworkEndpoint(switchId, 0));
291  }
292 
296  public void handlePortUp(SwitchId switchId, int portId) {
297  DiscoveryLink link = registerPort(switchId, portId);
298  if (link.isActive() || !link.isNewAttemptAllowed()) {
299  // Similar to SwitchUp, if we have a PortUp on an existing port, either we are receiving
300  // a duplicate, or we missed the port down, or a new discovery has occurred.
301  // NB: this should cause an ISL discovery packet to be sent.
302  // TODO: we should probably separate "port up" from "do discovery". ATM, one would call
303  // this function just to get the "do discovery" functionality.
304  logger.info("Port UP on existing node {}; clear failures and ISLFound", link);
305  link.deactivate();
306  } else {
307  logger.info("Port UP on new node: {}", link);
308  }
309  }
310 
318  public DiscoveryLink registerPort(SwitchId switchId, int portId) {
319  NetworkEndpoint node = new NetworkEndpoint(switchId, portId);
320  return findBySourceEndpoint(node)
321  .orElseGet(() -> registerDiscoveryLink(node));
322  }
323 
324  private DiscoveryLink registerDiscoveryLink(NetworkEndpoint node) {
325  DiscoveryLink link = new DiscoveryLink(node.getDatapath(), node.getPortNumber(),
326  this.islHealthCheckInterval, this.maxAttempts);
327  linksBySwitch.computeIfAbsent(node.getDatapath(), key -> new HashSet<>())
328  .add(link);
329 
330  logger.info("The link has been registered for discovery: {}", link);
331 
332  return link;
333  }
334 
338  public void handlePortDown(SwitchId switchId, int portId) {
339  NetworkEndpoint node = new NetworkEndpoint(switchId, portId);
340  Optional<DiscoveryLink> discoveryLink = findBySourceEndpoint(node);
341 
342  if (!discoveryLink.isPresent()) {
343  logger.warn("Can't update discovery {} -> node not found", node);
344  return;
345  }
346 
347  removeFromDiscovery(node);
348  }
349 
356  @VisibleForTesting
357  Set<DiscoveryLink> findAllBySwitch(SwitchId dpid) {
358  return linksBySwitch.getOrDefault(dpid, Collections.emptySet());
359  }
360 
367  @VisibleForTesting
368  Optional<DiscoveryLink> findBySourceEndpoint(NetworkEndpoint endpoint) {
369  Set<DiscoveryLink> links = linksBySwitch.getOrDefault(endpoint.getDatapath(), Collections.emptySet());
370 
371  return links.stream()
372  .filter(link -> endpoint.equals(link.getSource()))
373  .findFirst();
374  }
375 
382  private void removeFromDiscovery(NetworkEndpoint endpoint) {
383  if (endpoint.getPortNumber() == 0) {
384  Set<DiscoveryLink> links = linksBySwitch.remove(endpoint.getDatapath());
385  if (!CollectionUtils.isEmpty(links)) {
386  Map<NetworkEndpoint, DiscoveryLink> removedLinks = links.stream()
387  .collect(Collectors.toMap(DiscoveryLink::getSource, Function.identity()));
388  removedFromDiscovery.putAll(removedLinks);
389 
390  logger.info("Removed switch {} from discovery", endpoint.getDatapath());
391  }
392  } else {
393  Set<DiscoveryLink> links = linksBySwitch.get(endpoint.getDatapath());
394 
395  Optional<DiscoveryLink> matchedLink = links.stream()
396  .filter(discoveryLink -> endpoint.equals(discoveryLink.getSource()))
397  .findFirst();
398 
399  matchedLink.ifPresent(link -> {
400  links.remove(link);
401  removedFromDiscovery.put(endpoint, link);
402 
403  logger.info("The link has been removed from discovery: {}", link);
404  });
405  }
406  }
407 
411  public boolean isIslMoved(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort) {
412  NetworkEndpoint endpoint = new NetworkEndpoint(srcSwitch, srcPort);
413  DiscoveryLink link = findBySourceEndpoint(endpoint)
414  .orElseGet(() -> removedFromDiscovery.get(endpoint));
415 
416  if (link != null && link.isDestinationChanged(dstSwitch, dstPort)) {
417  logger.info("ISL Event: the link has been moved: {} to {}_{}", link, dstSwitch, dstPort);
418  return true;
419  }
420 
421  return false;
422  }
423 
427  public NetworkEndpoint getLinkDestination(SwitchId srcSwitch, int srcPort) {
428  NetworkEndpoint srcEndpoint = new NetworkEndpoint(srcSwitch, srcPort);
429  DiscoveryLink link = findBySourceEndpoint(srcEndpoint)
430  .orElseGet(() -> removedFromDiscovery.get(srcEndpoint));
431 
432  if (link != null && link.getDestination() != null) {
433  return link.getDestination();
434  }
435 
436  throw new IllegalStateException(String.format("Not found link from %s_%s", srcSwitch, srcPort));
437  }
438 
444  DiscoveryLink link = findBySourceEndpoint(endpoint)
445  .orElseGet(() -> removedFromDiscovery.remove(endpoint));
446  if (link != null) {
447  link.deactivate();
448 
449  logger.info("The link has been deactivated: {}", link);
450  }
451  }
452 
460  public boolean isInDiscoveryPlan(SwitchId switchId, int portId) {
461  Optional<DiscoveryLink> link = findBySourceEndpoint(new NetworkEndpoint(switchId, portId));
462 
463  return link.isPresent() && link.get().isNewAttemptAllowed();
464  }
465 
466  public final class Plan {
467  public final List<NetworkEndpoint> needDiscovery;
468  public final List<NetworkEndpoint> discoveryFailure;
469 
470  private Plan() {
471  this.needDiscovery = new LinkedList<>();
472  this.discoveryFailure = new LinkedList<>();
473  }
474  }
475 }
boolean isIslMoved(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)
boolean isInDiscoveryPlan(SwitchId switchId, int portId)
void handlePortUp(SwitchId switchId, int portId)
void handlePortDown(SwitchId switchId, int portId)
DiscoveryLink registerPort(SwitchId switchId, int portId)
void handleSentDiscoPacket(NetworkEndpoint endpoint)
void deactivateLinkFromEndpoint(NetworkEndpoint endpoint)
boolean handleFailed(SwitchId switchId, int portId)
list result
Definition: plan-d.py:72
final List< NetworkEndpoint > needDiscovery
DiscoveryManager(Map< SwitchId, Set< DiscoveryLink >> linksBySwitch, int islHealthCheckInterval, int islConsecutiveFailureLimit, int maxAttempts, Integer minutesKeepRemovedIsl)
NetworkEndpoint getLinkDestination(SwitchId srcSwitch, int srcPort)
final List< NetworkEndpoint > discoveryFailure
boolean handleDiscovered(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)