Open Kilda Java Documentation
CrudBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
18 import static java.lang.String.format;
19 import static org.openkilda.messaging.Utils.MAPPER;
22 
79 
80 import com.fasterxml.jackson.core.JsonProcessingException;
81 import com.google.common.annotations.VisibleForTesting;
82 import org.apache.storm.state.InMemoryKeyValueState;
83 import org.apache.storm.task.OutputCollector;
84 import org.apache.storm.task.TopologyContext;
85 import org.apache.storm.topology.OutputFieldsDeclarer;
86 import org.apache.storm.topology.base.BaseStatefulBolt;
87 import org.apache.storm.tuple.Fields;
88 import org.apache.storm.tuple.Tuple;
89 import org.apache.storm.tuple.Values;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
92 
93 import java.io.IOException;
94 import java.util.ArrayList;
95 import java.util.HashMap;
96 import java.util.HashSet;
97 import java.util.Iterator;
98 import java.util.List;
99 import java.util.Map;
100 import java.util.Optional;
101 import java.util.Set;
102 import java.util.UUID;
103 import java.util.stream.Collectors;
104 import java.util.stream.Stream;
105 import javax.annotation.Nullable;
106 
107 public class CrudBolt
108  extends BaseStatefulBolt<InMemoryKeyValueState<String, FlowCache>>
109  implements ICtrlBolt {
110 
111  public static final String FIELD_ID_FLOW_ID = Utils.FLOW_ID;
112  public static final String FIELD_ID_BIFLOW = "biflow";
113  public static final String FIELD_ID_MESSAGE = AbstractTopology.MESSAGE_FIELD;
114 
115  public static final String STREAM_ID_CTRL = "ctrl";
116  public static final Fields STREAM_FIELDS_VERIFICATION = new Fields(
118 
122  private static final Logger logger = LoggerFactory.getLogger(CrudBolt.class);
123 
127  private static final String FLOW_CACHE = "flow";
128 
132  private PathComputer pathComputer;
133  private final PathComputerAuth pathComputerAuth;
134 
138  private InMemoryKeyValueState<String, FlowCache> caches;
139 
140  private TopologyContext context;
141  private OutputCollector outputCollector;
142 
146  private FlowCache flowCache;
147 
148  private FlowValidator flowValidator;
149 
/**
 * Creates the bolt and stores the path computer credentials holder.
 *
 * <p>The actual {@code PathComputer} is obtained from it later, in
 * {@link #prepare(Map, TopologyContext, OutputCollector)}.
 *
 * @param pathComputerAuth provider of the path computer instance.
 */
public CrudBolt(PathComputerAuth pathComputerAuth) {
    this.pathComputerAuth = pathComputerAuth;
}
158 
162  @Override
163  public void initState(InMemoryKeyValueState<String, FlowCache> state) {
164  this.caches = state;
165 
166  // TODO - do we have to use InMemoryKeyValue, or is there some other InMemory option?
167  // The reason for the qestion .. we are only putting in one object.
168  flowCache = state.get(FLOW_CACHE);
169  if (flowCache == null) {
170  flowCache = new FlowCache();
171  this.caches.put(FLOW_CACHE, flowCache);
172  }
173  initFlowCache();
174 
175  flowValidator = new FlowValidator(flowCache);
176  }
177 
181  @Override
182  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
183  outputFieldsDeclarer.declareStream(StreamType.CREATE.toString(), AbstractTopology.fieldMessage);
184  outputFieldsDeclarer.declareStream(StreamType.UPDATE.toString(), AbstractTopology.fieldMessage);
185  outputFieldsDeclarer.declareStream(StreamType.DELETE.toString(), AbstractTopology.fieldMessage);
186  outputFieldsDeclarer.declareStream(StreamType.STATUS.toString(), AbstractTopology.fieldMessage);
187  outputFieldsDeclarer.declareStream(StreamType.RESPONSE.toString(), AbstractTopology.fieldMessage);
188  outputFieldsDeclarer.declareStream(StreamType.CACHE_SYNC.toString(), AbstractTopology.fieldMessage);
189  outputFieldsDeclarer.declareStream(StreamType.VERIFICATION.toString(), STREAM_FIELDS_VERIFICATION);
190  outputFieldsDeclarer.declareStream(StreamType.ERROR.toString(), FlowTopology.fieldsMessageErrorType);
191  // FIXME(dbogun): use proper tuple format
192  outputFieldsDeclarer.declareStream(STREAM_ID_CTRL, AbstractTopology.fieldMessage);
193  }
194 
198  @Override
199  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
200  this.context = topologyContext;
201  this.outputCollector = outputCollector;
202 
203  pathComputer = pathComputerAuth.getPathComputer();
204  }
205 
209  @Override
210  public void execute(Tuple tuple) {
211 
212  if (CtrlAction.boltHandlerEntrance(this, tuple)) {
213  return;
214  }
215 
216  ComponentType componentId = ComponentType.valueOf(tuple.getSourceComponent());
217  String correlationId = Utils.DEFAULT_CORRELATION_ID;
218 
219  StreamType streamId = null;
220  String flowId = null;
221  if (!componentId.equals(ComponentType.LCM_FLOW_SYNC_BOLT)) {
222  streamId = StreamType.valueOf(tuple.getSourceStreamId());
223  flowId = tuple.getStringByField(Utils.FLOW_ID);
224  }
225 
226  boolean isRecoverable = false;
227  try {
228  logger.debug("Request tuple={}", tuple);
229 
230  switch (componentId) {
231  case SPLITTER_BOLT:
232  Message msg = (Message) tuple.getValueByField(AbstractTopology.MESSAGE_FIELD);
233  correlationId = msg.getCorrelationId();
234 
235  CommandMessage cmsg = (msg instanceof CommandMessage) ? (CommandMessage) msg : null;
236  InfoMessage imsg = (msg instanceof InfoMessage) ? (InfoMessage) msg : null;
237 
238  logger.info("Flow request: {}={}, {}={}, component={}, stream={}",
239  Utils.CORRELATION_ID, correlationId, Utils.FLOW_ID, flowId, componentId, streamId);
240 
241  switch (streamId) {
242  case CREATE:
243  handleCreateRequest(cmsg, tuple);
244  break;
245  case UPDATE:
246  handleUpdateRequest(cmsg, tuple);
247  break;
248  case DELETE:
249  handleDeleteRequest(flowId, cmsg, tuple);
250  break;
251  case PUSH:
252  handlePushRequest(flowId, imsg, tuple);
253  break;
254  case UNPUSH:
255  handleUnpushRequest(flowId, imsg, tuple);
256  break;
257  case REROUTE:
258  handleRerouteRequest(cmsg, tuple);
259  break;
260  case CACHE_SYNC:
261  handleCacheSyncRequest(cmsg, tuple);
262  break;
263  case VERIFICATION:
264  handleVerificationRequest(tuple, flowId, cmsg);
265  break;
266  case READ:
267  handleReadRequest(flowId, cmsg, tuple);
268  break;
269  case DUMP:
270  handleDumpRequest(cmsg, tuple);
271  break;
272  default:
273 
274  logger.debug("Unexpected stream: component={}, stream={}", componentId, streamId);
275  break;
276  }
277  break;
278 
279  case SPEAKER_BOLT:
280  case TRANSACTION_BOLT:
281 
282  FlowState newStatus = (FlowState) tuple.getValueByField(FlowTopology.STATUS_FIELD);
283 
284  logger.info("Flow {} status {}: component={}, stream={}", flowId, newStatus, componentId, streamId);
285 
286  switch (streamId) {
287  case STATUS:
288  //TODO: SpeakerBolt & TransactionBolt don't supply a tuple with correlationId
289  handleStateRequest(flowId, newStatus, tuple, correlationId);
290  break;
291  default:
292  logger.debug("Unexpected stream: component={}, stream={}", componentId, streamId);
293  break;
294  }
295  break;
296 
297  case TOPOLOGY_ENGINE_BOLT:
298 
299  ErrorMessage errorMessage = (ErrorMessage) tuple.getValueByField(AbstractTopology.MESSAGE_FIELD);
300 
301  logger.info("Flow {} error: component={}, stream={}", flowId, componentId, streamId);
302 
303  switch (streamId) {
304  case STATUS:
305  handleErrorRequest(flowId, errorMessage, tuple);
306  break;
307  default:
308  logger.debug("Unexpected stream: component={}, stream={}", componentId, streamId);
309  break;
310  }
311  break;
312 
313  case LCM_FLOW_SYNC_BOLT:
314  logger.debug("Got network dump from TE");
315 
316  NetworkInfoData networkDump = (NetworkInfoData) tuple.getValueByField(
318  handleFlowSync(networkDump);
319  break;
320 
321  default:
322  logger.debug("Unexpected component: {}", componentId);
323  break;
324  }
325  } catch (RecoverableException e) {
326  // FIXME(surabujin): implement retry limit
327  logger.error(
328  "Recoverable error (do not try to recoverable it until retry limit will be implemented): {}", e);
329  // isRecoverable = true;
330 
331  } catch (CacheException exception) {
332  String logMessage = format("%s: %s", exception.getErrorMessage(), exception.getErrorDescription());
333  logger.error("{}, {}={}, {}={}, component={}, stream={}", logMessage, Utils.CORRELATION_ID,
334  correlationId, Utils.FLOW_ID, flowId, componentId, streamId, exception);
335 
336  ErrorMessage errorMessage = buildErrorMessage(correlationId, exception.getErrorType(),
337  logMessage, componentId.toString().toLowerCase());
338 
339  Values error = new Values(errorMessage, exception.getErrorType());
340  outputCollector.emit(StreamType.ERROR.toString(), tuple, error);
341 
342  } catch (IOException exception) {
343  logger.error("Could not deserialize message {}", tuple, exception);
344 
345  } catch (Exception e) {
346  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
347 
348  } finally {
349  outputCollector.ack(tuple);
350 
351  logger.debug("Command message ack: component={}, stream={}, tuple={}",
352  tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple);
353 
354  if (isRecoverable) {
355  outputCollector.fail(tuple);
356  } else {
357  outputCollector.ack(tuple);
358  }
359  }
360  }
361 
362  private void handleCacheSyncRequest(CommandMessage message, Tuple tuple) {
363  logger.debug("CACHE SYNCE: {}", message);
364 
365  // NB: This is going to be a "bulky" operation - get all flows from DB, and synchronize with the cache.
366 
367  List<String> droppedFlows = new ArrayList<>();
368  List<String> addedFlows = new ArrayList<>();
369  List<String> modifiedFlowChanges = new ArrayList<>();
370  List<String> modifiedFlowIds = new ArrayList<>();
371  List<String> unchangedFlows = new ArrayList<>();
372 
373  List<FlowInfo> flowInfos = pathComputer.getFlowInfo();
374 
375  // Instead of determining left/right .. store based on flowid_& cookie
376  HashMap<String, FlowInfo> flowToInfo = new HashMap<>();
377  for (FlowInfo fi : flowInfos) {
378  flowToInfo.put(fi.getFlowId() + fi.getCookie(), fi);
379  }
380 
381  // We first look at comparing what is in the DB to what is in the Cache
382  for (FlowInfo fi : flowInfos) {
383  String flowid = fi.getFlowId();
384  if (flowCache.cacheContainsFlow(flowid)) {
385  // TODO: better, more holistic comparison
386  // TODO: if the flow is modified, then just leverage drop / add primitives.
387  // TODO: Ensure that the DB is always the source of truth - cache and db ops part of transaction.
388  // Need to compare both sides
389  ImmutablePair<Flow, Flow> fc = flowCache.getFlow(flowid);
390 
391  final int count = modifiedFlowChanges.size();
392  if (fi.getCookie() != fc.left.getCookie() && fi.getCookie() != fc.right.getCookie()) {
393  modifiedFlowChanges
394  .add("cookie: " + flowid + ":" + fi.getCookie() + ":" + fc.left.getCookie() + ":" + fc.right
395  .getCookie());
396  }
397  if (fi.getMeterId() != fc.left.getMeterId() && fi.getMeterId() != fc.right.getMeterId()) {
398  modifiedFlowChanges
399  .add("meter: " + flowid + ":" + fi.getMeterId() + ":" + fc.left.getMeterId() + ":"
400  + fc.right.getMeterId());
401  }
402  if (fi.getTransitVlanId() != fc.left.getTransitVlan() && fi.getTransitVlanId() != fc.right
403  .getTransitVlan()) {
404  modifiedFlowChanges
405  .add("transit: " + flowid + ":" + fi.getTransitVlanId() + ":" + fc.left.getTransitVlan()
406  + ":" + fc.right.getTransitVlan());
407  }
408  if (!fi.getSrcSwitchId().equals(fc.left.getSourceSwitch()) && !fi.getSrcSwitchId()
409  .equals(fc.right.getSourceSwitch())) {
410  modifiedFlowChanges
411  .add("switch: " + flowid + "|" + fi.getSrcSwitchId() + "|" + fc.left.getSourceSwitch() + "|"
412  + fc.right.getSourceSwitch());
413  }
414 
415  if (count == modifiedFlowChanges.size()) {
416  unchangedFlows.add(flowid);
417  } else {
418  modifiedFlowIds.add(flowid);
419  }
420  } else {
421  // TODO: need to get the flow from the DB and add it properly
422  addedFlows.add(flowid);
423 
424  }
425  }
426 
427  // Now we see if the cache holds things not in the DB
428  for (ImmutablePair<Flow, Flow> flow : flowCache.dumpFlows()) {
429  String key = flow.left.getFlowId() + flow.left.getCookie();
430  // compare the left .. if it is in, then check the right .. o/w remove it (no need to check right
431  if (!flowToInfo.containsKey(key)) {
432  droppedFlows.add(flow.left.getFlowId());
433  } else {
434  key = flow.right.getFlowId() + flow.right.getCookie();
435  if (!flowToInfo.containsKey(key)) {
436  droppedFlows.add(flow.right.getFlowId());
437  }
438  }
439  }
440 
441  FlowCacheSyncRequest request = (FlowCacheSyncRequest) message.getData();
442  if (request.getSynchronizeCache() == SynchronizeCacheAction.SYNCHRONIZE_CACHE) {
443  synchronizeCache(addedFlows, modifiedFlowIds, droppedFlows, tuple, message.getCorrelationId());
444  } else if (request.getSynchronizeCache() == SynchronizeCacheAction.INVALIDATE_CACHE) {
445  invalidateCache(addedFlows, modifiedFlowIds, droppedFlows, tuple, message.getCorrelationId());
446  }
447 
448  FlowCacheSyncResults results = new FlowCacheSyncResults(
449  droppedFlows, addedFlows, modifiedFlowChanges, unchangedFlows);
450  Values northbound = new Values(new InfoMessage(new FlowCacheSyncResponse(results),
451  message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
452  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
453  }
454 
455  private void handleVerificationRequest(Tuple tuple, String flowId, CommandMessage message) {
456  ImmutablePair<Flow, Flow> flowPair = flowCache.getFlow(flowId);
457  BidirectionalFlow biFlow = new BidirectionalFlow(flowPair);
458 
459  outputCollector.emit(StreamType.VERIFICATION.toString(), tuple, new Values(flowId, biFlow, message));
460  }
461 
465  private void synchronizeCache(List<String> addedFlowIds, List<String> modifiedFlowIds, List<String> droppedFlowIds,
466  Tuple tuple, String correlationId) {
467  logger.info("Synchronizing the flow cache data: {} dropped, {} added, {} modified.",
468  droppedFlowIds.size(), addedFlowIds.size(), modifiedFlowIds.size());
469 
470  deleteFromCache(droppedFlowIds, tuple, correlationId);
471 
472  // override added/modified flows in the cache
473  Stream.concat(addedFlowIds.stream(), modifiedFlowIds.stream())
474  .map(pathComputer::getFlows)
475  .filter(flows -> !flows.isEmpty())
476  .map(flows -> {
477  FlowCollector flowPair = new FlowCollector();
478  flows.forEach(flowPair::add);
479  return flowPair;
480  })
481  .forEach(flowPair -> {
482  final BidirectionalFlow bidirectionalFlow = flowPair.make();
483  final ImmutablePair<Flow, Flow> flow = new ImmutablePair<>(
484  bidirectionalFlow.getForward(), bidirectionalFlow.getReverse());
485  final String flowId = flow.getLeft().getFlowId();
486  logger.debug("Refresh the flow: {}", flowId);
487 
488  flowCache.pushFlow(flow);
489 
490  // propagate updates further
491  emitCacheSyncInfoMessage(flowId, flow, tuple, correlationId);
492  });
493  }
494 
498  private void invalidateCache(List<String> addedFlowIds, List<String> modifiedFlowIds, List<String> droppedFlowIds,
499  Tuple tuple, String correlationId) {
500  logger.info("Invalidating the flow cache data: {} dropped, {} added, {} modified.",
501  droppedFlowIds.size(), addedFlowIds.size(), modifiedFlowIds.size());
502 
503  deleteFromCache(droppedFlowIds, tuple, correlationId);
504 
505  initFlowCache();
506 
507  // propagate updates further
508  flowCache.dumpFlows()
509  .forEach(flow -> {
510  final String flowId = flow.getLeft().getFlowId();
511  logger.debug("Refresh the flow: {}", flowId);
512 
513  emitCacheSyncInfoMessage(flowId, flow, tuple, correlationId);
514  });
515  }
516 
520  private void deleteFromCache(List<String> droppedFlowIds, Tuple tuple, String correlationId) {
521  droppedFlowIds.forEach(flowId -> {
522  logger.debug("Delete the flow: {}", flowId);
523 
524  flowCache.removeFlow(flowId);
525 
526  emitCacheSyncInfoMessage(flowId, null, tuple, correlationId);
527  });
528  }
529 
530  private void emitCacheSyncInfoMessage(String flowId, @Nullable ImmutablePair<Flow, Flow> flow,
531  Tuple tuple, String correlationId) {
532  String subCorrelationId = format("%s-%s", correlationId, flowId);
533  FlowInfoData data = new FlowInfoData(flowId, flow, FlowOperation.CACHE, subCorrelationId);
534  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), subCorrelationId);
535 
536  try {
537  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
538  outputCollector.emit(StreamType.CACHE_SYNC.toString(), tuple, topology);
539  } catch (JsonProcessingException e) {
540  logger.error("Unable to serialize the message: {}", infoMessage);
541  }
542  }
543 
544  private void handlePushRequest(String flowId, InfoMessage message, Tuple tuple) throws IOException {
545  logger.info("PUSH flow: {} :: {}", flowId, message);
546  FlowInfoData fid = (FlowInfoData) message.getData();
547  ImmutablePair<Flow, Flow> flow = fid.getPayload();
548 
549  flowCache.pushFlow(flow);
550 
551  // Update Cache
552  FlowInfoData data = new FlowInfoData(flow.getLeft().getFlowId(), flow, FlowOperation.PUSH,
553  message.getCorrelationId());
554  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), message.getCorrelationId());
555  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
556  outputCollector.emit(StreamType.CREATE.toString(), tuple, topology);
557 
558  Values northbound = new Values(new InfoMessage(new FlowStatusResponse(
559  new FlowIdStatusPayload(flowId, FlowState.UP)), message.getTimestamp(),
560  message.getCorrelationId(), Destination.NORTHBOUND));
561  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
562  }
563 
564  private void handleUnpushRequest(String flowId, InfoMessage message, Tuple tuple) throws IOException {
565  logger.info("UNPUSH flow: {} :: {}", flowId, message);
566 
567  ImmutablePair<Flow, Flow> flow = flowCache.deleteFlow(flowId);
568 
569  // Update Cache
570  FlowInfoData data = new FlowInfoData(flowId, flow, FlowOperation.UNPUSH, message.getCorrelationId());
571  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), message.getCorrelationId());
572  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
573  outputCollector.emit(StreamType.DELETE.toString(), tuple, topology);
574 
575 
576  Values northbound = new Values(new InfoMessage(new FlowStatusResponse(
577  new FlowIdStatusPayload(flowId, FlowState.DOWN)),
578  message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
579  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
580  }
581 
582 
583  private void handleDeleteRequest(String flowId, CommandMessage message, Tuple tuple) throws IOException {
584  ImmutablePair<Flow, Flow> flow = flowCache.deleteFlow(flowId);
585 
586  logger.info("Deleted flow: {}", flowId);
587 
588  FlowInfoData data = new FlowInfoData(flowId, flow, DELETE, message.getCorrelationId());
589  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), message.getCorrelationId());
590  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
591  outputCollector.emit(StreamType.DELETE.toString(), tuple, topology);
592 
593  Values northbound = new Values(new InfoMessage(new FlowResponse(buildFlowResponse(flow)),
594  message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
595  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
596  }
597 
598  private void handleCreateRequest(CommandMessage message, Tuple tuple) throws IOException, RecoverableException {
599  Flow requestedFlow = ((FlowCreateRequest) message.getData()).getPayload();
600 
601  ImmutablePair<PathInfoData, PathInfoData> path;
602  try {
603  flowValidator.validate(requestedFlow);
604 
605  path = pathComputer.getPath(requestedFlow, Strategy.COST);
606  logger.info("Creating flow {}. Found path: {}, correlationId: {}", requestedFlow.getFlowId(), path,
607  message.getCorrelationId());
608 
609  } catch (FlowValidationException e) {
610  throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
611  ErrorType.ALREADY_EXISTS, "Could not create flow", e.getMessage());
612  } catch (UnroutablePathException e) {
613  throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
614  ErrorType.NOT_FOUND, "Could not create flow",
615  "Not enough bandwidth found or path not found");
616  }
617 
618  ImmutablePair<Flow, Flow> flow = flowCache.createFlow(requestedFlow, path);
619  logger.info("Created flow: {}, correlationId: {}", flow, message.getCorrelationId());
620 
621  FlowInfoData data = new FlowInfoData(requestedFlow.getFlowId(), flow, FlowOperation.CREATE,
622  message.getCorrelationId());
623  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), message.getCorrelationId());
624  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
625  outputCollector.emit(StreamType.CREATE.toString(), tuple, topology);
626 
627  Values northbound = new Values(new InfoMessage(new FlowResponse(buildFlowResponse(flow)),
628  message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
629  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
630  }
631 
632  private void handleRerouteRequest(CommandMessage message, Tuple tuple) throws IOException, RecoverableException {
633  FlowRerouteRequest request = (FlowRerouteRequest) message.getData();
634  final String flowId = request.getFlowId();
635  final String correlationId = message.getCorrelationId();
636  logger.warn("Handling reroute request with correlationId {}", correlationId);
637 
638  ImmutablePair<Flow, Flow> flow = flowCache.getFlow(flowId);
639  switch (request.getOperation()) {
640  case UPDATE:
641  final Flow flowForward = flow.getLeft();
642 
643  try {
644  logger.warn("Origin flow {} path: {} correlationId {}", flowId, flowForward.getFlowPath(),
645  correlationId);
646  AvailableNetwork network = pathComputer.getAvailableNetwork(flowForward.isIgnoreBandwidth(),
647  flowForward.getBandwidth());
648  network.addIslsOccupiedByFlow(flowId, flowForward.isIgnoreBandwidth(), flowForward.getBandwidth());
649  ImmutablePair<PathInfoData, PathInfoData> path =
650  pathComputer.getPath(flow.getLeft(), network, Strategy.COST);
651  logger.warn("Potential New Path for flow {} with LEFT path: {}, RIGHT path: {} correlationId {}",
652  flowId, path.getLeft(), path.getRight(), correlationId);
653  boolean isFoundNewPath = (
654  !path.getLeft().equals(flow.getLeft().getFlowPath())
655  || !path.getRight().equals(flow.getRight().getFlowPath())
656  || !isFlowActive(flow));
657  //no need to emit changes if path wasn't changed and flow is active.
658  //force means to update flow even if path is not changed.
659  if (isFoundNewPath || request.isForce()) {
660  flow.getLeft().setState(FlowState.DOWN);
661  flow.getRight().setState(FlowState.DOWN);
662 
663  flow = flowCache.updateFlow(flow.getLeft(), path);
664  logger.warn("Rerouted flow with new path: {}, correlationId {}", flow, correlationId);
665 
666  FlowInfoData data = new FlowInfoData(flowId, flow, UPDATE, correlationId);
667  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), correlationId);
668  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
669  outputCollector.emit(StreamType.UPDATE.toString(), tuple, topology);
670  } else {
671  logger.warn("Reroute {} is unsuccessful: can't find new path. CorrelationId: {}",
672  flowId, correlationId);
673  }
674 
675  logger.debug("Sending response to NB. Correlation id {}", correlationId);
676  FlowRerouteResponse response = new FlowRerouteResponse(flow.left.getFlowPath(), isFoundNewPath);
677  Values values = new Values(new InfoMessage(response, message.getTimestamp(),
678  correlationId, Destination.NORTHBOUND));
679  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, values);
680  } catch (UnroutablePathException e) {
681  logger.warn("There is no path available for the flow {}, correlationId: {}", flowId,
682  correlationId);
683  flow.getLeft().setState(FlowState.DOWN);
684  flow.getRight().setState(FlowState.DOWN);
685  throw new MessageException(correlationId, System.currentTimeMillis(),
686  ErrorType.UPDATE_FAILURE, "Could not reroute flow", "Path was not found");
687  }
688  break;
689 
690  case CREATE:
691  logger.warn("State flow: {}={}, correlationId: {}", flowId, FlowState.UP, correlationId);
692  flow.getLeft().setState(FlowState.UP);
693  flow.getRight().setState(FlowState.UP);
694  break;
695 
696  case DELETE:
697  logger.warn("State flow: {}={}, correlationId: {}", flowId, FlowState.DOWN, correlationId);
698  flow.getLeft().setState(FlowState.DOWN);
699  flow.getRight().setState(FlowState.DOWN);
700  break;
701 
702  default:
703  logger.warn("Flow {} undefined reroute operation", request.getOperation());
704  break;
705  }
706  }
707 
708  private void handleUpdateRequest(CommandMessage message, Tuple tuple) throws IOException, RecoverableException {
709  Flow requestedFlow = ((FlowUpdateRequest) message.getData()).getPayload();
710  String correlationId = message.getCorrelationId();
711 
712  ImmutablePair<PathInfoData, PathInfoData> path;
713  try {
714  flowValidator.validate(requestedFlow);
715 
716  AvailableNetwork network = pathComputer.getAvailableNetwork(requestedFlow.isIgnoreBandwidth(),
717  requestedFlow.getBandwidth());
718  network.addIslsOccupiedByFlow(requestedFlow.getFlowId(),
719  requestedFlow.isIgnoreBandwidth(), requestedFlow.getBandwidth());
720  path = pathComputer.getPath(requestedFlow, network, Strategy.COST);
721  logger.info("Updated flow path: {}, correlationId {}", path, correlationId);
722 
723  } catch (FlowValidationException e) {
724  throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
725  ErrorType.ALREADY_EXISTS, "Could not update flow", e.getMessage());
726  } catch (UnroutablePathException e) {
727  throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
728  ErrorType.NOT_FOUND, "Could not update flow", "Path was not found");
729  }
730 
731  ImmutablePair<Flow, Flow> flow = flowCache.updateFlow(requestedFlow, path);
732  logger.info("Updated flow: {}, correlationId {}", flow, correlationId);
733 
734  FlowInfoData data = new FlowInfoData(requestedFlow.getFlowId(), flow, UPDATE,
735  message.getCorrelationId());
736  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), message.getCorrelationId());
737  Values topology = new Values(MAPPER.writeValueAsString(infoMessage));
738  outputCollector.emit(StreamType.UPDATE.toString(), tuple, topology);
739 
740  Values northbound = new Values(new InfoMessage(new FlowResponse(buildFlowResponse(flow)),
741  message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
742  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
743  }
744 
745  private void handleDumpRequest(CommandMessage message, Tuple tuple) {
746  List<BidirectionalFlow> flows = flowCache.dumpFlows().stream()
747  .map(BidirectionalFlow::new)
748  .collect(Collectors.toList());
749 
750  logger.debug("Dump flows: found {} items", flows.size());
751 
752  String requestId = message.getCorrelationId();
753  if (flows.isEmpty()) {
754  Message response = new ChunkedInfoMessage(null, System.currentTimeMillis(), requestId, null);
755  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, new Values(response));
756  } else {
757  Iterator<BidirectionalFlow> iterator = flows.iterator();
758  do {
759  BidirectionalFlow flow = iterator.next();
760  String nextRequestId = iterator.hasNext() ? UUID.randomUUID().toString() : null;
761 
762  Message response = new ChunkedInfoMessage(
763  new FlowReadResponse(flow), System.currentTimeMillis(), requestId, nextRequestId);
764  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, new Values(response));
765  requestId = nextRequestId;
766  } while (iterator.hasNext());
767  }
768  }
769 
770  private void handleReadRequest(String flowId, CommandMessage message, Tuple tuple) {
771  BidirectionalFlow flow = new BidirectionalFlow(flowCache.getFlow(flowId));
772 
773  logger.debug("Got bidirectional flow: {}, correlationId {}", flow, message.getCorrelationId());
774 
775  Values northbound = new Values(
776  new InfoMessage(
777  new FlowReadResponse(flow),
778  message.getTimestamp(),
779  message.getCorrelationId(),
780  Destination.NORTHBOUND));
781  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
782  }
783 
790  private void handleStateRequest(String flowId, FlowState state, Tuple tuple, String correlationId)
791  throws IOException {
792  ImmutablePair<Flow, Flow> flow = flowCache.getFlow(flowId);
793  logger.info("State flow: {}={}", flowId, state);
794  flow.getLeft().setState(state);
795  flow.getRight().setState(state);
796 
797  FlowInfoData data = new FlowInfoData(flowId, flow, FlowOperation.STATE, correlationId);
798  InfoMessage infoMessage = new InfoMessage(data, System.currentTimeMillis(), correlationId);
799 
800  Values topology = new Values(Utils.MAPPER.writeValueAsString(infoMessage));
801  outputCollector.emit(StreamType.STATUS.toString(), tuple, topology);
802 
803  }
804 
805  private void handleErrorRequest(String flowId, ErrorMessage message, Tuple tuple) throws IOException {
806  ErrorType errorType = message.getData().getErrorType();
807  message.getData().setErrorDescription("topology-engine internal error");
808 
809  logger.info("Flow {} {} failure", errorType, flowId);
810 
811  switch (errorType) {
812  case CREATION_FAILURE:
813  flowCache.removeFlow(flowId);
814  break;
815 
816  case UPDATE_FAILURE:
817  handleStateRequest(flowId, FlowState.DOWN, tuple, message.getCorrelationId());
818  break;
819 
820  case DELETION_FAILURE:
821  break;
822 
823  case INTERNAL_ERROR:
824  break;
825 
826  default:
827  logger.warn("Flow {} undefined failure", flowId);
828 
829  }
830 
831  Values error = new Values(message, errorType);
832  outputCollector.emit(StreamType.ERROR.toString(), tuple, error);
833  }
834 
835  private void handleFlowSync(NetworkInfoData networkDump) {
836  Set<ImmutablePair<Flow, Flow>> flows = networkDump.getFlows();
837 
838  logger.info("Load flows {}", flows.size());
839  flows.forEach(flowCache::putFlow);
840  }
841 
848  private Flow buildFlowResponse(ImmutablePair<Flow, Flow> flow) {
849  Flow response = new Flow(flow.left);
850  response.setCookie(response.getCookie() & ResourceCache.FLOW_COOKIE_VALUE_MASK);
851  return response;
852  }
853 
854  private ErrorMessage buildErrorMessage(String correlationId, ErrorType type, String message, String description) {
855  return new ErrorMessage(new ErrorData(type, message, description),
856  System.currentTimeMillis(), correlationId, Destination.NORTHBOUND);
857  }
858 
859  private boolean isFlowActive(ImmutablePair<Flow, Flow> flowPair) {
860  return flowPair.getLeft().getState().isActive() && flowPair.getRight().getState().isActive();
861  }
862 
863  private void initFlowCache() {
864  PathComputerFlowFetcher flowFetcher = new PathComputerFlowFetcher(pathComputer);
865 
866  for (BidirectionalFlow bidirectionalFlow : flowFetcher.getFlows()) {
867  ImmutablePair<Flow, Flow> flowPair = new ImmutablePair<>(
868  bidirectionalFlow.getForward(), bidirectionalFlow.getReverse());
869  flowCache.pushFlow(flowPair);
870  }
871  }
872 
873  @Override
875  FlowDump flowDump = new FlowDump(flowCache.dumpFlows());
876  return new CrudBoltState(flowDump);
877  }
878 
879  @VisibleForTesting
880  @Override
881  public void clearState() {
882  logger.info("State clear request from test");
883  initState(new InMemoryKeyValueState<>());
884  }
885 
886  @Override
888  // Not implemented
889  return new CrudBoltState(new FlowDump(new HashSet<>()));
890  }
891 
892 
893  @Override
894  public String getCtrlStreamId() {
895  return STREAM_ID_CTRL;
896  }
897 
898  @Override
899  public TopologyContext getContext() {
900  return context;
901  }
902 
903  @Override
904  public OutputCollector getOutput() {
905  return outputCollector;
906  }
907 
908  @Override
909  public Optional<AbstractDumpState> dumpResorceCacheState() {
910  return Optional.of(new ResorceCacheBoltState(
911  flowCache.getAllocatedMeters(),
912  flowCache.getAllocatedVlans(),
913  flowCache.getAllocatedCookies()));
914  }
915 }
void pushFlow(ImmutablePair< Flow, Flow > flow)
Definition: FlowCache.java:106
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
Definition: CrudBolt.java:182
ImmutablePair< Flow, Flow > removeFlow(String flowId)
Definition: FlowCache.java:97
static final ObjectMapper MAPPER
Definition: Utils.java:31
Optional< AbstractDumpState > dumpResorceCacheState()
Definition: CrudBolt.java:909
ImmutablePair< Flow, Flow > getFlow(String flowId)
Definition: FlowCache.java:247
ImmutablePair< PathInfoData, PathInfoData > getPath(Flow flow, AvailableNetwork network, Strategy strategy)
Map< SwitchId, Set< Integer > > getAllocatedMeters()
Definition: FlowCache.java:549
default List< FlowInfo > getFlowInfo()
Set< Integer > getAllocatedVlans()
Definition: FlowCache.java:541
int count
Definition: generator.py:19
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
Definition: CrudBolt.java:199
description
Definition: setup.py:26
Set< ImmutablePair< Flow, Flow > > dumpFlows()
Definition: FlowCache.java:342
static final String DEFAULT_CORRELATION_ID
Definition: Utils.java:69
ImmutablePair< Flow, Flow > updateFlow(Flow flow, ImmutablePair< PathInfoData, PathInfoData > path)
Definition: FlowCache.java:312
static final String CORRELATION_ID
Definition: Utils.java:43
AvailableNetwork getAvailableNetwork(boolean ignoreBandwidth, long requestedBandwidth)
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
Definition: CtrlAction.java:78
ImmutablePair< Flow, Flow > deleteFlow(String flowId)
Definition: FlowCache.java:291
ImmutablePair< Flow, Flow > createFlow(Flow flow, ImmutablePair< PathInfoData, PathInfoData > path)
Definition: FlowCache.java:268
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
Definition: CrudBolt.java:887
CrudBolt(PathComputerAuth pathComputerAuth)
Definition: CrudBolt.java:155
Set< Integer > getAllocatedCookies()
Definition: FlowCache.java:545
void addIslsOccupiedByFlow(String flowId, boolean ignoreBandwidth, long flowBandwidth)
static final String FLOW_ID
Definition: Utils.java:61
void initState(InMemoryKeyValueState< String, FlowCache > state)
Definition: CrudBolt.java:163
boolean cacheContainsFlow(String flowId)
Definition: FlowCache.java:117