Open Kilda Java Documentation
FlowServiceImpl.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.northbound.service.impl;
17 
19 import static org.openkilda.messaging.Utils.FLOW_ID;
20 
73 
74 import org.apache.commons.lang3.math.NumberUtils;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import org.springframework.beans.factory.annotation.Autowired;
78 import org.springframework.beans.factory.annotation.Value;
79 import org.springframework.http.HttpHeaders;
80 import org.springframework.stereotype.Service;
81 import org.springframework.web.client.RestTemplate;
82 
83 import java.nio.file.InvalidPathException;
84 import java.util.ArrayList;
85 import java.util.HashMap;
86 import java.util.HashSet;
87 import java.util.List;
88 import java.util.Map;
89 import java.util.Optional;
90 import java.util.Set;
91 import java.util.stream.Collectors;
92 import javax.annotation.PostConstruct;
93 
97 @Service
98 public class FlowServiceImpl implements FlowService {
102  private static final Logger LOGGER = LoggerFactory.getLogger(FlowServiceImpl.class);
103 
107  private static final Long IGNORE_COOKIE_FILTER = 0L;
108 
109  private PathComputer pathComputer;
110  private Auth pathComputerAuth;
111 
115  @Value("#{kafkaTopicsConfig.getFlowTopic()}")
116  private String topic;
117 
121  @Value("#{kafkaTopicsConfig.getTopoEngTopic()}")
122  private String topoEngTopic;
123 
124  @Value("${neo4j.hosts}")
125  private String neoHost;
126 
127  @Value("${neo4j.user}")
128  private String neoUser;
129 
130  @Value("${neo4j.pswd}")
131  private String neoPswd;
132 
133  @Autowired
134  private FlowMapper flowMapper;
135 
139  @Autowired
140  private SwitchService switchService;
141 
145  @Autowired
146  private MessageConsumer messageConsumer;
147 
151  @Autowired
152  private MessageProducer messageProducer;
153 
157  private HttpHeaders headers;
158 
162  @Value("${topology.engine.rest.endpoint}")
163  private String topologyEngineRest;
164 
168  @Autowired
169  private RestTemplate restTemplate;
170 
171  @Autowired
172  private ResponseCollector<FlowReadResponse> flowsCollector;
173 
174 
175  @PostConstruct
176  void init() {
177  pathComputerAuth = new AuthNeo4j(neoHost, neoUser, neoPswd);
178  pathComputer = new NeoDriver(pathComputerAuth.getDriver());
179  }
180 
184  @Override
185  public FlowPayload createFlow(final FlowPayload flow) {
186  final String correlationId = RequestCorrelationId.getId();
187  LOGGER.debug("Create flow: {}", flow);
189  CommandMessage request = new CommandMessage(data, System.currentTimeMillis(), correlationId, Destination.WFM);
190  messageProducer.send(topic, request);
191  Message message = (Message) messageConsumer.poll(correlationId);
192  FlowResponse response = (FlowResponse) validateInfoMessage(request, message, correlationId);
193  return flowMapper.toFlowPayload(response.getPayload());
194  }
195 
199  @Override
200  public FlowPayload deleteFlow(final String id) {
201  logger.debug("Delete flow: {}={}", FLOW_ID, id);
202  final String correlationId = RequestCorrelationId.getId();
203  CommandMessage request = sendDeleteFlow(id, correlationId);
204  return deleteFlowResponse(correlationId, request);
205  }
206 
211  private CommandMessage sendDeleteFlow(final String id, final String correlationId) {
212  Flow flow = new Flow();
213  flow.setFlowId(id);
215  CommandMessage request = new CommandMessage(data, System.currentTimeMillis(), correlationId, Destination.WFM);
216  messageProducer.send(topic, request);
217  return request;
218  }
219 
224  private FlowPayload deleteFlowResponse(final String correlationId, CommandMessage request) {
225  Message message = (Message) messageConsumer.poll(correlationId);
226  FlowResponse response = (FlowResponse) validateInfoMessage(request, message, correlationId);
227  return flowMapper.toFlowPayload(response.getPayload());
228  }
229 
233  @Override
234  public FlowPayload getFlow(final String id) {
235  logger.debug("Get flow: {}={}", FLOW_ID, id);
236  BidirectionalFlow flow = getBidirectionalFlow(id, RequestCorrelationId.getId());
237  return flowMapper.toFlowPayload(flow.getForward());
238  }
239 
243  @Override
244  public FlowPayload updateFlow(final FlowPayload flow) {
245  final String correlationId = RequestCorrelationId.getId();
246  logger.debug("Update flow: {}={}", FLOW_ID, flow.getId());
248  CommandMessage request = new CommandMessage(data, System.currentTimeMillis(), correlationId, Destination.WFM);
249  messageProducer.send(topic, request);
250  Message message = (Message) messageConsumer.poll(correlationId);
251  FlowResponse response = (FlowResponse) validateInfoMessage(request, message, correlationId);
252  return flowMapper.toFlowPayload(response.getPayload());
253  }
254 
258  @Override
259  public List<FlowPayload> getFlows() {
260  final String correlationId = RequestCorrelationId.getId();
261  LOGGER.debug("Get flows request processing");
263  CommandMessage request = new CommandMessage(data, System.currentTimeMillis(), correlationId, Destination.WFM);
264  messageProducer.send(topic, request);
265  List<FlowReadResponse> result = flowsCollector.getResult(correlationId);
266  logger.debug("Received {} flows", result.size());
267 
268  return result.stream()
269  .map(FlowReadResponse::getPayload)
270  .map(BidirectionalFlow::getForward)
271  .map(flowMapper::toFlowPayload)
272  .collect(Collectors.toList());
273  }
274 
278  @Override
279  public List<FlowPayload> deleteFlows() {
280  String correlationId = RequestCorrelationId.getId();
281  LOGGER.debug("DELETE ALL FLOWS");
282  ArrayList<FlowPayload> result = new ArrayList<>();
283  // TODO: Need a getFlowIDs .. since that is all we need
284  List<FlowPayload> flows = this.getFlows();
285 
286  // Send all the requests first
287  ArrayList<CommandMessage> requests = new ArrayList<>();
288  for (int i = 0; i < flows.size(); i++) {
289  String cid = correlationId + "-" + i;
290  FlowPayload flow = flows.get(i);
291  requests.add(sendDeleteFlow(flow.getId(), cid));
292  }
293  // Now wait for the responses.
294  for (int i = 0; i < flows.size(); i++) {
295  String cid = correlationId + "-" + i;
296  result.add(deleteFlowResponse(cid, requests.get(i)));
297  }
298 
299  LOGGER.debug("\n\nDELETE ALL FLOWS: EXIT {}={}\n", CORRELATION_ID, correlationId);
300  return result;
301  }
302 
306  @Override
307  public FlowIdStatusPayload statusFlow(final String id) {
308  logger.debug("Flow status: {}={}", FLOW_ID, id);
309  BidirectionalFlow flow = getBidirectionalFlow(id, RequestCorrelationId.getId());
310  return flowMapper.toFlowIdStatusPayload(flow);
311  }
312 
316  @Override
317  public FlowPathPayload pathFlow(final String id) {
318  LOGGER.debug("Flow path: {}={}", FLOW_ID, id);
319  BidirectionalFlow flow = getBidirectionalFlow(id, RequestCorrelationId.getId());
320  return flowMapper.toFlowPathPayload(flow);
321  }
322 
326  @Override
327  public BatchResults unpushFlows(List<FlowInfoData> externalFlows, Boolean propagate, Boolean verify) {
329  // TODO: ADD the VERIFY implementation
330  return flowPushUnpush(externalFlows, op);
331  }
332 
336  @Override
337  public BatchResults pushFlows(List<FlowInfoData> externalFlows, Boolean propagate, Boolean verify) {
339  // TODO: ADD the VERIFY implementation
340  return flowPushUnpush(externalFlows, op);
341  }
342 
347  private BidirectionalFlow getBidirectionalFlow(String flowId, String correlationId) {
348  FlowReadRequest data = new FlowReadRequest(flowId);
349  CommandMessage request = new CommandMessage(data, System.currentTimeMillis(), correlationId, Destination.WFM);
350  messageProducer.send(topic, request);
351  Message message = (Message) messageConsumer.poll(correlationId);
352  return ((FlowReadResponse) validateInfoMessage(request, message, correlationId)).getPayload();
353  }
354 
358  private BatchResults flowPushUnpush(List<FlowInfoData> externalFlows, FlowOperation op) {
359  final String correlationId = RequestCorrelationId.getId();
360  LOGGER.debug("Flow {}: id: {}",
361  op, externalFlows.stream().map(FlowInfoData::getFlowId).collect(Collectors.joining()));
362  LOGGER.debug("Size of list: {}", externalFlows.size());
363  // First, send them all, then wait for all the responses.
364  // Send the command to both Flow Topology and to TE
365  ArrayList<InfoMessage> flowRequests = new ArrayList<>(); // used for error reporting, if needed
366  ArrayList<InfoMessage> teRequests = new ArrayList<>(); // used for error reporting, if needed
367  for (int i = 0; i < externalFlows.size(); i++) {
368  FlowInfoData data = externalFlows.get(i);
369  data.setOperation(op); // <-- this is what determines PUSH / UNPUSH
370  String flowCorrelation = correlationId + "-FLOW-" + i;
371  InfoMessage flowRequest =
372  new InfoMessage(data, System.currentTimeMillis(), flowCorrelation, Destination.WFM);
373  flowRequests.add(flowRequest);
374  messageProducer.send(topic, flowRequest);
375  String teCorrelation = correlationId + "-TE-" + i;
376  InfoMessage teRequest =
377  new InfoMessage(data, System.currentTimeMillis(), teCorrelation, Destination.TOPOLOGY_ENGINE);
378  teRequests.add(teRequest);
379  messageProducer.send(topoEngTopic, teRequest);
380  }
381 
382  int flowSuccess = 0;
383  int flowFailure = 0;
384  int teSuccess = 0;
385  int teFailure = 0;
386  List<String> msgs = new ArrayList<>();
387  msgs.add("Total Flows Received: " + externalFlows.size());
388 
389  for (int i = 0; i < externalFlows.size(); i++) {
390  String flowCorrelation = correlationId + "-FLOW-" + i;
391  String teCorrelation = correlationId + "-TE-" + i;
392  FlowState expectedState = (op == FlowOperation.PUSH || op == FlowOperation.PUSH_PROPAGATE)
393  ? FlowState.UP
394  : FlowState.DOWN;
395  try {
396  Message flowMessage = (Message) messageConsumer.poll(flowCorrelation);
397  FlowStatusResponse response = (FlowStatusResponse) validateInfoMessage(
398  flowRequests.get(i), flowMessage, correlationId);
399  FlowIdStatusPayload status = response.getPayload();
400  if (status.getStatus() == expectedState) {
401  flowSuccess++;
402  } else {
403  msgs.add("FAILURE (FlowTopo): Flow " + status.getId()
404  + " NOT in " + expectedState
405  + " state: state = " + status.getStatus());
406  flowFailure++;
407  }
408  } catch (Exception e) {
409  msgs.add("EXCEPTION in Flow Topology Response: " + e.getMessage());
410  flowFailure++;
411  }
412  try {
413  // TODO: this code block is mostly the same as the previous: consolidate.
414  Message teMessage = (Message) messageConsumer.poll(teCorrelation);
415  FlowStatusResponse response =
416  (FlowStatusResponse) validateInfoMessage(teRequests.get(i), teMessage, correlationId);
417  FlowIdStatusPayload status = response.getPayload();
418  if (status.getStatus() == expectedState) {
419  teSuccess++;
420  } else {
421  msgs.add("FAILURE (TE): Flow " + status.getId()
422  + " NOT in " + expectedState
423  + " state: state = " + status.getStatus());
424  teFailure++;
425  }
426  } catch (Exception e) {
427  msgs.add("EXCEPTION in Topology Engine Response: " + e.getMessage());
428  teFailure++;
429  }
430  }
431 
432  BatchResults result = new BatchResults(
433  flowFailure + teFailure,
434  flowSuccess + teSuccess,
435  msgs);
436 
437  LOGGER.debug("Returned: ", result);
438  return result;
439  }
440 
444  @Override
445  public FlowReroutePayload rerouteFlow(String flowId) {
446  return reroute(flowId, false);
447  }
448 
449  @Override
450  public FlowReroutePayload syncFlow(String flowId) {
451  return reroute(flowId, true);
452  }
453 
454  private static final class SimpleSwitchRule {
455  private SwitchId switchId; // so we don't get lost
456  private long cookie;
457  private int inPort;
458  private int outPort;
459  private int inVlan;
460  private int outVlan;
461  private int meterId;
462  private long pktCount; // only set from switch rules, not flow rules
463  private long byteCount; // only set from switch rules, not flow rules
464  private String version;
465 
466  @Override
467  public String toString() {
468  return "{sw:" + switchId
469  + ", ck:" + cookie
470  + ", in:" + inPort + "-" + inVlan
471  + ", out:" + outPort + "-" + outVlan
472  + '}';
473  }
474 
478  public static final List<SimpleSwitchRule> convertFlow(Flow flow) {
479  /*
480  * Start with Ingress
481  */
482  SimpleSwitchRule rule = new SimpleSwitchRule();
483  rule.switchId = flow.getSourceSwitch();
484  rule.cookie = flow.getCookie();
485  rule.inPort = flow.getSourcePort();
486  rule.inVlan = flow.getSourceVlan();
487  rule.meterId = flow.getMeterId();
488  List<PathNode> path = flow.getFlowPath().getPath();
489  // TODO: ensure path is sorted by sequence
490  if (path.size() == 0) {
491  // single switch rule.
492  rule.outPort = flow.getDestinationPort();
493  rule.outVlan = flow.getDestinationVlan();
494  } else {
495  // flows with two switches or more will have at least 2 in getPath()
496  rule.outPort = path.get(0).getPortNo();
497  rule.outVlan = flow.getTransitVlan();
498  // OPTIONAL - for sanity check, we should confirm switch ID and cookie match.
499  }
500  List<SimpleSwitchRule> result = new ArrayList<>();
501  result.add(rule);
502 
503  /*
504  * Now Transits
505  *
506  * .. only if path is greater than 2. If it is 2, then there are just
507  * two switches (no transits).
508  */
509  if (path.size() > 2) {
510  for (int i = 1; i < path.size() - 1; i = i + 2) {
511  // eg .. size 4, means 1 transit .. start at 1,2 .. don't process 3
512  final PathNode inNode = path.get(i);
513  final PathNode outNode = path.get(i + 1);
514 
515  rule = new SimpleSwitchRule();
516  rule.switchId = inNode.getSwitchId();
517  rule.inPort = inNode.getPortNo();
518 
519  rule.cookie = Optional.ofNullable(inNode.getCookie())
520  .filter(cookie -> !cookie.equals(NumberUtils.LONG_ZERO))
521  .orElse(flow.getCookie());
522  rule.inVlan = flow.getTransitVlan();
523  //TODO: out vlan is not set for transit flows. Is it correct behavior?
524  //rule.outVlan = flow.getTransitVlan();
525  rule.outPort = outNode.getPortNo();
526  result.add(rule);
527  }
528  }
529 
530  /*
531  * Now Egress .. only if we have a path. Otherwise it is one switch.
532  */
533  if (path.size() > 0) {
534  rule = new SimpleSwitchRule();
535  rule.switchId = flow.getDestinationSwitch();
536  rule.outPort = flow.getDestinationPort();
537  rule.outVlan = flow.getDestinationVlan();
538  rule.inVlan = flow.getTransitVlan();
539  rule.inPort = path.get(path.size() - 1).getPortNo();
540  rule.cookie = Optional.ofNullable(path.get(path.size() - 1).getCookie())
541  .filter(cookie -> !cookie.equals(NumberUtils.LONG_ZERO))
542  .orElse(flow.getCookie());
543  result.add(rule);
544  }
545  return result;
546  }
547 
551  public static final List<SimpleSwitchRule> convertSwitchRules(SwitchFlowEntries rules) {
552  List<SimpleSwitchRule> result = new ArrayList<>();
553  if (rules == null || rules.getFlowEntries() == null) {
554  return result;
555  }
556 
557  for (FlowEntry switchRule : rules.getFlowEntries()) {
558  logger.debug("FlowEntry: {}", switchRule);
559  SimpleSwitchRule rule = new SimpleSwitchRule();
560  rule.switchId = rules.getSwitchId();
561  rule.cookie = switchRule.getCookie();
562  rule.inPort = NumberUtils.toInt(switchRule.getMatch().getInPort());
563  rule.inVlan = NumberUtils.toInt(switchRule.getMatch().getVlanVid());
564  if (switchRule.getInstructions() != null) {
565  // TODO: What is the right way to get OUT VLAN and OUT PORT? How does it vary?
566  if (switchRule.getInstructions().getApplyActions() != null) {
567  // The outVlan could be empty. If it is, then pop is?
568  FlowApplyActions applyActions = switchRule.getInstructions().getApplyActions();
569  rule.outVlan = Optional.ofNullable(applyActions.getFieldAction())
570  .filter(action -> "vlan_vid".equals(action.getFieldName()))
571  .map(FlowSetFieldAction::getFieldValue)
572  .map(NumberUtils::toInt)
573  .orElse(NumberUtils.INTEGER_ZERO);
574  rule.outPort = NumberUtils.toInt(applyActions.getFlowOutput());
575  }
576 
577  rule.meterId = Optional.ofNullable(switchRule.getInstructions().getGoToMeter())
578  .map(Long::intValue)
579  .orElse(NumberUtils.INTEGER_ZERO);
580  }
581  rule.pktCount = switchRule.getPacketCount();
582  rule.byteCount = switchRule.getByteCount();
583  rule.version = switchRule.getVersion();
584  result.add(rule);
585  }
586  return result;
587  }
588 
594  static List<PathDiscrepancyDto> findDiscrepancy(
595  SimpleSwitchRule expected, List<SimpleSwitchRule> possibleActual,
596  List<Long> pktCounts, List<Long> byteCounts) {
597  List<PathDiscrepancyDto> result = new ArrayList<>();
598 
599  /*
600  * Start with trying to match on the cookie.
601  */
602  SimpleSwitchRule matched = possibleActual.stream()
603  .filter(rule -> rule.cookie != 0 && rule.cookie == expected.cookie)
604  .findFirst()
605  .orElse(null);
606  /*
607  * If no cookie match, then try inport and invlan
608  */
609  if (matched == null) {
610  matched = possibleActual.stream()
611  .filter(rule -> rule.inPort == expected.inPort && rule.inVlan == expected.inVlan)
612  .findFirst()
613  .orElse(null);
614  }
615  /*
616  * Lastly, if cookie doesn't match, and inport / invlan doesn't, try outport/outvlan
617  */
618  if (matched == null) {
619  matched = possibleActual.stream()
620  .filter(rule -> rule.outPort == expected.outPort && rule.outVlan == expected.outVlan)
621  .findFirst()
622  .orElse(null);
623  }
624 
625  /*
626  * If we haven't matched anything .. then file discrepancy for each field used in match.
627  */
628  if (matched == null) {
629  result.add(new PathDiscrepancyDto(String.valueOf(expected), "all", String.valueOf(expected), ""));
630  pktCounts.add(-1L);
631  byteCounts.add(-1L);
632  } else {
633  if (matched.cookie != expected.cookie) {
634  result.add(new PathDiscrepancyDto(expected.toString(), "cookie",
635  String.valueOf(expected.cookie), String.valueOf(matched.cookie)));
636  }
637  if (matched.inPort != expected.inPort) {
638  result.add(new PathDiscrepancyDto(expected.toString(), "inPort",
639  String.valueOf(expected.inPort), String.valueOf(matched.inPort)));
640  }
641  if (matched.inVlan != expected.inVlan) {
642  result.add(new PathDiscrepancyDto(expected.toString(), "inVlan",
643  String.valueOf(expected.inVlan), String.valueOf(matched.inVlan)));
644  }
645  if (matched.outPort != expected.outPort) {
646  result.add(new PathDiscrepancyDto(expected.toString(), "outPort",
647  String.valueOf(expected.outPort), String.valueOf(matched.outPort)));
648  }
649  if (matched.outVlan != expected.outVlan) {
650  result.add(new PathDiscrepancyDto(expected.toString(), "outVlan",
651  String.valueOf(expected.outVlan), String.valueOf(matched.outVlan)));
652  }
653 
654  //TODO: dumping of meters on OF_12 switches (and earlier) is not implemented yet, so skip them.
655  if ((matched.version == null || matched.version.compareTo("OF_12") > 0)
656  && matched.meterId != expected.meterId) {
657  result.add(new PathDiscrepancyDto(expected.toString(), "meterId",
658  String.valueOf(expected.meterId), String.valueOf(matched.meterId)));
659  }
660  pktCounts.add(matched.pktCount);
661  byteCounts.add(matched.byteCount);
662  }
663 
664  return result;
665  }
666  }
667 
671  @Override
672  public List<FlowValidationDto> validateFlow(final String flowId) {
673  final String correlationId = RequestCorrelationId.getId();
674  /*
675  * Algorithm:
676  * 1) Grab the flow from the database
677  * 2) Grab the information off of each switch
678  * 3) Do the comparison
679  */
680 
681  List<Flow> flows = pathComputer.getFlow(flowId);
682  if (flows == null) {
683  return null;
684  }
685 
686  logger.debug("VALIDATE FLOW: Found Flows: count = {}", flows.size());
687 
688  /*
689  * Since we are getting switch rules, we can use a set.
690  */
691  List<List<SimpleSwitchRule>> simpleFlowRules = new ArrayList<>();
692  Set<SwitchId> switches = new HashSet<>();
693  for (Flow flow : flows) {
694  if (flow.getFlowPath() != null) {
695  simpleFlowRules.add(SimpleSwitchRule.convertFlow(flow));
696  switches.add(flow.getSourceSwitch());
697  switches.add(flow.getDestinationSwitch());
698  for (PathNode node : flow.getFlowPath().getPath()) {
699  switches.add(node.getSwitchId());
700  }
701  } else {
702  throw new InvalidPathException(flowId, "Flow Path was not returned.");
703  }
704  }
705 
706  /*
707  * Reality check: we have the flow, and the switch rules. But they are in different formats.
708  * *AND* there are a couple of different ways that one may create a switch rule .. so that
709  * part needs to be flexible.
710  *
711  * Given the above, we'll use a flattened / simple mechanism to represent a switch rule.
712  * With that class, we can then:
713  * 1) use the flow to created the series of expected rules.
714  * 2) either convert all switch rules to the flattened structure, or we try to find the
715  * candidate rule, convert it, and then find discrepancies.
716  */
717 
718  /*)
719  * Now Walk the list, getting the switch rules, so we can process the comparisons.
720  */
721  final Map<SwitchId, SwitchFlowEntries> rules = new HashMap<>();
722  final Map<SwitchId, List<SimpleSwitchRule>> simpleRules = new HashMap<>();
723  int totalSwitchRules = 0;
724  int index = 1;
725  for (SwitchId switchId : switches) {
726  String requestId = correlationId + "-" + index++;
727  SwitchFlowEntries sfe = switchService.getRules(switchId, IGNORE_COOKIE_FILTER, requestId);
728  rules.put(switchId, sfe);
729  simpleRules.put(switchId, SimpleSwitchRule.convertSwitchRules(rules.get(switchId)));
730  totalSwitchRules += (sfe != null && sfe.getFlowEntries() != null) ? sfe.getFlowEntries().size() : 0;
731  }
732 
733  /*
734  * Now we are ready to compare all the rules.
735  */
736  List<FlowValidationDto> results = new ArrayList<>();
737  for (List<SimpleSwitchRule> oneDirection : simpleFlowRules) {
738  List<PathDiscrepancyDto> discrepancies = new ArrayList<>();
739  List<Long> pktCounts = new ArrayList<>();
740  List<Long> byteCounts = new ArrayList<>();
741  for (int i = 0; i < oneDirection.size(); i++) {
742  SimpleSwitchRule simpleRule = oneDirection.get(i);
743  // This is where the comparisons happen.
744  discrepancies.addAll(
745  SimpleSwitchRule.findDiscrepancy(simpleRule,
746  simpleRules.get(simpleRule.switchId),
747  pktCounts, byteCounts
748  ));
749  }
750 
752  result.setFlowId(flowId);
753  result.setDiscrepancies(discrepancies);
754  result.setAsExpected(discrepancies.size() == 0);
755  result.setPktCounts(pktCounts);
756  result.setByteCounts(byteCounts);
757  result.setFlowRulesTotal(oneDirection.size());
758  result.setSwitchRulesTotal(totalSwitchRules);
759  results.add(result);
760  }
761  return results;
762  }
763 
764  @Override
765  public VerificationOutput verifyFlow(String flowId, VerificationInput payload) {
766  FlowVerificationRequest query = new FlowVerificationRequest(flowId, payload.getTimeoutMillis());
767 
768  final String correlationId = RequestCorrelationId.getId();
769  CommandMessage request = new CommandMessage(query, System.currentTimeMillis(), correlationId, Destination.WFM);
770  messageProducer.send(topic, request);
771 
772  Message message = (Message) messageConsumer.poll(correlationId);
773  FlowVerificationResponse response = (FlowVerificationResponse) validateInfoMessage(
774  request, message, correlationId);
775 
776  return flowMapper.toVerificationOutput(response);
777  }
778 
782  @Override
784  final String correlationId = RequestCorrelationId.getId();
785  LOGGER.debug("Flow cache sync. Action: {}", syncCacheAction);
786  FlowCacheSyncRequest data = new FlowCacheSyncRequest(syncCacheAction);
787  CommandMessage request = new CommandMessage(data, System.currentTimeMillis(), correlationId, Destination.WFM);
788  messageProducer.send(topic, request);
789  Message message = (Message) messageConsumer.poll(correlationId);
790  FlowCacheSyncResponse response = (FlowCacheSyncResponse) validateInfoMessage(request, message, correlationId);
791  return response.getPayload();
792  }
793 
794  private FlowReroutePayload reroute(String flowId, boolean forced) {
795  logger.debug("Reroute flow: {}={}, forced={}", FLOW_ID, flowId, forced);
796  String correlationId = RequestCorrelationId.getId();
797  FlowRerouteRequest payload = new FlowRerouteRequest(flowId, forced);
799  payload, System.currentTimeMillis(), correlationId, Destination.WFM);
800 
801  messageProducer.send(topic, command);
802  Message message = (Message) messageConsumer.poll(correlationId);
803 
804  logger.debug("Got reroute response {}", message);
805  FlowRerouteResponse response = (FlowRerouteResponse) validateInfoMessage(command, message, correlationId);
806  return flowMapper.toReroutePayload(flowId, response.getPayload(), response.isRerouted());
807  }
808 }
BatchResults unpushFlows(List< FlowInfoData > externalFlows, Boolean propagate, Boolean verify)
Definition: FlowEntry.java:29
List< FlowValidationDto > validateFlow(final String flowId)
def status()
Definition: rest.py:593
FlowIdStatusPayload statusFlow(final String id)
def command(payload, fields)
Definition: share.py:102
default InfoData validateInfoMessage(final Message requestMessage, final Message responseMessage, final String correlationId)
list result
Definition: plan-d.py:72
static final String CORRELATION_ID
Definition: Utils.java:43
version
Definition: setup.py:25
BatchResults pushFlows(List< FlowInfoData > externalFlows, Boolean propagate, Boolean verify)
def index()
Definition: login.py:30
default List< Flow > getFlow(String flowId)
VerificationOutput verifyFlow(String flowId, VerificationInput payload)
FlowCacheSyncResults syncFlowCache(SynchronizeCacheAction syncCacheAction)
static final String FLOW_ID
Definition: Utils.java:61