16 package org.openkilda.wfm.topology.flow.bolts;
18 import static java.lang.String.format;
80 import com.fasterxml.jackson.core.JsonProcessingException;
81 import com.google.common.annotations.VisibleForTesting;
82 import org.apache.storm.state.InMemoryKeyValueState;
83 import org.apache.storm.task.OutputCollector;
84 import org.apache.storm.task.TopologyContext;
85 import org.apache.storm.topology.OutputFieldsDeclarer;
86 import org.apache.storm.topology.base.BaseStatefulBolt;
87 import org.apache.storm.tuple.Fields;
88 import org.apache.storm.tuple.Tuple;
89 import org.apache.storm.tuple.Values;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
93 import java.io.IOException;
94 import java.util.ArrayList;
95 import java.util.HashMap;
96 import java.util.HashSet;
97 import java.util.Iterator;
98 import java.util.List;
100 import java.util.Optional;
101 import java.util.Set;
102 import java.util.UUID;
103 import java.util.stream.Collectors;
104 import java.util.stream.Stream;
105 import javax.annotation.Nullable;
108 extends BaseStatefulBolt<InMemoryKeyValueState<String, FlowCache>>
122 private static final Logger logger = LoggerFactory.getLogger(
CrudBolt.class);
127 private static final String FLOW_CACHE =
"flow";
138 private InMemoryKeyValueState<String, FlowCache> caches;
140 private TopologyContext context;
141 private OutputCollector outputCollector;
156 this.pathComputerAuth = pathComputerAuth;
163 public void initState(InMemoryKeyValueState<String, FlowCache> state) {
168 flowCache = state.get(FLOW_CACHE);
169 if (flowCache == null) {
171 this.caches.put(FLOW_CACHE, flowCache);
199 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
200 this.context = topologyContext;
201 this.outputCollector = outputCollector;
220 String flowId = null;
222 streamId =
StreamType.valueOf(tuple.getSourceStreamId());
226 boolean isRecoverable =
false;
228 logger.debug(
"Request tuple={}", tuple);
230 switch (componentId) {
238 logger.info(
"Flow request: {}={}, {}={}, component={}, stream={}",
243 handleCreateRequest(cmsg, tuple);
246 handleUpdateRequest(cmsg, tuple);
249 handleDeleteRequest(flowId, cmsg, tuple);
252 handlePushRequest(flowId, imsg, tuple);
255 handleUnpushRequest(flowId, imsg, tuple);
258 handleRerouteRequest(cmsg, tuple);
261 handleCacheSyncRequest(cmsg, tuple);
264 handleVerificationRequest(tuple, flowId, cmsg);
267 handleReadRequest(flowId, cmsg, tuple);
270 handleDumpRequest(cmsg, tuple);
274 logger.debug(
"Unexpected stream: component={}, stream={}", componentId, streamId);
280 case TRANSACTION_BOLT:
284 logger.info(
"Flow {} status {}: component={}, stream={}", flowId, newStatus, componentId, streamId);
289 handleStateRequest(flowId, newStatus, tuple, correlationId);
292 logger.debug(
"Unexpected stream: component={}, stream={}", componentId, streamId);
297 case TOPOLOGY_ENGINE_BOLT:
301 logger.info(
"Flow {} error: component={}, stream={}", flowId, componentId, streamId);
305 handleErrorRequest(flowId, errorMessage, tuple);
308 logger.debug(
"Unexpected stream: component={}, stream={}", componentId, streamId);
313 case LCM_FLOW_SYNC_BOLT:
314 logger.debug(
"Got network dump from TE");
318 handleFlowSync(networkDump);
322 logger.debug(
"Unexpected component: {}", componentId);
328 "Recoverable error (do not try to recoverable it until retry limit will be implemented): {}", e);
333 logger.error(
"{}, {}={}, {}={}, component={}, stream={}", logMessage,
Utils.
CORRELATION_ID,
334 correlationId,
Utils.
FLOW_ID, flowId, componentId, streamId, exception);
337 logMessage, componentId.toString().toLowerCase());
339 Values error =
new Values(errorMessage, exception.
getErrorType());
342 }
catch (IOException exception) {
343 logger.error(
"Could not deserialize message {}", tuple, exception);
345 }
catch (Exception e) {
346 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
349 outputCollector.ack(tuple);
351 logger.debug(
"Command message ack: component={}, stream={}, tuple={}",
352 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple);
355 outputCollector.fail(tuple);
357 outputCollector.ack(tuple);
362 private void handleCacheSyncRequest(
CommandMessage message, Tuple tuple) {
363 logger.debug(
"CACHE SYNCE: {}", message);
367 List<String> droppedFlows =
new ArrayList<>();
368 List<String> addedFlows =
new ArrayList<>();
369 List<String> modifiedFlowChanges =
new ArrayList<>();
370 List<String> modifiedFlowIds =
new ArrayList<>();
371 List<String> unchangedFlows =
new ArrayList<>();
373 List<FlowInfo> flowInfos = pathComputer.
getFlowInfo();
376 HashMap<String, FlowInfo> flowToInfo =
new HashMap<>();
378 flowToInfo.put(fi.getFlowId() + fi.getCookie(), fi);
382 for (FlowInfo fi : flowInfos) {
383 String flowid = fi.getFlowId();
391 final int count = modifiedFlowChanges.size();
392 if (fi.getCookie() != fc.
left.getCookie() && fi.getCookie() != fc.
right.getCookie()) {
394 .add(
"cookie: " + flowid +
":" + fi.getCookie() +
":" + fc.
left.getCookie() +
":" + fc.
right 397 if (fi.getMeterId() != fc.
left.getMeterId() && fi.getMeterId() != fc.
right.getMeterId()) {
399 .add(
"meter: " + flowid +
":" + fi.getMeterId() +
":" + fc.
left.getMeterId() +
":" 400 + fc.
right.getMeterId());
402 if (fi.getTransitVlanId() != fc.
left.getTransitVlan() && fi.getTransitVlanId() != fc.
right 405 .add(
"transit: " + flowid +
":" + fi.getTransitVlanId() +
":" + fc.
left.getTransitVlan()
406 +
":" + fc.
right.getTransitVlan());
408 if (!fi.getSrcSwitchId().equals(fc.
left.getSourceSwitch()) && !fi.getSrcSwitchId()
409 .equals(fc.
right.getSourceSwitch())) {
411 .add(
"switch: " + flowid +
"|" + fi.getSrcSwitchId() +
"|" + fc.
left.getSourceSwitch() +
"|" 412 + fc.
right.getSourceSwitch());
415 if (
count == modifiedFlowChanges.size()) {
416 unchangedFlows.add(flowid);
418 modifiedFlowIds.add(flowid);
422 addedFlows.add(flowid);
428 for (ImmutablePair<Flow, Flow> flow : flowCache.
dumpFlows()) {
429 String key = flow.left.getFlowId() + flow.left.getCookie();
431 if (!flowToInfo.containsKey(key)) {
432 droppedFlows.add(flow.left.getFlowId());
434 key = flow.right.getFlowId() + flow.right.getCookie();
435 if (!flowToInfo.containsKey(key)) {
436 droppedFlows.add(flow.right.getFlowId());
441 FlowCacheSyncRequest request = (FlowCacheSyncRequest) message.
getData();
442 if (request.getSynchronizeCache() == SynchronizeCacheAction.SYNCHRONIZE_CACHE) {
443 synchronizeCache(addedFlows, modifiedFlowIds, droppedFlows, tuple, message.
getCorrelationId());
444 }
else if (request.getSynchronizeCache() == SynchronizeCacheAction.INVALIDATE_CACHE) {
445 invalidateCache(addedFlows, modifiedFlowIds, droppedFlows, tuple, message.
getCorrelationId());
448 FlowCacheSyncResults results =
new FlowCacheSyncResults(
449 droppedFlows, addedFlows, modifiedFlowChanges, unchangedFlows);
450 Values northbound =
new Values(
new InfoMessage(
new FlowCacheSyncResponse(results),
452 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
455 private void handleVerificationRequest(Tuple tuple, String flowId, CommandMessage message) {
456 ImmutablePair<Flow, Flow> flowPair = flowCache.
getFlow(flowId);
457 BidirectionalFlow biFlow =
new BidirectionalFlow(flowPair);
459 outputCollector.emit(StreamType.VERIFICATION.toString(), tuple,
new Values(flowId, biFlow, message));
465 private void synchronizeCache(List<String> addedFlowIds, List<String> modifiedFlowIds, List<String> droppedFlowIds,
466 Tuple tuple, String correlationId) {
467 logger.info(
"Synchronizing the flow cache data: {} dropped, {} added, {} modified.",
468 droppedFlowIds.size(), addedFlowIds.size(), modifiedFlowIds.size());
470 deleteFromCache(droppedFlowIds, tuple, correlationId);
473 Stream.concat(addedFlowIds.stream(), modifiedFlowIds.stream())
474 .map(pathComputer::getFlows)
475 .filter(flows -> !flows.isEmpty())
477 FlowCollector flowPair =
new FlowCollector();
478 flows.forEach(flowPair::add);
481 .forEach(flowPair -> {
482 final BidirectionalFlow bidirectionalFlow = flowPair.make();
483 final ImmutablePair<Flow, Flow> flow =
new ImmutablePair<>(
484 bidirectionalFlow.getForward(), bidirectionalFlow.getReverse());
485 final String flowId = flow.getLeft().getFlowId();
486 logger.debug(
"Refresh the flow: {}", flowId);
491 emitCacheSyncInfoMessage(flowId, flow, tuple, correlationId);
498 private void invalidateCache(List<String> addedFlowIds, List<String> modifiedFlowIds, List<String> droppedFlowIds,
499 Tuple tuple, String correlationId) {
500 logger.info(
"Invalidating the flow cache data: {} dropped, {} added, {} modified.",
501 droppedFlowIds.size(), addedFlowIds.size(), modifiedFlowIds.size());
503 deleteFromCache(droppedFlowIds, tuple, correlationId);
510 final String flowId = flow.getLeft().getFlowId();
511 logger.debug(
"Refresh the flow: {}", flowId);
513 emitCacheSyncInfoMessage(flowId, flow, tuple, correlationId);
520 private void deleteFromCache(List<String> droppedFlowIds, Tuple tuple, String correlationId) {
521 droppedFlowIds.forEach(flowId -> {
522 logger.debug(
"Delete the flow: {}", flowId);
526 emitCacheSyncInfoMessage(flowId, null, tuple, correlationId);
530 private void emitCacheSyncInfoMessage(String flowId, @Nullable ImmutablePair<Flow, Flow> flow,
531 Tuple tuple, String correlationId) {
532 String subCorrelationId =
format(
"%s-%s", correlationId, flowId);
533 FlowInfoData
data =
new FlowInfoData(flowId, flow, FlowOperation.CACHE, subCorrelationId);
534 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), subCorrelationId);
537 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
538 outputCollector.emit(StreamType.CACHE_SYNC.toString(), tuple,
topology);
539 }
catch (JsonProcessingException e) {
540 logger.error(
"Unable to serialize the message: {}", infoMessage);
544 private void handlePushRequest(String flowId, InfoMessage message, Tuple tuple)
throws IOException {
545 logger.info(
"PUSH flow: {} :: {}", flowId, message);
546 FlowInfoData fid = (FlowInfoData) message.getData();
547 ImmutablePair<Flow, Flow> flow = fid.getPayload();
552 FlowInfoData
data =
new FlowInfoData(flow.getLeft().getFlowId(), flow, FlowOperation.PUSH,
553 message.getCorrelationId());
554 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), message.getCorrelationId());
555 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
556 outputCollector.emit(StreamType.CREATE.toString(), tuple,
topology);
558 Values northbound =
new Values(
new InfoMessage(
new FlowStatusResponse(
559 new FlowIdStatusPayload(flowId, FlowState.UP)), message.getTimestamp(),
560 message.getCorrelationId(), Destination.NORTHBOUND));
561 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
564 private void handleUnpushRequest(String flowId, InfoMessage message, Tuple tuple)
throws IOException {
565 logger.info(
"UNPUSH flow: {} :: {}", flowId, message);
567 ImmutablePair<Flow, Flow> flow = flowCache.
deleteFlow(flowId);
570 FlowInfoData
data =
new FlowInfoData(flowId, flow, FlowOperation.UNPUSH, message.getCorrelationId());
571 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), message.getCorrelationId());
572 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
573 outputCollector.emit(StreamType.DELETE.toString(), tuple,
topology);
576 Values northbound =
new Values(
new InfoMessage(
new FlowStatusResponse(
577 new FlowIdStatusPayload(flowId, FlowState.DOWN)),
578 message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
579 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
583 private void handleDeleteRequest(String flowId, CommandMessage message, Tuple tuple)
throws IOException {
584 ImmutablePair<Flow, Flow> flow = flowCache.
deleteFlow(flowId);
586 logger.info(
"Deleted flow: {}", flowId);
588 FlowInfoData
data =
new FlowInfoData(flowId, flow, DELETE, message.getCorrelationId());
589 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), message.getCorrelationId());
590 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
591 outputCollector.emit(StreamType.DELETE.toString(), tuple,
topology);
593 Values northbound =
new Values(
new InfoMessage(
new FlowResponse(buildFlowResponse(flow)),
594 message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
595 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
598 private void handleCreateRequest(CommandMessage message, Tuple tuple)
throws IOException, RecoverableException {
599 Flow requestedFlow = ((FlowCreateRequest) message.getData()).getPayload();
601 ImmutablePair<PathInfoData, PathInfoData>
path;
603 flowValidator.
validate(requestedFlow);
605 path = pathComputer.
getPath(requestedFlow, Strategy.COST);
606 logger.info(
"Creating flow {}. Found path: {}, correlationId: {}", requestedFlow.getFlowId(),
path,
607 message.getCorrelationId());
609 }
catch (FlowValidationException e) {
610 throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
611 ErrorType.ALREADY_EXISTS,
"Could not create flow", e.getMessage());
612 }
catch (UnroutablePathException e) {
613 throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
614 ErrorType.NOT_FOUND,
"Could not create flow",
615 "Not enough bandwidth found or path not found");
618 ImmutablePair<Flow, Flow> flow = flowCache.
createFlow(requestedFlow,
path);
619 logger.info(
"Created flow: {}, correlationId: {}", flow, message.getCorrelationId());
621 FlowInfoData
data =
new FlowInfoData(requestedFlow.getFlowId(), flow, FlowOperation.CREATE,
622 message.getCorrelationId());
623 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), message.getCorrelationId());
624 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
625 outputCollector.emit(StreamType.CREATE.toString(), tuple,
topology);
627 Values northbound =
new Values(
new InfoMessage(
new FlowResponse(buildFlowResponse(flow)),
628 message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
629 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
632 private void handleRerouteRequest(CommandMessage message, Tuple tuple)
throws IOException, RecoverableException {
633 FlowRerouteRequest request = (FlowRerouteRequest) message.getData();
634 final String flowId = request.getFlowId();
635 final String correlationId = message.getCorrelationId();
636 logger.warn(
"Handling reroute request with correlationId {}", correlationId);
638 ImmutablePair<Flow, Flow> flow = flowCache.
getFlow(flowId);
639 switch (request.getOperation()) {
641 final Flow flowForward = flow.getLeft();
644 logger.warn(
"Origin flow {} path: {} correlationId {}", flowId, flowForward.getFlowPath(),
646 AvailableNetwork network = pathComputer.
getAvailableNetwork(flowForward.isIgnoreBandwidth(),
647 flowForward.getBandwidth());
649 ImmutablePair<PathInfoData, PathInfoData>
path =
650 pathComputer.
getPath(flow.getLeft(), network, Strategy.COST);
651 logger.warn(
"Potential New Path for flow {} with LEFT path: {}, RIGHT path: {} correlationId {}",
652 flowId,
path.getLeft(),
path.getRight(), correlationId);
653 boolean isFoundNewPath = (
654 !
path.getLeft().equals(flow.getLeft().getFlowPath())
655 || !
path.getRight().equals(flow.getRight().getFlowPath())
656 || !isFlowActive(flow));
659 if (isFoundNewPath || request.isForce()) {
660 flow.getLeft().setState(FlowState.DOWN);
661 flow.getRight().setState(FlowState.DOWN);
664 logger.warn(
"Rerouted flow with new path: {}, correlationId {}", flow, correlationId);
666 FlowInfoData
data =
new FlowInfoData(flowId, flow, UPDATE, correlationId);
667 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), correlationId);
668 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
669 outputCollector.emit(StreamType.UPDATE.toString(), tuple,
topology);
671 logger.warn(
"Reroute {} is unsuccessful: can't find new path. CorrelationId: {}",
672 flowId, correlationId);
675 logger.debug(
"Sending response to NB. Correlation id {}", correlationId);
676 FlowRerouteResponse response =
new FlowRerouteResponse(flow.left.getFlowPath(), isFoundNewPath);
677 Values values =
new Values(
new InfoMessage(response, message.getTimestamp(),
678 correlationId, Destination.NORTHBOUND));
679 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, values);
680 }
catch (UnroutablePathException e) {
681 logger.warn(
"There is no path available for the flow {}, correlationId: {}", flowId,
683 flow.getLeft().setState(FlowState.DOWN);
684 flow.getRight().setState(FlowState.DOWN);
685 throw new MessageException(correlationId, System.currentTimeMillis(),
686 ErrorType.UPDATE_FAILURE,
"Could not reroute flow",
"Path was not found");
691 logger.warn(
"State flow: {}={}, correlationId: {}", flowId, FlowState.UP, correlationId);
692 flow.getLeft().setState(FlowState.UP);
693 flow.getRight().setState(FlowState.UP);
697 logger.warn(
"State flow: {}={}, correlationId: {}", flowId, FlowState.DOWN, correlationId);
698 flow.getLeft().setState(FlowState.DOWN);
699 flow.getRight().setState(FlowState.DOWN);
703 logger.warn(
"Flow {} undefined reroute operation", request.getOperation());
708 private void handleUpdateRequest(CommandMessage message, Tuple tuple)
throws IOException, RecoverableException {
709 Flow requestedFlow = ((FlowUpdateRequest) message.getData()).getPayload();
710 String correlationId = message.getCorrelationId();
712 ImmutablePair<PathInfoData, PathInfoData>
path;
714 flowValidator.
validate(requestedFlow);
716 AvailableNetwork network = pathComputer.
getAvailableNetwork(requestedFlow.isIgnoreBandwidth(),
717 requestedFlow.getBandwidth());
719 requestedFlow.isIgnoreBandwidth(), requestedFlow.getBandwidth());
720 path = pathComputer.
getPath(requestedFlow, network, Strategy.COST);
721 logger.info(
"Updated flow path: {}, correlationId {}",
path, correlationId);
723 }
catch (FlowValidationException e) {
724 throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
725 ErrorType.ALREADY_EXISTS,
"Could not update flow", e.getMessage());
726 }
catch (UnroutablePathException e) {
727 throw new MessageException(message.getCorrelationId(), System.currentTimeMillis(),
728 ErrorType.NOT_FOUND,
"Could not update flow",
"Path was not found");
731 ImmutablePair<Flow, Flow> flow = flowCache.
updateFlow(requestedFlow,
path);
732 logger.info(
"Updated flow: {}, correlationId {}", flow, correlationId);
734 FlowInfoData
data =
new FlowInfoData(requestedFlow.getFlowId(), flow, UPDATE,
735 message.getCorrelationId());
736 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), message.getCorrelationId());
737 Values
topology =
new Values(MAPPER.writeValueAsString(infoMessage));
738 outputCollector.emit(StreamType.UPDATE.toString(), tuple,
topology);
740 Values northbound =
new Values(
new InfoMessage(
new FlowResponse(buildFlowResponse(flow)),
741 message.getTimestamp(), message.getCorrelationId(), Destination.NORTHBOUND));
742 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
745 private void handleDumpRequest(CommandMessage message, Tuple tuple) {
746 List<BidirectionalFlow> flows = flowCache.
dumpFlows().stream()
747 .map(BidirectionalFlow::new)
748 .collect(Collectors.toList());
750 logger.debug(
"Dump flows: found {} items", flows.size());
752 String requestId = message.getCorrelationId();
753 if (flows.isEmpty()) {
754 Message response =
new ChunkedInfoMessage(null, System.currentTimeMillis(), requestId, null);
755 outputCollector.emit(StreamType.RESPONSE.toString(), tuple,
new Values(response));
757 Iterator<BidirectionalFlow> iterator = flows.iterator();
759 BidirectionalFlow flow = iterator.next();
760 String nextRequestId = iterator.hasNext() ? UUID.randomUUID().toString() : null;
762 Message response =
new ChunkedInfoMessage(
763 new FlowReadResponse(flow), System.currentTimeMillis(), requestId, nextRequestId);
764 outputCollector.emit(StreamType.RESPONSE.toString(), tuple,
new Values(response));
765 requestId = nextRequestId;
766 }
while (iterator.hasNext());
770 private void handleReadRequest(String flowId, CommandMessage message, Tuple tuple) {
771 BidirectionalFlow flow =
new BidirectionalFlow(flowCache.
getFlow(flowId));
773 logger.debug(
"Got bidirectional flow: {}, correlationId {}", flow, message.getCorrelationId());
775 Values northbound =
new Values(
777 new FlowReadResponse(flow),
778 message.getTimestamp(),
779 message.getCorrelationId(),
780 Destination.NORTHBOUND));
781 outputCollector.emit(StreamType.RESPONSE.toString(), tuple, northbound);
790 private void handleStateRequest(String flowId, FlowState state, Tuple tuple, String correlationId)
792 ImmutablePair<Flow, Flow> flow = flowCache.
getFlow(flowId);
793 logger.info(
"State flow: {}={}", flowId, state);
794 flow.getLeft().setState(state);
795 flow.getRight().setState(state);
797 FlowInfoData
data =
new FlowInfoData(flowId, flow, FlowOperation.STATE, correlationId);
798 InfoMessage infoMessage =
new InfoMessage(
data, System.currentTimeMillis(), correlationId);
800 Values
topology =
new Values(Utils.MAPPER.writeValueAsString(infoMessage));
801 outputCollector.emit(StreamType.STATUS.toString(), tuple,
topology);
805 private void handleErrorRequest(String flowId, ErrorMessage message, Tuple tuple)
throws IOException {
806 ErrorType errorType = message.getData().getErrorType();
807 message.getData().setErrorDescription(
"topology-engine internal error");
809 logger.info(
"Flow {} {} failure", errorType, flowId);
812 case CREATION_FAILURE:
817 handleStateRequest(flowId, FlowState.DOWN, tuple, message.getCorrelationId());
820 case DELETION_FAILURE:
827 logger.warn(
"Flow {} undefined failure", flowId);
831 Values error =
new Values(message, errorType);
832 outputCollector.emit(StreamType.ERROR.toString(), tuple, error);
835 private void handleFlowSync(NetworkInfoData networkDump) {
836 Set<ImmutablePair<Flow, Flow>> flows = networkDump.getFlows();
838 logger.info(
"Load flows {}", flows.size());
839 flows.forEach(flowCache::putFlow);
848 private Flow buildFlowResponse(ImmutablePair<Flow, Flow> flow) {
849 Flow response =
new Flow(flow.left);
850 response.setCookie(response.getCookie() & ResourceCache.FLOW_COOKIE_VALUE_MASK);
854 private ErrorMessage buildErrorMessage(String correlationId, ErrorType
type, String message, String
description) {
856 System.currentTimeMillis(), correlationId, Destination.NORTHBOUND);
859 private boolean isFlowActive(ImmutablePair<Flow, Flow> flowPair) {
860 return flowPair.getLeft().getState().isActive() && flowPair.getRight().getState().isActive();
863 private void initFlowCache() {
864 PathComputerFlowFetcher flowFetcher =
new PathComputerFlowFetcher(pathComputer);
866 for (BidirectionalFlow bidirectionalFlow : flowFetcher.getFlows()) {
867 ImmutablePair<Flow, Flow> flowPair =
new ImmutablePair<>(
868 bidirectionalFlow.getForward(), bidirectionalFlow.getReverse());
882 logger.info(
"State clear request from test");
883 initState(
new InMemoryKeyValueState<>());
905 return outputCollector;
void pushFlow(ImmutablePair< Flow, Flow > flow)
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
static final String STATUS_FIELD
static final Fields fieldsMessageErrorType
ImmutablePair< Flow, Flow > removeFlow(String flowId)
static final String FIELD_ID_BIFLOW
static final ObjectMapper MAPPER
static final Fields STREAM_FIELDS_VERIFICATION
String getErrorDescription()
Optional< AbstractDumpState > dumpResorceCacheState()
PathComputer getPathComputer()
static final String FIELD_ID_FLOW_ID
static final String MESSAGE_FIELD
ImmutablePair< Flow, Flow > getFlow(String flowId)
ImmutablePair< PathInfoData, PathInfoData > getPath(Flow flow, AvailableNetwork network, Strategy strategy)
static final Fields fieldMessage
Map< SwitchId, Set< Integer > > getAllocatedMeters()
default List< FlowInfo > getFlowInfo()
void execute(Tuple tuple)
Set< Integer > getAllocatedVlans()
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
Set< ImmutablePair< Flow, Flow > > dumpFlows()
TopologyContext getContext()
static final String FIELD_ID_NETWORK_DUMP
AbstractDumpState dumpState()
static final String DEFAULT_CORRELATION_ID
ImmutablePair< Flow, Flow > updateFlow(Flow flow, ImmutablePair< PathInfoData, PathInfoData > path)
static final String CORRELATION_ID
AvailableNetwork getAvailableNetwork(boolean ignoreBandwidth, long requestedBandwidth)
static final String FIELD_ID_MESSAGE
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
ImmutablePair< Flow, Flow > deleteFlow(String flowId)
ImmutablePair< Flow, Flow > createFlow(Flow flow, ImmutablePair< PathInfoData, PathInfoData > path)
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
CrudBolt(PathComputerAuth pathComputerAuth)
Set< Integer > getAllocatedCookies()
void addIslsOccupiedByFlow(String flowId, boolean ignoreBandwidth, long flowBandwidth)
String getCorrelationId()
static final String FLOW_ID
void initState(InMemoryKeyValueState< String, FlowCache > state)
OutputCollector getOutput()
boolean cacheContainsFlow(String flowId)
static final String STREAM_ID_CTRL