Open Kilda Java Documentation
OfeLinkBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.event;
17 
18 import static java.lang.String.format;
19 import static org.openkilda.messaging.Utils.MAPPER;
20 import static org.openkilda.messaging.Utils.PAYLOAD;
21 
49 import org.openkilda.wfm.WatchDog;
57 
58 import com.fasterxml.jackson.core.JsonProcessingException;
59 import com.google.common.annotations.VisibleForTesting;
60 import com.google.common.base.Preconditions;
61 import com.google.common.collect.Lists;
62 import org.apache.storm.kafka.spout.internal.Timer;
63 import org.apache.storm.state.InMemoryKeyValueState;
64 import org.apache.storm.state.KeyValueState;
65 import org.apache.storm.task.OutputCollector;
66 import org.apache.storm.task.TopologyContext;
67 import org.apache.storm.topology.OutputFieldsDeclarer;
68 import org.apache.storm.tuple.Fields;
69 import org.apache.storm.tuple.Tuple;
70 import org.apache.storm.tuple.Values;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 
74 import java.io.IOException;
75 import java.util.HashMap;
76 import java.util.Map;
77 import java.util.Set;
78 import java.util.UUID;
79 import java.util.concurrent.TimeUnit;
80 import java.util.stream.Collectors;
81 
99 public class OfeLinkBolt
100  extends AbstractTickStatefulBolt<KeyValueState<String, Object>>
101  implements ICtrlBolt {
102  private static final Logger logger = LoggerFactory.getLogger(OfeLinkBolt.class);
103  private static final int BOLT_TICK_INTERVAL = 1;
104 
105  private static final String STREAM_ID_CTRL = "ctrl";
106  @VisibleForTesting
107  static final String STATE_ID_DISCOVERY = "discovery-manager";
108  static final String TOPO_ENG_STREAM = "topo.eng";
109  static final String SPEAKER_STREAM = "speaker";
110 
111  private final String islDiscoveryTopic;
112 
113  private final int islHealthCheckInterval;
114  private final int islHealthCheckTimeout;
115  private final int islHealthFailureLimit;
116  private final int islKeepRemovedTimeout;
117  private final float watchDogInterval;
118  private WatchDog watchDog;
119  private TopologyContext context;
120  private OutputCollector collector;
121 
122  private DummyIIslFilter islFilter;
123  private DiscoveryManager discovery;
124  private Map<SwitchId, Set<DiscoveryLink>> linksBySwitch;
125 
126  private String dumpRequestCorrelationId = null;
127  private float dumpRequestTimeout;
128  private Timer dumpRequestTimer;
129  @VisibleForTesting
130  State state = State.NEED_SYNC;
131 
136  super(BOLT_TICK_INTERVAL);
137 
138  DiscoveryConfig discoveryConfig = config.getDiscoveryConfig();
139  islHealthCheckInterval = discoveryConfig.getDiscoveryInterval();
140  Preconditions.checkArgument(islHealthCheckInterval > 0,
141  "Invalid value for DiscoveryInterval: %s", islHealthCheckInterval);
142  islHealthCheckTimeout = discoveryConfig.getDiscoveryTimeout();
143  Preconditions.checkArgument(islHealthCheckTimeout > 0,
144  "Invalid value for DiscoveryTimeout: %s", islHealthCheckTimeout);
145  islHealthFailureLimit = discoveryConfig.getDiscoveryLimit();
146  islKeepRemovedTimeout = discoveryConfig.getKeepRemovedIslTimeout();
147 
148  watchDogInterval = discoveryConfig.getDiscoverySpeakerFailureTimeout();
149  dumpRequestTimeout = discoveryConfig.getDiscoveryDumpRequestTimeout();
150 
151  islDiscoveryTopic = config.getKafkaSpeakerTopic();
152  }
153 
154  @Override
155  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
156  islFilter = new DummyIIslFilter();
157 
158  this.context = context;
159  this.collector = collector;
160  }
161 
162  @Override
163  @SuppressWarnings("unchecked")
164  public void initState(KeyValueState<String, Object> state) {
165  watchDog = new WatchDog(watchDogInterval);
166 
167  // NB: First time the worker is created this will be null
168  // TODO: what happens to state as workers go up or down
169  Object payload = state.get(STATE_ID_DISCOVERY);
170  if (payload == null) {
171  payload = linksBySwitch = new HashMap<>();
172  state.put(islDiscoveryTopic, payload);
173  } else {
174  linksBySwitch = (Map<SwitchId, Set<DiscoveryLink>>) payload;
175  }
176 
177  // DiscoveryManager counts failures as failed attempts,
178  // so we need to convert islHealthCheckTimeout (which is in ticks) into attempts.
179  int islConsecutiveFailureLimit = (int) Math.ceil(islHealthCheckTimeout / (float) islHealthCheckInterval);
180 
181  discovery = new DiscoveryManager(linksBySwitch, islHealthCheckInterval, islConsecutiveFailureLimit,
182  islHealthFailureLimit, islKeepRemovedTimeout);
183  }
184 
188  @Override
189  protected void doTick(Tuple tuple) {
190  boolean isSpeakerAvailable = watchDog.isAvailable();
191 
192  if (!isSpeakerAvailable) {
193  stateTransition(State.OFFLINE);
194  }
195 
196  String correlationId = UUID.randomUUID().toString();
197 
198  switch (state) {
199  case NEED_SYNC:
200  dumpRequestCorrelationId = correlationId;
201  sendNetworkRequest(tuple, correlationId);
202  enableDumpRequestTimer();
203  stateTransition(State.WAIT_SYNC);
204  break;
205 
206  case WAIT_SYNC:
207  case SYNC_IN_PROGRESS:
208  if (dumpRequestTimer.isExpiredResetOnTrue()) {
209  logger.error("Did not get network dump, send one more dump request");
210  dumpRequestCorrelationId = correlationId;
211  sendNetworkRequest(tuple, correlationId);
212  }
213  break;
214 
215  case OFFLINE:
216  if (isSpeakerAvailable) {
217  logger.info("Switch into ONLINE mode");
218  stateTransition(State.NEED_SYNC);
219  }
220  break;
221 
222  case MAIN:
223  processDiscoveryPlan(tuple, correlationId);
224  break;
225  default:
226  logger.error("Illegal state of OfeLinkBolt: {}", state);
227  }
228  }
229 
233  private String sendNetworkRequest(Tuple tuple, String correlationId) {
235  System.currentTimeMillis(), correlationId,
237 
238  logger.info(
239  "Send network dump request (correlation-id: {})",
240  correlationId);
241 
242  try {
243  String json = Utils.MAPPER.writeValueAsString(command);
244  collector.emit(SPEAKER_STREAM, tuple, new Values(PAYLOAD, json));
245  } catch (JsonProcessingException exception) {
246  logger.error("Could not serialize network cache request", exception);
247  }
248 
249  return correlationId;
250  }
251 
252  private void processDiscoveryPlan(Tuple tuple, String correlationId) {
253  DiscoveryManager.Plan discoveryPlan = discovery.makeDiscoveryPlan();
254  try {
255  for (NetworkEndpoint node : discoveryPlan.needDiscovery) {
256  String msgCorrelationId = format("%s-%s:%s", correlationId,
257  node.getSwitchDpId(), node.getPortId());
258  sendDiscoveryMessage(tuple, node, msgCorrelationId);
259  }
260 
261  for (NetworkEndpoint node : discoveryPlan.discoveryFailure) {
262  String msgCorrelationId = format("%s-%s:%s-fail", correlationId,
263  node.getSwitchDpId(), node.getPortId());
264  // this is somewhat incongruous - we send failure to TE, but we send
265  // discovery to FL ..
266  // Reality is that the handleDiscovery/handleFailure below does the work
267  //
268  sendDiscoveryFailed(node.getSwitchDpId(), node.getPortId(), tuple, msgCorrelationId);
269  }
270  } catch (IOException e) {
271  logger.error("Unable to encode message: {}", e);
272  }
273  }
274 
278  private void sendDiscoveryMessage(Tuple tuple, NetworkEndpoint node, String correlationId) throws IOException {
279  DiscoverIslCommandData data = new DiscoverIslCommandData(node.getDatapath(), node.getPortNumber());
280  CommandMessage message = new CommandMessage(data, System.currentTimeMillis(),
281  correlationId, Destination.CONTROLLER);
282  logger.debug("LINK: Send ISL discovery command: {}", message);
283  collector.emit(SPEAKER_STREAM, tuple, new Values(PAYLOAD, Utils.MAPPER.writeValueAsString(message)));
284  }
285 
286  @Override
287  protected void doWork(Tuple tuple) {
288  if (CtrlAction.boltHandlerEntrance(this, tuple)) {
289  return;
290  }
291  //
292  // (crimi) - commenting out the filter code until we re-evaluate the design. Also, this code
293  // should probably be embedded in "handleIslEvent"
294  // /*
295  // * Check whether ISL Filter needs to be engaged.
296  // */
297  // String source = tuple.getSourceComponent();
298  // if (source.equals(OfEventWfmTopology.SPOUT_ID_INPUT)) {
299  // PopulateIslFilterAction action = new PopulateIslFilterAction(this, tuple, islFilter);
300  // action.run();
301  // return;
302  // }
303 
304  String json = tuple.getString(0);
305 
306  BaseMessage message;
307  try {
308  message = MAPPER.readValue(json, BaseMessage.class);
309  watchDog.reset();
310  } catch (IOException e) {
311  collector.ack(tuple);
312  logger.error("Unknown Message type={}", json);
313  return;
314  }
315 
316  try {
317  if (message instanceof InfoMessage) {
318  dispatch(tuple, (InfoMessage) message);
319  } else if (message instanceof HeartBeat) {
320  logger.debug("Got speaker's heart beat");
321  stateTransition(State.NEED_SYNC, State.OFFLINE);
322  }
323  } catch (Exception e) {
324  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
325  } finally {
326  collector.ack(tuple);
327  }
328  }
329 
330  private void dispatch(Tuple tuple, InfoMessage infoMessage) {
331  switch (state) {
332  case NEED_SYNC:
333  dispatchNeedSync(tuple, infoMessage);
334  break;
335  case WAIT_SYNC:
336  dispatchWaitSync(tuple, infoMessage);
337  break;
338  case SYNC_IN_PROGRESS:
339  dispatchSyncInProgress(tuple, infoMessage);
340  break;
341  case OFFLINE:
342  dispatchOffline(tuple, infoMessage);
343  break;
344  case MAIN:
345  dispatchMain(tuple, infoMessage);
346  break;
347  default:
348  reportInvalidEvent(infoMessage.getData());
349  }
350  }
351 
352  private void dispatchNeedSync(Tuple tuple, InfoMessage infoMessage) {
353  logger.warn("Bolt internal state is out of sync with FL, skip tuple");
354  }
355 
356  private void dispatchWaitSync(Tuple tuple, InfoMessage infoMessage) {
357  InfoData data = infoMessage.getData();
358  if (data instanceof NetworkDumpBeginMarker) {
359  if (dumpRequestCorrelationId.equals(infoMessage.getCorrelationId())) {
360  logger.info("Got response on network sync request, start processing network events");
361  enableDumpRequestTimer();
362  stateTransition(State.SYNC_IN_PROGRESS);
363  } else {
364  logger.warn(
365  "Got response on network sync request with invalid "
366  + "correlation-id(expect: \"{}\", got: \"{}\")",
367  dumpRequestCorrelationId, infoMessage.getCorrelationId());
368  }
369  } else {
370  reportInvalidEvent(data);
371  }
372  }
373 
374  private void dispatchSyncInProgress(Tuple tuple, InfoMessage infoMessage) {
375  InfoData data = infoMessage.getData();
376  if (data instanceof NetworkDumpSwitchData) {
377  logger.info("Event/WFM Sync: switch {}", data);
378  // no sync actions required for switches.
379 
380  } else if (data instanceof NetworkDumpPortData) {
381  logger.info("Event/WFM Sync: port {}", data);
382  NetworkDumpPortData portData = (NetworkDumpPortData) data;
383  discovery.registerPort(portData.getSwitchId(), portData.getPortNo());
384 
385  } else if (data instanceof NetworkDumpEndMarker) {
386  logger.info("End of network sync stream received");
387  stateTransition(State.MAIN);
388  } else {
389  reportInvalidEvent(data);
390  }
391  }
392 
393  private void dispatchOffline(Tuple tuple, InfoMessage infoMessage) {
394  logger.warn("Got input while in offline mode, it mean the possibility to try sync state");
395  watchDog.reset();
396  stateTransition(State.NEED_SYNC);
397  }
398 
399  private void dispatchMain(Tuple tuple, InfoMessage infoMessage) {
400  InfoData data = infoMessage.getData();
401  if (data instanceof SwitchInfoData) {
402  handleSwitchEvent(tuple, (SwitchInfoData) data);
403  passToTopologyEngine(tuple);
404  } else if (data instanceof PortInfoData) {
405  handlePortEvent(tuple, (PortInfoData) data);
406  passToTopologyEngine(tuple);
407  } else if (data instanceof IslInfoData) {
408  handleIslEvent(tuple, (IslInfoData) data, infoMessage.getCorrelationId());
409  } else if (data instanceof DiscoPacketSendingConfirmation) {
410  handleSentDiscoPacket((DiscoPacketSendingConfirmation) data);
411  } else {
412  reportInvalidEvent(data);
413  }
414  }
415 
416  private void stateTransition(State switchTo) {
417  logger.info("State transition to {} (current {})", switchTo, state);
418  state = switchTo;
419  }
420 
421  private void stateTransition(State switchTo, State onlyInState) {
422  if (state == onlyInState) {
423  stateTransition(switchTo);
424  }
425  }
426 
427  private void reportInvalidEvent(InfoData event) {
428  logger.error(
429  "Unhandled event: state={}, type={}", state,
430  event.getClass().getName());
431  }
432 
433  private void handleSwitchEvent(Tuple tuple, SwitchInfoData switchData) {
434  SwitchId switchId = switchData.getSwitchId();
435  String state = switchData.getState().toString();
436  logger.info("DISCO: Switch Event: switch={} state={}", switchId, state);
437 
438  if (SwitchState.DEACTIVATED.getType().equals(state)) {
439  // current logic: switch down means stop checking associated ports/links.
440  // - possible extra steps of validation of switch down should occur elsewhere
441  // - possible extra steps of generating link down messages aren't important since
442  // the TPE will drop the switch node from its graph.
443  discovery.handleSwitchDown(switchId);
444  } else if (SwitchState.ACTIVATED.getType().equals(state)) {
445  // It's possible that we get duplicated switch up events .. particulary if
446  // FL goes down and then comes back up; it'll rebuild its switch / port information.
447  // NB: need to account for this, and send along to TE to be conservative.
448  discovery.handleSwitchUp(switchId);
449  } else {
450  // TODO: Should this be a warning? Evaluate whether any other state needs to be handled
451  logger.warn("SWITCH Event: ignoring state: {}", state);
452  }
453  }
454 
458  private void passToTopologyEngine(Tuple tuple) {
459  String json = tuple.getString(0);
460  collector.emit(TOPO_ENG_STREAM, tuple, new Values(PAYLOAD, json));
461  }
462 
463  private void passToTopologyEngine(Tuple tuple, InfoMessage message) {
464  try {
465  String json = Utils.MAPPER.writeValueAsString(message);
466  collector.emit(TOPO_ENG_STREAM, tuple, new Values(PAYLOAD, json));
467  } catch (JsonProcessingException e) {
468  logger.error("Error during json processing", e);
469  }
470  }
471 
472  private void handlePortEvent(Tuple tuple, PortInfoData portData) {
473  final SwitchId switchId = portData.getSwitchId();
474  final int portId = portData.getPortNo();
475  String updown = portData.getState().toString();
476  logger.info("DISCO: Port Event: switch={} port={} state={}", switchId, portId, updown);
477 
478  if (isPortUpOrCached(updown)) {
479  discovery.handlePortUp(switchId, portId);
480  } else if (updown.equals(OfeMessageUtils.PORT_DOWN)) {
481  discovery.handlePortDown(switchId, portId);
482  } else {
483  // TODO: Should this be a warning? Evaluate whether any other state needs to be handled
484  logger.warn("PORT Event: ignoring state: {}", updown);
485  }
486  }
487 
488  private void handleIslEvent(Tuple tuple, IslInfoData discoveredIsl, String correlationId) {
489  PathNode srcNode = discoveredIsl.getPath().get(0);
490  final SwitchId srcSwitch = srcNode.getSwitchId();
491  final int srcPort = srcNode.getPortNo();
492 
493  PathNode dstNode = discoveredIsl.getPath().get(1);
494  final SwitchId dstSwitch = dstNode.getSwitchId();
495  final int dstPort = dstNode.getPortNo();
496 
497  IslChangeType state = discoveredIsl.getState();
498  boolean stateChanged = false;
499 
500  /*
501  * TODO: would be good to merge more of this behavior / business logic within DiscoveryManager
502  * The reason is so that we consolidate behavior related to Network Topology Discovery into
503  * one place.
504  */
505  if (IslChangeType.DISCOVERED.equals(state)) {
506  if (discovery.isIslMoved(srcSwitch, srcPort, dstSwitch, dstPort)) {
507  handleMovedIsl(tuple, srcSwitch, srcPort, dstSwitch, dstPort, correlationId);
508  }
509  stateChanged = discovery.handleDiscovered(srcSwitch, srcPort, dstSwitch, dstPort);
510  // If the state has changed, and since we've discovered one end of an ISL, let's make
511  // sure we can test the other side as well.
512  if (stateChanged && !discovery.isInDiscoveryPlan(dstSwitch, dstPort)) {
513  discovery.handlePortUp(dstSwitch, dstPort);
514  }
515  } else {
516  // TODO: Should this be a warning? Evaluate whether any other state needs to be handled
517  logger.warn("ISL Event: ignoring state: {}", state);
518  }
519 
520  if (stateChanged) {
521  // If the state changed, notify the TE.
522  logger.info("DISCO: ISL Event: switch={} port={} state={}", srcSwitch, srcPort, state);
523  passToTopologyEngine(tuple);
524  }
525  }
526 
527  // TODO: Who are some of the recipients of IslFail message? ie who are we emitting this to?
528  // - From a code search, we see these code bases referring to IslInfoData:
529  // - wfm/topology/cache
530  // - wfm/topology/islstats
531  // - simulator/bolts/SpeakerBolt
532  // - services/topology-engine/queue-engine/topologylistener/eventhandler.py
533  // - services/src/topology .. service/impl/IslServiceImpl .. service/IslService
534  // - services/src/topology .. messaging/kafka/KafkaMessageConsumer
535  // - services/src/pce .. NetworkCache .. FlowCache ..
536  private void sendDiscoveryFailed(SwitchId switchId, int portId, Tuple tuple, String correlationId)
537  throws IOException {
538  String discoFail = OfeMessageUtils.createIslFail(switchId, portId, correlationId);
539  // Values dataVal = new Values(PAYLOAD, discoFail, switchId, portId, OfeMessageUtils.LINK_DOWN);
540  // collector.emit(topoEngTopic, tuple, dataVal);
541  collector.emit(TOPO_ENG_STREAM, tuple, new Values(PAYLOAD, discoFail));
542  discovery.handleFailed(switchId, portId);
543  logger.warn("LINK: Send ISL discovery failure message={}", discoFail);
544  }
545 
546  private boolean isPortUpOrCached(String state) {
547  return OfeMessageUtils.PORT_UP.equals(state) || OfeMessageUtils.PORT_ADD.equals(state)
548  || PortChangeType.CACHED.getType().equals(state);
549  }
550 
551  private void enableDumpRequestTimer() {
552  long expireDelay = (int) (dumpRequestTimeout * 1000);
553  dumpRequestTimer = new Timer(expireDelay, expireDelay, TimeUnit.MILLISECONDS);
554  }
555 
556  private void handleMovedIsl(Tuple tuple, SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort,
557  String correlationId) {
558  NetworkEndpoint dstEndpoint = discovery.getLinkDestination(srcSwitch, srcPort);
559  logger.info("Link is moved from {}_{} - {}_{} to endpoint {}_{}", srcSwitch, srcPort,
560  dstEndpoint.getSwitchDpId(), dstEndpoint.getPortId(), dstSwitch, dstPort);
561  // deactivate reverse link
562  discovery.deactivateLinkFromEndpoint(dstEndpoint);
563 
564  PathNode srcNode = new PathNode(srcSwitch, srcPort, 0);
565  PathNode dstNode = new PathNode(dstEndpoint.getSwitchDpId(), dstEndpoint.getPortId(), 1);
566  IslInfoData infoData = new IslInfoData(Lists.newArrayList(srcNode, dstNode), IslChangeType.MOVED);
567  InfoMessage message = new InfoMessage(infoData, System.currentTimeMillis(), correlationId);
568  passToTopologyEngine(tuple, message);
569 
570  // we should send reverse link as well to modify status in TE
571  srcNode = new PathNode(dstEndpoint.getSwitchDpId(), dstEndpoint.getPortId(), 0);
572  dstNode = new PathNode(srcSwitch, srcPort, 1);
573  IslInfoData reverseLink = new IslInfoData(Lists.newArrayList(srcNode, dstNode), IslChangeType.MOVED);
574  message = new InfoMessage(reverseLink, System.currentTimeMillis(), correlationId);
575  passToTopologyEngine(tuple, message);
576  }
577 
578  private void handleSentDiscoPacket(DiscoPacketSendingConfirmation confirmation) {
579  logger.debug("Discovery packet is sent from {}", confirmation);
580  discovery.handleSentDiscoPacket(confirmation.getEndpoint());
581  }
582 
583  @Override
584  public void declareOutputFields(OutputFieldsDeclarer declarer) {
585  declarer.declareStream(SPEAKER_STREAM, new Fields("key", "message"));
586  declarer.declareStream(TOPO_ENG_STREAM, new Fields("key", "message"));
587  // FIXME(dbogun): use proper tuple format
588  declarer.declareStream(STREAM_ID_CTRL, AbstractTopology.fieldMessage);
589  }
590 
591  @Override
593  Set<DiscoveryLink> links = linksBySwitch.values()
594  .stream()
595  .flatMap(Set::stream)
596  .collect(Collectors.toSet());
597 
598  return new OFELinkBoltState(links, islFilter.getMatchSet());
599  }
600 
601  @Override
602  public String getCtrlStreamId() {
603  return STREAM_ID_CTRL;
604  }
605 
606  @Override
607  public void clearState() {
608  logger.info("ClearState request has been received.");
609  initState(new InMemoryKeyValueState<>());
610  }
611 
612  @Override
614 
615  Set<DiscoveryLink> filteredDiscoveryList = linksBySwitch.get(switchId);
616 
617  Set<DiscoveryLink> filterdIslFilter = islFilter.getMatchSet().stream()
618  .filter(node -> node.getSource().getSwitchDpId().equals(switchId))
619  .collect(Collectors.toSet());
620 
621 
622  return new OFELinkBoltState(filteredDiscoveryList, filterdIslFilter);
623  }
624 
625  @Override
626  public TopologyContext getContext() {
627  return context;
628  }
629 
630  @Override
631  public OutputCollector getOutput() {
632  return collector;
633  }
634 
635  @VisibleForTesting
636  enum State {
637  NEED_SYNC,
638  WAIT_SYNC,
639  SYNC_IN_PROGRESS,
640  OFFLINE,
641  MAIN
642  }
643 }
static final String PAYLOAD
Definition: Utils.java:57
static final ObjectMapper MAPPER
Definition: Utils.java:31
boolean isIslMoved(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)
boolean isInDiscoveryPlan(SwitchId switchId, int portId)
void handlePortUp(SwitchId switchId, int portId)
void handlePortDown(SwitchId switchId, int portId)
DiscoveryLink registerPort(SwitchId switchId, int portId)
void handleSentDiscoPacket(NetworkEndpoint endpoint)
void deactivateLinkFromEndpoint(NetworkEndpoint endpoint)
boolean handleFailed(SwitchId switchId, int portId)
def command(payload, fields)
Definition: share.py:102
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
Definition: CtrlAction.java:78
NetworkEndpoint getLinkDestination(SwitchId srcSwitch, int srcPort)
boolean handleDiscovered(SwitchId srcSwitch, int srcPort, SwitchId dstSwitch, int dstPort)