16 package org.openkilda.northbound.service.impl;
74 import org.apache.commons.lang3.math.NumberUtils;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import org.springframework.beans.factory.annotation.Autowired;
78 import org.springframework.beans.factory.annotation.Value;
79 import org.springframework.http.HttpHeaders;
80 import org.springframework.stereotype.Service;
81 import org.springframework.web.client.RestTemplate;
83 import java.nio.file.InvalidPathException;
84 import java.util.ArrayList;
85 import java.util.HashMap;
86 import java.util.HashSet;
87 import java.util.List;
89 import java.util.Optional;
91 import java.util.stream.Collectors;
92 import javax.annotation.PostConstruct;
102 private static final Logger LOGGER = LoggerFactory.getLogger(
FlowServiceImpl.class);
107 private static final Long IGNORE_COOKIE_FILTER = 0L;
110 private Auth pathComputerAuth;
115 @Value(
"#{kafkaTopicsConfig.getFlowTopic()}")
116 private String topic;
121 @Value(
"#{kafkaTopicsConfig.getTopoEngTopic()}")
122 private String topoEngTopic;
124 @Value(
"${neo4j.hosts}")
125 private String neoHost;
127 @Value(
"${neo4j.user}")
128 private String neoUser;
130 @Value(
"${neo4j.pswd}")
131 private String neoPswd;
157 private HttpHeaders headers;
162 @Value(
"${topology.engine.rest.endpoint}")
163 private String topologyEngineRest;
169 private RestTemplate restTemplate;
177 pathComputerAuth =
new AuthNeo4j(neoHost, neoUser, neoPswd);
187 LOGGER.debug(
"Create flow: {}", flow);
190 messageProducer.send(topic, request);
193 return flowMapper.toFlowPayload(response.
getPayload());
201 logger.debug(
"Delete flow: {}={}", FLOW_ID,
id);
204 return deleteFlowResponse(correlationId, request);
211 private CommandMessage sendDeleteFlow(
final String
id,
final String correlationId) {
216 messageProducer.send(topic, request);
224 private FlowPayload deleteFlowResponse(
final String correlationId, CommandMessage request) {
226 FlowResponse response = (FlowResponse)
validateInfoMessage(request, message, correlationId);
227 return flowMapper.toFlowPayload(response.getPayload());
235 logger.debug(
"Get flow: {}={}", FLOW_ID,
id);
237 return flowMapper.toFlowPayload(flow.getForward());
246 logger.debug(
"Update flow: {}={}", FLOW_ID, flow.
getId());
249 messageProducer.send(topic, request);
252 return flowMapper.toFlowPayload(response.
getPayload());
261 LOGGER.debug(
"Get flows request processing");
264 messageProducer.send(topic, request);
265 List<FlowReadResponse>
result = flowsCollector.getResult(correlationId);
269 .map(FlowReadResponse::getPayload)
270 .map(BidirectionalFlow::getForward)
271 .map(flowMapper::toFlowPayload)
272 .collect(Collectors.toList());
281 LOGGER.debug(
"DELETE ALL FLOWS");
282 ArrayList<FlowPayload>
result =
new ArrayList<>();
284 List<FlowPayload> flows = this.
getFlows();
287 ArrayList<CommandMessage> requests =
new ArrayList<>();
288 for (
int i = 0;
i < flows.size();
i++) {
289 String cid = correlationId +
"-" +
i;
291 requests.add(sendDeleteFlow(flow.
getId(), cid));
294 for (
int i = 0;
i < flows.size();
i++) {
295 String cid = correlationId +
"-" +
i;
296 result.add(deleteFlowResponse(cid, requests.get(
i)));
299 LOGGER.debug(
"\n\nDELETE ALL FLOWS: EXIT {}={}\n", CORRELATION_ID, correlationId);
308 logger.debug(
"Flow status: {}={}", FLOW_ID,
id);
310 return flowMapper.toFlowIdStatusPayload(flow);
318 LOGGER.debug(
"Flow path: {}={}", FLOW_ID,
id);
320 return flowMapper.toFlowPathPayload(flow);
330 return flowPushUnpush(externalFlows, op);
340 return flowPushUnpush(externalFlows, op);
347 private BidirectionalFlow getBidirectionalFlow(String flowId, String correlationId) {
350 messageProducer.send(topic, request);
358 private BatchResults flowPushUnpush(List<FlowInfoData> externalFlows, FlowOperation op) {
360 LOGGER.debug(
"Flow {}: id: {}",
362 LOGGER.debug(
"Size of list: {}", externalFlows.size());
365 ArrayList<InfoMessage> flowRequests =
new ArrayList<>();
366 ArrayList<InfoMessage> teRequests =
new ArrayList<>();
367 for (
int i = 0;
i < externalFlows.size();
i++) {
369 data.setOperation(op);
370 String flowCorrelation = correlationId +
"-FLOW-" +
i;
372 new InfoMessage(
data, System.currentTimeMillis(), flowCorrelation, Destination.WFM);
373 flowRequests.add(flowRequest);
374 messageProducer.send(topic, flowRequest);
375 String teCorrelation = correlationId +
"-TE-" +
i;
377 new InfoMessage(
data, System.currentTimeMillis(), teCorrelation, Destination.TOPOLOGY_ENGINE);
378 teRequests.add(teRequest);
379 messageProducer.send(topoEngTopic, teRequest);
386 List<String> msgs =
new ArrayList<>();
387 msgs.add(
"Total Flows Received: " + externalFlows.size());
389 for (
int i = 0;
i < externalFlows.size();
i++) {
390 String flowCorrelation = correlationId +
"-FLOW-" +
i;
391 String teCorrelation = correlationId +
"-TE-" +
i;
392 FlowState expectedState = (op == FlowOperation.PUSH || op == FlowOperation.PUSH_PROPAGATE)
396 Message flowMessage = (
Message) messageConsumer.poll(flowCorrelation);
398 flowRequests.get(
i), flowMessage, correlationId);
399 FlowIdStatusPayload
status = response.getPayload();
400 if (
status.getStatus() == expectedState) {
403 msgs.add(
"FAILURE (FlowTopo): Flow " +
status.getId()
404 +
" NOT in " + expectedState
405 +
" state: state = " +
status.getStatus());
408 }
catch (Exception e) {
409 msgs.add(
"EXCEPTION in Flow Topology Response: " + e.getMessage());
414 Message teMessage = (Message) messageConsumer.poll(teCorrelation);
415 FlowStatusResponse response =
417 FlowIdStatusPayload
status = response.getPayload();
418 if (
status.getStatus() == expectedState) {
421 msgs.add(
"FAILURE (TE): Flow " +
status.getId()
422 +
" NOT in " + expectedState
423 +
" state: state = " +
status.getStatus());
426 }
catch (Exception e) {
427 msgs.add(
"EXCEPTION in Topology Engine Response: " + e.getMessage());
432 BatchResults
result =
new BatchResults(
433 flowFailure + teFailure,
434 flowSuccess + teSuccess,
437 LOGGER.debug(
"Returned: ",
result);
446 return reroute(flowId,
false);
451 return reroute(flowId,
true);
454 private static final class SimpleSwitchRule {
462 private long pktCount;
463 private long byteCount;
467 public String toString() {
468 return "{sw:" + switchId
470 +
", in:" + inPort +
"-" + inVlan
471 +
", out:" + outPort +
"-" + outVlan
478 public static final List<SimpleSwitchRule> convertFlow(Flow flow) {
482 SimpleSwitchRule rule =
new SimpleSwitchRule();
483 rule.switchId = flow.getSourceSwitch();
484 rule.cookie = flow.getCookie();
485 rule.inPort = flow.getSourcePort();
486 rule.inVlan = flow.getSourceVlan();
487 rule.meterId = flow.getMeterId();
488 List<PathNode>
path = flow.getFlowPath().getPath();
490 if (
path.size() == 0) {
492 rule.outPort = flow.getDestinationPort();
493 rule.outVlan = flow.getDestinationVlan();
496 rule.outPort =
path.get(0).getPortNo();
497 rule.outVlan = flow.getTransitVlan();
500 List<SimpleSwitchRule>
result =
new ArrayList<>();
509 if (
path.size() > 2) {
510 for (
int i = 1;
i <
path.size() - 1;
i =
i + 2) {
512 final PathNode inNode =
path.get(
i);
513 final PathNode outNode =
path.get(
i + 1);
515 rule =
new SimpleSwitchRule();
516 rule.switchId = inNode.getSwitchId();
517 rule.inPort = inNode.getPortNo();
519 rule.cookie = Optional.ofNullable(inNode.getCookie())
520 .filter(cookie -> !cookie.equals(NumberUtils.LONG_ZERO))
521 .orElse(flow.getCookie());
522 rule.inVlan = flow.getTransitVlan();
525 rule.outPort = outNode.getPortNo();
533 if (
path.size() > 0) {
534 rule =
new SimpleSwitchRule();
535 rule.switchId = flow.getDestinationSwitch();
536 rule.outPort = flow.getDestinationPort();
537 rule.outVlan = flow.getDestinationVlan();
538 rule.inVlan = flow.getTransitVlan();
539 rule.inPort =
path.get(
path.size() - 1).getPortNo();
540 rule.cookie = Optional.ofNullable(
path.get(
path.size() - 1).getCookie())
541 .filter(cookie -> !cookie.equals(NumberUtils.LONG_ZERO))
542 .orElse(flow.getCookie());
551 public static final List<SimpleSwitchRule> convertSwitchRules(SwitchFlowEntries rules) {
552 List<SimpleSwitchRule>
result =
new ArrayList<>();
553 if (rules == null || rules.getFlowEntries() == null) {
557 for (FlowEntry switchRule : rules.getFlowEntries()) {
558 logger.debug(
"FlowEntry: {}", switchRule);
559 SimpleSwitchRule rule =
new SimpleSwitchRule();
560 rule.switchId = rules.getSwitchId();
561 rule.cookie = switchRule.getCookie();
562 rule.inPort = NumberUtils.toInt(switchRule.getMatch().getInPort());
563 rule.inVlan = NumberUtils.toInt(switchRule.getMatch().getVlanVid());
564 if (switchRule.getInstructions() != null) {
566 if (switchRule.getInstructions().getApplyActions() != null) {
568 FlowApplyActions applyActions = switchRule.getInstructions().getApplyActions();
569 rule.outVlan = Optional.ofNullable(applyActions.getFieldAction())
570 .filter(action ->
"vlan_vid".equals(action.getFieldName()))
571 .map(FlowSetFieldAction::getFieldValue)
572 .map(NumberUtils::toInt)
573 .orElse(NumberUtils.INTEGER_ZERO);
574 rule.outPort = NumberUtils.toInt(applyActions.getFlowOutput());
577 rule.meterId = Optional.ofNullable(switchRule.getInstructions().getGoToMeter())
579 .orElse(NumberUtils.INTEGER_ZERO);
581 rule.pktCount = switchRule.getPacketCount();
582 rule.byteCount = switchRule.getByteCount();
583 rule.version = switchRule.getVersion();
594 static List<PathDiscrepancyDto> findDiscrepancy(
595 SimpleSwitchRule expected, List<SimpleSwitchRule> possibleActual,
596 List<Long> pktCounts, List<Long> byteCounts) {
597 List<PathDiscrepancyDto>
result =
new ArrayList<>();
602 SimpleSwitchRule matched = possibleActual.stream()
603 .filter(rule -> rule.cookie != 0 && rule.cookie == expected.cookie)
609 if (matched == null) {
610 matched = possibleActual.stream()
611 .filter(rule -> rule.inPort == expected.inPort && rule.inVlan == expected.inVlan)
618 if (matched == null) {
619 matched = possibleActual.stream()
620 .filter(rule -> rule.outPort == expected.outPort && rule.outVlan == expected.outVlan)
628 if (matched == null) {
629 result.add(
new PathDiscrepancyDto(String.valueOf(expected),
"all", String.valueOf(expected),
""));
633 if (matched.cookie != expected.cookie) {
634 result.add(
new PathDiscrepancyDto(expected.toString(),
"cookie",
635 String.valueOf(expected.cookie), String.valueOf(matched.cookie)));
637 if (matched.inPort != expected.inPort) {
638 result.add(
new PathDiscrepancyDto(expected.toString(),
"inPort",
639 String.valueOf(expected.inPort), String.valueOf(matched.inPort)));
641 if (matched.inVlan != expected.inVlan) {
642 result.add(
new PathDiscrepancyDto(expected.toString(),
"inVlan",
643 String.valueOf(expected.inVlan), String.valueOf(matched.inVlan)));
645 if (matched.outPort != expected.outPort) {
646 result.add(
new PathDiscrepancyDto(expected.toString(),
"outPort",
647 String.valueOf(expected.outPort), String.valueOf(matched.outPort)));
649 if (matched.outVlan != expected.outVlan) {
650 result.add(
new PathDiscrepancyDto(expected.toString(),
"outVlan",
651 String.valueOf(expected.outVlan), String.valueOf(matched.outVlan)));
655 if ((matched.version == null || matched.version.compareTo(
"OF_12") > 0)
656 && matched.meterId != expected.meterId) {
657 result.add(
new PathDiscrepancyDto(expected.toString(),
"meterId",
658 String.valueOf(expected.meterId), String.valueOf(matched.meterId)));
660 pktCounts.add(matched.pktCount);
661 byteCounts.add(matched.byteCount);
681 List<Flow> flows = pathComputer.
getFlow(flowId);
686 logger.debug(
"VALIDATE FLOW: Found Flows: count = {}", flows.size());
691 List<List<SimpleSwitchRule>> simpleFlowRules =
new ArrayList<>();
692 Set<SwitchId> switches =
new HashSet<>();
693 for (
Flow flow : flows) {
694 if (flow.getFlowPath() != null) {
695 simpleFlowRules.add(SimpleSwitchRule.convertFlow(flow));
696 switches.add(flow.getSourceSwitch());
697 switches.add(flow.getDestinationSwitch());
699 switches.add(
node.getSwitchId());
702 throw new InvalidPathException(flowId,
"Flow Path was not returned.");
721 final Map<SwitchId, SwitchFlowEntries> rules =
new HashMap<>();
722 final Map<SwitchId, List<SimpleSwitchRule>> simpleRules =
new HashMap<>();
723 int totalSwitchRules = 0;
725 for (
SwitchId switchId : switches) {
726 String requestId = correlationId +
"-" +
index++;
727 SwitchFlowEntries sfe = switchService.getRules(switchId, IGNORE_COOKIE_FILTER, requestId);
728 rules.put(switchId, sfe);
729 simpleRules.put(switchId, SimpleSwitchRule.convertSwitchRules(rules.get(switchId)));
730 totalSwitchRules += (sfe != null && sfe.getFlowEntries() != null) ? sfe.getFlowEntries().size() : 0;
736 List<FlowValidationDto> results =
new ArrayList<>();
737 for (List<SimpleSwitchRule> oneDirection : simpleFlowRules) {
738 List<PathDiscrepancyDto> discrepancies =
new ArrayList<>();
739 List<Long> pktCounts =
new ArrayList<>();
740 List<Long> byteCounts =
new ArrayList<>();
741 for (
int i = 0;
i < oneDirection.size();
i++) {
742 SimpleSwitchRule simpleRule = oneDirection.get(
i);
744 discrepancies.addAll(
745 SimpleSwitchRule.findDiscrepancy(simpleRule,
746 simpleRules.get(simpleRule.switchId),
747 pktCounts, byteCounts
753 result.setDiscrepancies(discrepancies);
754 result.setAsExpected(discrepancies.size() == 0);
755 result.setPktCounts(pktCounts);
756 result.setByteCounts(byteCounts);
757 result.setFlowRulesTotal(oneDirection.size());
758 result.setSwitchRulesTotal(totalSwitchRules);
770 messageProducer.send(
topic, request);
774 request, message, correlationId);
776 return flowMapper.toVerificationOutput(response);
785 LOGGER.debug(
"Flow cache sync. Action: {}", syncCacheAction);
788 messageProducer.send(
topic, request);
795 logger.debug(
"Reroute flow: {}={}, forced={}", FLOW_ID, flowId, forced);
799 payload, System.currentTimeMillis(), correlationId,
Destination.
WFM);
804 logger.debug(
"Got reroute response {}", message);
806 return flowMapper.toReroutePayload(flowId, response.getPayload(), response.isRerouted());
FlowCacheSyncResults getPayload()
BatchResults unpushFlows(List< FlowInfoData > externalFlows, Boolean propagate, Boolean verify)
FlowReroutePayload syncFlow(String flowId)
List< FlowValidationDto > validateFlow(final String flowId)
List< FlowPayload > deleteFlows()
List< FlowPayload > getFlows()
FlowPathPayload pathFlow(final String id)
FlowIdStatusPayload statusFlow(final String id)
def command(payload, fields)
default InfoData validateInfoMessage(final Message requestMessage, final Message responseMessage, final String correlationId)
FlowPayload getFlow(final String id)
static final String CORRELATION_ID
static Flow buildFlowByFlowPayload(FlowPayload flowPayload)
BatchResults pushFlows(List< FlowInfoData > externalFlows, Boolean propagate, Boolean verify)
FlowReroutePayload rerouteFlow(String flowId)
default List< Flow > getFlow(String flowId)
FlowPayload createFlow(final FlowPayload flow)
VerificationOutput verifyFlow(String flowId, VerificationInput payload)
FlowCacheSyncResults syncFlowCache(SynchronizeCacheAction syncCacheAction)
static final String FLOW_ID
FlowPayload updateFlow(final FlowPayload flow)
FlowPayload deleteFlow(final String id)