Open Kilda Java Documentation
RecordHandler.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.kafka;
17 
18 import static java.util.Arrays.asList;
19 import static org.openkilda.messaging.Utils.MAPPER;
20 
76 
77 import net.floodlightcontroller.core.IOFSwitch;
78 import org.apache.kafka.clients.consumer.ConsumerRecord;
79 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
80 import org.projectfloodlight.openflow.protocol.OFPortDesc;
81 import org.projectfloodlight.openflow.types.DatapathId;
82 import org.projectfloodlight.openflow.types.OFPort;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85 
86 import java.util.ArrayList;
87 import java.util.Collection;
88 import java.util.List;
89 import java.util.Map;
90 import java.util.Optional;
91 import java.util.Set;
92 import java.util.stream.Collectors;
93 
94 class RecordHandler implements Runnable {
95  private static final Logger logger = LoggerFactory.getLogger(RecordHandler.class);
96 
97  private final ConsumerContext context;
98  private final ConsumerRecord<String, String> record;
99  private final MeterPool meterPool;
100 
101  public RecordHandler(ConsumerContext context, ConsumerRecord<String, String> record,
102  MeterPool meterPool) {
103  this.context = context;
104  this.record = record;
105  this.meterPool = meterPool;
106  }
107 
108  protected void doControllerMsg(CommandMessage message) {
109  // Define the destination topic where the reply will be sent to.
110  final String replyToTopic;
111  if (message instanceof CommandWithReplyToMessage) {
112  replyToTopic = ((CommandWithReplyToMessage) message).getReplyTo();
113  } else {
114  replyToTopic = context.getKafkaFlowTopic();
115  }
116  final Destination replyDestination = getDestinationForTopic(replyToTopic);
117 
118  try {
119  handleCommand(message, replyToTopic, replyDestination);
120  } catch (FlowCommandException e) {
121  ErrorMessage error = new ErrorMessage(
122  e.makeErrorResponse(),
123  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
124  context.getKafkaProducer().postMessage(replyToTopic, error);
125  } catch (Exception e) {
126  logger.error("Unhandled exception: {}", e);
127  }
128  }
129 
130  private void handleCommand(CommandMessage message, String replyToTopic, Destination replyDestination)
131  throws FlowCommandException {
132  CommandData data = message.getData();
133  CommandContext context = new CommandContext(this.context.getModuleContext(), message.getCorrelationId());
134 
135  if (data instanceof DiscoverIslCommandData) {
136  doDiscoverIslCommand(message);
137  } else if (data instanceof DiscoverPathCommandData) {
138  doDiscoverPathCommand(data);
139  } else if (data instanceof InstallIngressFlow) {
140  doProcessIngressFlow(message, replyToTopic, replyDestination);
141  } else if (data instanceof InstallEgressFlow) {
142  doProcessEgressFlow(message, replyToTopic, replyDestination);
143  } else if (data instanceof InstallTransitFlow) {
144  doProcessTransitFlow(message, replyToTopic, replyDestination);
145  } else if (data instanceof InstallOneSwitchFlow) {
146  doProcessOneSwitchFlow(message, replyToTopic, replyDestination);
147  } else if (data instanceof RemoveFlow) {
148  doDeleteFlow(message, replyToTopic, replyDestination);
149  } else if (data instanceof NetworkCommandData) {
150  doNetworkDump(message);
151  } else if (data instanceof SwitchRulesDeleteRequest) {
152  doDeleteSwitchRules(message, replyToTopic, replyDestination);
153  } else if (data instanceof SwitchRulesInstallRequest) {
154  doInstallSwitchRules(message, replyToTopic, replyDestination);
155  } else if (data instanceof ConnectModeRequest) {
156  doConnectMode(message, replyToTopic, replyDestination);
157  } else if (data instanceof DumpRulesRequest) {
158  doDumpRulesRequest(message, replyToTopic);
159  } else if (data instanceof BatchInstallRequest) {
160  doBatchInstall(message);
161  } else if (data instanceof PortsCommandData) {
162  doPortsCommandDataRequest(message);
163  } else if (data instanceof UniFlowVerificationRequest) {
164  doFlowVerificationRequest(context, (UniFlowVerificationRequest) data);
165  } else if (data instanceof DeleteMeterRequest) {
166  doDeleteMeter(message, replyToTopic, replyDestination);
167  } else if (data instanceof PortConfigurationRequest) {
168  doConfigurePort(message, replyToTopic, replyDestination);
169  } else {
170  logger.error("unknown data type: {}", data.toString());
171  }
172  }
173 
174  private Destination getDestinationForTopic(String replyToTopic) {
175  //TODO: depending on the future system design, either get rid of destination or complete the switch-case.
176  if (context.getKafkaNorthboundTopic().equals(replyToTopic)) {
177  return Destination.NORTHBOUND;
178  } else {
179  return Destination.WFM_TRANSACTION;
180  }
181  }
182 
183  private void doDiscoverIslCommand(CommandMessage message) {
184  DiscoverIslCommandData command = (DiscoverIslCommandData) message.getData();
185  SwitchId switchId = command.getSwitchId();
186  context.getPathVerificationService().sendDiscoveryMessage(
187  DatapathId.of(switchId.toLong()), OFPort.of(command.getPortNumber()));
188 
189  DiscoPacketSendingConfirmation confirmation = new DiscoPacketSendingConfirmation(
190  new NetworkEndpoint(command.getSwitchId(), command.getPortNumber()));
191  context.getKafkaProducer().postMessage(context.getKafkaTopoDiscoTopic(),
192  new InfoMessage(confirmation, System.currentTimeMillis(), message.getCorrelationId()));
193  }
194 
195  private void doDiscoverPathCommand(CommandData data) {
196  DiscoverPathCommandData command = (DiscoverPathCommandData) data;
197  logger.warn("NOT IMPLEMENTED: sending discover Path to {}", command);
198  }
199 
205  private void doProcessIngressFlow(final CommandMessage message, String replyToTopic, Destination replyDestination)
206  throws FlowCommandException {
207  InstallIngressFlow command = (InstallIngressFlow) message.getData();
208 
209  try {
210  installIngressFlow(command);
211  message.setDestination(replyDestination);
212  context.getKafkaProducer().postMessage(replyToTopic, message);
213  } catch (SwitchOperationException e) {
214  throw new FlowCommandException(command.getId(), ErrorType.CREATION_FAILURE, e);
215  }
216  }
217 
223  private void installIngressFlow(final InstallIngressFlow command) throws SwitchOperationException {
224  logger.debug("Creating an ingress flow: {}", command);
225 
226  long meterId = 0;
227  if (command.getMeterId() != null && command.getMeterId() > 0) {
228  meterId = allocateMeterId(
229  command.getMeterId(), command.getSwitchId(), command.getId(), command.getCookie());
230 
231  context.getSwitchManager().installMeter(
232  DatapathId.of(command.getSwitchId().toLong()),
233  command.getBandwidth(), 1024, meterId);
234  } else {
235  logger.debug("Installing unmetered ingress flow. Switch: {}, cookie: {}",
236  command.getSwitchId(), command.getCookie());
237  }
238 
239  context.getSwitchManager().installIngressFlow(
240  DatapathId.of(command.getSwitchId().toLong()),
241  command.getId(),
242  command.getCookie(),
243  command.getInputPort(),
244  command.getOutputPort(),
245  command.getInputVlanId(),
246  command.getTransitVlanId(),
247  command.getOutputVlanType(),
248  meterId);
249  }
250 
256  private void doProcessEgressFlow(final CommandMessage message, String replyToTopic, Destination replyDestination)
257  throws FlowCommandException {
258  InstallEgressFlow command = (InstallEgressFlow) message.getData();
259 
260  try {
261  installEgressFlow(command);
262  message.setDestination(replyDestination);
263  context.getKafkaProducer().postMessage(replyToTopic, message);
264  } catch (SwitchOperationException e) {
265  throw new FlowCommandException(command.getId(), ErrorType.CREATION_FAILURE, e);
266  }
267  }
268 
274  private void installEgressFlow(InstallEgressFlow command) throws SwitchOperationException {
275  logger.debug("Creating an egress flow: {}", command);
276 
277  context.getSwitchManager().installEgressFlow(
278  DatapathId.of(command.getSwitchId().toLong()),
279  command.getId(),
280  command.getCookie(),
281  command.getInputPort(),
282  command.getOutputPort(),
283  command.getTransitVlanId(),
284  command.getOutputVlanId(),
285  command.getOutputVlanType());
286  }
287 
293  private void doProcessTransitFlow(final CommandMessage message, String replyToTopic, Destination replyDestination)
294  throws FlowCommandException {
295  InstallTransitFlow command = (InstallTransitFlow) message.getData();
296 
297  try {
298  installTransitFlow(command);
299  message.setDestination(replyDestination);
300  context.getKafkaProducer().postMessage(replyToTopic, message);
301  } catch (SwitchOperationException e) {
302  throw new FlowCommandException(command.getId(), ErrorType.CREATION_FAILURE, e);
303  }
304  }
305 
311  private void installTransitFlow(final InstallTransitFlow command) throws SwitchOperationException {
312  logger.debug("Creating a transit flow: {}", command);
313 
314  context.getSwitchManager().installTransitFlow(
315  DatapathId.of(command.getSwitchId().toLong()),
316  command.getId(),
317  command.getCookie(),
318  command.getInputPort(),
319  command.getOutputPort(),
320  command.getTransitVlanId());
321  }
322 
328  private void doProcessOneSwitchFlow(final CommandMessage message, String replyToTopic, Destination replyDestination)
329  throws FlowCommandException {
330  InstallOneSwitchFlow command = (InstallOneSwitchFlow) message.getData();
331  logger.debug("creating a flow through one switch: {}", command);
332 
333  try {
334  installOneSwitchFlow(command);
335  message.setDestination(replyDestination);
336  context.getKafkaProducer().postMessage(replyToTopic, message);
337  } catch (SwitchOperationException e) {
338  throw new FlowCommandException(command.getId(), ErrorType.CREATION_FAILURE, e);
339  }
340  }
341 
347  private void installOneSwitchFlow(InstallOneSwitchFlow command) throws SwitchOperationException {
348  long meterId = 0;
349  if (command.getMeterId() != null && command.getMeterId() > 0) {
350  meterId = allocateMeterId(
351  command.getMeterId(), command.getSwitchId(), command.getId(), command.getCookie());
352 
353  context.getSwitchManager().installMeter(
354  DatapathId.of(command.getSwitchId().toLong()),
355  command.getBandwidth(), 1024, meterId);
356  } else {
357  logger.debug("Installing unmetered one switch flow. Switch: {}, cookie: {}",
358  command.getSwitchId(), command.getCookie());
359  }
360 
361  OutputVlanType directOutputVlanType = command.getOutputVlanType();
362  context.getSwitchManager().installOneSwitchFlow(
363  DatapathId.of(command.getSwitchId().toLong()),
364  command.getId(),
365  command.getCookie(),
366  command.getInputPort(),
367  command.getOutputPort(),
368  command.getInputVlanId(),
369  command.getOutputVlanId(),
370  directOutputVlanType,
371  meterId);
372  }
373 
379  private void doDeleteFlow(final CommandMessage message, String replyToTopic, Destination replyDestination)
380  throws FlowCommandException {
381  RemoveFlow command = (RemoveFlow) message.getData();
382  logger.debug("deleting a flow: {}", command);
383 
384  DatapathId dpid = DatapathId.of(command.getSwitchId().toLong());
385  ISwitchManager switchManager = context.getSwitchManager();
386  try {
387  logger.info("Deleting flow {} from switch {}", command.getId(), dpid);
388 
389  DeleteRulesCriteria criteria = Optional.ofNullable(command.getCriteria())
390  .orElseGet(() -> DeleteRulesCriteria.builder().cookie(command.getCookie()).build());
391  List<Long> cookiesOfRemovedRules = switchManager.deleteRulesByCriteria(dpid, criteria);
392  if (cookiesOfRemovedRules.isEmpty()) {
393  logger.warn("No rules were removed by criteria {} for flow {} from switch {}",
394  criteria, command.getId(), dpid);
395  }
396 
397  // FIXME(surabujin): QUICK FIX - try to drop meterPool completely
398  Long meterId = command.getMeterId();
399  if (meterId != null) {
400  switchManager.deleteMeter(dpid, meterId);
401  }
402 
403  message.setDestination(replyDestination);
404  context.getKafkaProducer().postMessage(replyToTopic, message);
405  } catch (SwitchOperationException e) {
406  throw new FlowCommandException(command.getId(), ErrorType.DELETION_FAILURE, e);
407  }
408  }
409 
415  private void doNetworkDump(final CommandMessage message) {
416 
417  String correlationId = message.getCorrelationId();
418  KafkaMessageProducer kafkaProducer = context.getKafkaProducer();
419  String outputDiscoTopic = context.getKafkaTopoDiscoTopic();
420 
421  logger.debug("Processing request from WFM to dump switches. {}", correlationId);
422 
423  kafkaProducer.getProducer().enableGuaranteedOrder(outputDiscoTopic);
424  try {
425 
426  kafkaProducer.postMessage(outputDiscoTopic,
427  new InfoMessage(new NetworkDumpBeginMarker(), System.currentTimeMillis(), correlationId));
428 
429  Map<DatapathId, IOFSwitch> allSwitchMap = context.getSwitchManager().getAllSwitchMap();
430 
431  allSwitchMap.values().stream()
432  .map(this::buildNetworkDumpSwitchData)
433  .forEach(sw ->
434  kafkaProducer.postMessage(outputDiscoTopic,
435  new InfoMessage(sw, System.currentTimeMillis(), correlationId)));
436 
437  allSwitchMap.values().stream()
438  .flatMap(sw -> sw.getEnabledPorts().stream()
439  .filter(port -> SwitchEventCollector.isPhysicalPort(port.getPortNo()))
440  .map(port -> buildNetworkDumpPortData(sw, port)))
441  .forEach(port ->
442  kafkaProducer.postMessage(outputDiscoTopic,
443  new InfoMessage(port, System.currentTimeMillis(), correlationId)));
444 
445  kafkaProducer.postMessage(
446  outputDiscoTopic,
447  new InfoMessage(
448  new NetworkDumpEndMarker(), System.currentTimeMillis(),
449  correlationId));
450  } finally {
451  kafkaProducer.getProducer().disableGuaranteedOrder(outputDiscoTopic);
452  }
453  }
454 
458  protected NetworkDumpSwitchData buildNetworkDumpSwitchData(IOFSwitch sw) {
459  return new NetworkDumpSwitchData(new SwitchId(sw.getId().getLong()));
460  }
461 
465  private NetworkDumpPortData buildNetworkDumpPortData(IOFSwitch sw, OFPortDesc port) {
466  return new NetworkDumpPortData(new SwitchId(sw.getId().getLong()), port.getPortNo().getPortNumber());
467  }
468 
469  private void doInstallSwitchRules(final CommandMessage message, String replyToTopic, Destination replyDestination) {
470  SwitchRulesInstallRequest request = (SwitchRulesInstallRequest) message.getData();
471  logger.debug("Installing rules on '{}' switch: action={}",
472  request.getSwitchId(), request.getInstallRulesAction());
473 
474  DatapathId dpid = DatapathId.of(request.getSwitchId().toLong());
475  ISwitchManager switchManager = context.getSwitchManager();
476  InstallRulesAction installAction = request.getInstallRulesAction();
477  List<Long> installedRules = new ArrayList<>();
478  try {
479  if (installAction == InstallRulesAction.INSTALL_DROP) {
480  switchManager.installDropFlow(dpid);
481  installedRules.add(ISwitchManager.DROP_RULE_COOKIE);
482  } else if (installAction == InstallRulesAction.INSTALL_BROADCAST) {
483  switchManager.installVerificationRule(dpid, true);
484  installedRules.add(ISwitchManager.VERIFICATION_BROADCAST_RULE_COOKIE);
485  } else if (installAction == InstallRulesAction.INSTALL_UNICAST) {
486  // TODO: this isn't always added (ie if OF1.2). Is there a better response?
487  switchManager.installVerificationRule(dpid, false);
488  installedRules.add(ISwitchManager.VERIFICATION_UNICAST_RULE_COOKIE);
489  } else {
490  switchManager.installDefaultRules(dpid);
491  installedRules.addAll(asList(
492  ISwitchManager.DROP_RULE_COOKIE,
493  ISwitchManager.VERIFICATION_BROADCAST_RULE_COOKIE,
494  ISwitchManager.VERIFICATION_UNICAST_RULE_COOKIE
495  ));
496  }
497 
498  SwitchRulesResponse response = new SwitchRulesResponse(installedRules);
499  InfoMessage infoMessage = new InfoMessage(response,
500  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
501  context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
502 
503  } catch (SwitchOperationException e) {
504  ErrorData errorData = new ErrorData(ErrorType.CREATION_FAILURE, e.getMessage(),
505  request.getSwitchId().toString());
506  ErrorMessage error = new ErrorMessage(errorData,
507  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
508  context.getKafkaProducer().postMessage(replyToTopic, error);
509  }
510  }
511 
512  private void doDeleteSwitchRules(final CommandMessage message, String replyToTopic, Destination replyDestination) {
513  SwitchRulesDeleteRequest request = (SwitchRulesDeleteRequest) message.getData();
514  logger.debug("Deleting rules from '{}' switch: action={}, criteria={}", request.getSwitchId(),
515  request.getDeleteRulesAction(), request.getCriteria());
516 
517  DatapathId dpid = DatapathId.of(request.getSwitchId().toLong());
518  DeleteRulesAction deleteAction = request.getDeleteRulesAction();
519  DeleteRulesCriteria criteria = request.getCriteria();
520 
521  ISwitchManager switchManager = context.getSwitchManager();
522 
523  try {
524  List<Long> removedRules = new ArrayList<>();
525 
526  if (deleteAction != null) {
527  switch (deleteAction) {
528  case REMOVE_DROP:
529  criteria = DeleteRulesCriteria.builder()
530  .cookie(ISwitchManager.DROP_RULE_COOKIE).build();
531  break;
532  case REMOVE_BROADCAST:
533  criteria = DeleteRulesCriteria.builder()
534  .cookie(ISwitchManager.VERIFICATION_BROADCAST_RULE_COOKIE).build();
535  break;
536  case REMOVE_UNICAST:
537  criteria = DeleteRulesCriteria.builder()
538  .cookie(ISwitchManager.VERIFICATION_UNICAST_RULE_COOKIE).build();
539  break;
540  default:
541  }
542 
543  // The cases when we delete all non-default rules.
544  if (deleteAction.nonDefaultRulesToBeRemoved()) {
545  removedRules.addAll(switchManager.deleteAllNonDefaultRules(dpid));
546  }
547 
548  // The cases when we delete the default rules.
549  if (deleteAction.defaultRulesToBeRemoved()) {
550  removedRules.addAll(switchManager.deleteDefaultRules(dpid));
551  }
552  }
553 
554  // The case when we either delete by criteria or a specific default rule.
555  if (criteria != null) {
556  removedRules.addAll(switchManager.deleteRulesByCriteria(dpid, criteria));
557  }
558 
559  // The cases when we (re)install the default rules.
560  if (deleteAction != null && deleteAction.defaultRulesToBeInstalled()) {
561  switchManager.installDefaultRules(dpid);
562  }
563 
564  SwitchRulesResponse response = new SwitchRulesResponse(removedRules);
565  InfoMessage infoMessage = new InfoMessage(response,
566  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
567  context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
568 
569  } catch (SwitchOperationException e) {
570  ErrorData errorData = new ErrorData(ErrorType.DELETION_FAILURE, e.getMessage(),
571  request.getSwitchId().toString());
572  ErrorMessage error = new ErrorMessage(errorData,
573  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
574  context.getKafkaProducer().postMessage(replyToTopic, error);
575  }
576  }
577 
578  private void doConnectMode(final CommandMessage message, String replyToTopic, Destination replyDestination) {
579  ConnectModeRequest request = (ConnectModeRequest) message.getData();
580  if (request.getMode() != null) {
581  logger.debug("Setting CONNECT MODE to '{}'", request.getMode());
582  } else {
583  logger.debug("Getting CONNECT MODE");
584  }
585 
586  ISwitchManager switchManager = context.getSwitchManager();
587  ConnectModeRequest.Mode result = switchManager.connectMode(request.getMode());
588 
589  logger.debug("CONNECT MODE is now '{}'", result);
590  ConnectModeResponse response = new ConnectModeResponse(result);
591  InfoMessage infoMessage = new InfoMessage(response,
592  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
593  context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
594 
595  }
596 
597  private void doDumpRulesRequest(final CommandMessage message, String replyToTopic) {
598  DumpRulesRequest request = (DumpRulesRequest) message.getData();
599  SwitchId switchId = request.getSwitchId();
600  logger.debug("Loading installed rules for switch {}", switchId);
601 
602  List<OFFlowStatsEntry> flowEntries =
603  context.getSwitchManager().dumpFlowTable(DatapathId.of(switchId.toLong()));
604  List<FlowEntry> flows = flowEntries.stream()
606  .collect(Collectors.toList());
607 
608  SwitchFlowEntries response = SwitchFlowEntries.builder()
609  .switchId(switchId)
610  .flowEntries(flows)
611  .build();
612  InfoMessage infoMessage = new InfoMessage(response, message.getTimestamp(),
613  message.getCorrelationId());
614  context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
615  }
616 
622  private void doBatchInstall(final CommandMessage message) throws FlowCommandException {
623  BatchInstallRequest request = (BatchInstallRequest) message.getData();
624  final String switchId = request.getSwitchId();
625  logger.debug("Processing flow commands for switch {}", switchId);
626 
627  for (BaseInstallFlow command : request.getFlowCommands()) {
628  logger.debug("Processing command for switch {} {}", switchId, command);
629  try {
630  if (command instanceof InstallIngressFlow) {
631  installIngressFlow((InstallIngressFlow) command);
632  } else if (command instanceof InstallEgressFlow) {
633  installEgressFlow((InstallEgressFlow) command);
634  } else if (command instanceof InstallTransitFlow) {
635  installTransitFlow((InstallTransitFlow) command);
636  } else if (command instanceof InstallOneSwitchFlow) {
637  installOneSwitchFlow((InstallOneSwitchFlow) command);
638  } else {
639  throw new FlowCommandException(command.getId(), ErrorType.REQUEST_INVALID,
640  "Unsupported command for batch install.");
641  }
642  } catch (SwitchOperationException e) {
643  logger.error("Error during flow installation", e);
644  }
645  }
646  }
647 
648  private void doPortsCommandDataRequest(CommandMessage message) {
649  try {
650  PortsCommandData request = (PortsCommandData) message.getData();
651  Map<DatapathId, IOFSwitch> allSwitchMap = context.getSwitchManager().getAllSwitchMap();
652  for (Map.Entry<DatapathId, IOFSwitch> entry : allSwitchMap.entrySet()) {
653  SwitchId switchId = new SwitchId(entry.getKey().toString());
654  try {
655  IOFSwitch sw = entry.getValue();
656  Collection<OFPort> enabledPortNumbers = sw.getEnabledPortNumbers();
657 
658  Set<PortStatus> portsStatus = sw.getPorts().stream()
659  .filter(port -> SwitchEventCollector.isPhysicalPort(port.getPortNo()))
660  .map(port -> PortStatus.builder()
661  .id(port.getPortNo().getPortNumber())
662  .status(enabledPortNumbers.contains(port.getPortNo())
663  ? PortChangeType.UP : PortChangeType.DOWN)
664  .build()
665  ).collect(Collectors.toSet());
666  SwitchPortStatusData response = SwitchPortStatusData.builder()
667  .switchId(switchId)
668  .ports(portsStatus)
669  .requester(request.getRequester())
670  .build();
671 
672  InfoMessage infoMessage = new InfoMessage(response, message.getTimestamp(),
673  message.getCorrelationId());
674  context.getKafkaProducer().postMessage(context.getKafkaStatsTopic(), infoMessage);
675  } catch (Exception e) {
676  logger.error("Could not get port stats data for switch {} with error {}",
677  switchId, e.getMessage(), e);
678  }
679  }
680  } catch (Exception e) {
681  logger.error("Could not get port data for stats {}", e.getMessage(), e);
682  }
683  }
684 
685  private void doFlowVerificationRequest(CommandContext context, UniFlowVerificationRequest request) {
686  VerificationDispatchCommand verification = new VerificationDispatchCommand(context, request);
687  verification.run();
688  }
689 
690  private void doDeleteMeter(CommandMessage message, String replyToTopic, Destination replyDestination) {
691  DeleteMeterRequest request = (DeleteMeterRequest) message.getData();
692  try {
693  DatapathId dpid = DatapathId.of(request.getSwitchId().toLong());
694  long txId = context.getSwitchManager().deleteMeter(dpid, request.getMeterId());
695 
696  DeleteMeterResponse response = new DeleteMeterResponse(txId != 0L);
697  InfoMessage infoMessage = new InfoMessage(response, System.currentTimeMillis(), message.getCorrelationId(),
698  replyDestination);
699  context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
700  } catch (SwitchOperationException e) {
701  logger.info("Meter deletion is unsuccessful. Switch {} not found", request.getSwitchId());
702  ErrorData errorData =
703  new ErrorData(ErrorType.DATA_INVALID, e.getMessage(), request.getSwitchId().toString());
704  ErrorMessage error = new ErrorMessage(errorData,
705  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
706  context.getKafkaProducer().postMessage(replyToTopic, error);
707  }
708  }
709 
710  private void doConfigurePort(final CommandMessage message, final String replyToTopic,
711  final Destination replyDestination) {
712  PortConfigurationRequest request = (PortConfigurationRequest) message.getData();
713 
714  logger.info("Port configuration request. Switch '{}', Port '{}'", request.getSwitchId(),
715  request.getPortNumber());
716  try {
717  ISwitchManager switchManager = context.getSwitchManager();
718 
719  DatapathId dpId = DatapathId.of(request.getSwitchId().toLong());
720  switchManager.configurePort(dpId, request.getPortNumber(), request.getAdminDown());
721 
722  InfoMessage infoMessage = new InfoMessage(
723  new PortConfigurationResponse(request.getSwitchId(), request.getPortNumber()),
724  message.getTimestamp(),
725  message.getCorrelationId());
726  context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
727  } catch (SwitchOperationException e) {
728  logger.error("Port configuration request failed. " + e.getMessage(), e);
729  ErrorData errorData = new ErrorData(ErrorType.DATA_INVALID, e.getMessage(),
730  "Port configuration request failed");
731  ErrorMessage error = new ErrorMessage(errorData,
732  System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
733  context.getKafkaProducer().postMessage(replyToTopic, error);
734  }
735  }
736 
737  private long allocateMeterId(Long meterId, SwitchId switchId, String flowId, Long cookie) {
738  long allocatedId;
739 
740  if (meterId == null) {
741  logger.error("Meter_id should be passed within one switch flow command. Cookie is {}", cookie);
742  allocatedId = (long) meterPool.allocate(switchId, flowId);
743  logger.error("Allocated meter_id {} for cookie {}", allocatedId, cookie);
744  } else {
745  allocatedId = meterPool.allocate(switchId, flowId, Math.toIntExact(meterId));
746  }
747  return allocatedId;
748  }
749 
750  private void parseRecord(ConsumerRecord<String, String> record) {
751  CommandMessage message;
752  try {
753  String value = record.value();
754  // TODO: Prior to Message changes, this MAPPER would read Message ..
755  // but, changed to BaseMessage and got an error wrt "timestamp" ..
756  // so, need to experiment with why CommandMessage can't be read as
757  // a BaseMessage
758  message = MAPPER.readValue(value, CommandMessage.class);
759  } catch (Exception exception) {
760  logger.error("error parsing record={}", record.value(), exception);
761  return;
762  }
763 
764  // Process the message within the message correlation context.
765  try (CorrelationContextClosable closable = CorrelationContext.create(message.getCorrelationId())) {
766  doControllerMsg(message);
767  } catch (Exception exception) {
768  logger.error("error processing message={}", message, exception);
769  }
770  }
771 
772  @Override
773  public void run() {
774  parseRecord(record);
775  }
776 
777  public static class Factory {
778  private final ConsumerContext context;
779  private final MeterPool meterPool = new MeterPool();
780 
781  public Factory(ConsumerContext context) {
782  this.context = context;
783  }
784 
785  public RecordHandler produce(ConsumerRecord<String, String> record) {
786  return new RecordHandler(context, record, meterPool);
787  }
788  }
789 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
static FlowEntry toFlowEntry(final OFFlowStatsEntry entry)
value
Definition: nodes.py:62
Definition: FlowEntry.java:29
def status()
Definition: rest.py:593
def command(payload, fields)
Definition: share.py:102
list result
Definition: plan-d.py:72
def build()
Definition: plan-e.py:73
net
Definition: plan-b.py:46