16 package org.openkilda.floodlight.kafka;
18 import static java.util.Arrays.asList;
77 import net.floodlightcontroller.core.IOFSwitch;
78 import org.apache.kafka.clients.consumer.ConsumerRecord;
79 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
80 import org.projectfloodlight.openflow.protocol.OFPortDesc;
81 import org.projectfloodlight.openflow.types.DatapathId;
82 import org.projectfloodlight.openflow.types.OFPort;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
86 import java.util.ArrayList;
87 import java.util.Collection;
88 import java.util.List;
90 import java.util.Optional;
92 import java.util.stream.Collectors;
94 class RecordHandler
implements Runnable {
95 private static final Logger
logger = LoggerFactory.getLogger(RecordHandler.class);
97 private final ConsumerContext context;
98 private final ConsumerRecord<String, String> record;
99 private final MeterPool meterPool;
101 public RecordHandler(ConsumerContext context, ConsumerRecord<String, String> record,
102 MeterPool meterPool) {
103 this.context = context;
104 this.record = record;
105 this.meterPool = meterPool;
108 protected void doControllerMsg(CommandMessage message) {
110 final String replyToTopic;
111 if (message instanceof CommandWithReplyToMessage) {
112 replyToTopic = ((CommandWithReplyToMessage) message).getReplyTo();
114 replyToTopic = context.getKafkaFlowTopic();
116 final Destination replyDestination = getDestinationForTopic(replyToTopic);
119 handleCommand(message, replyToTopic, replyDestination);
120 }
catch (FlowCommandException e) {
121 ErrorMessage error =
new ErrorMessage(
122 e.makeErrorResponse(),
123 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
124 context.getKafkaProducer().postMessage(replyToTopic, error);
125 }
catch (Exception e) {
126 logger.error(
"Unhandled exception: {}", e);
130 private void handleCommand(CommandMessage message, String replyToTopic, Destination replyDestination)
131 throws FlowCommandException {
132 CommandData
data = message.getData();
133 CommandContext context =
new CommandContext(this.context.getModuleContext(), message.getCorrelationId());
135 if (
data instanceof DiscoverIslCommandData) {
136 doDiscoverIslCommand(message);
137 }
else if (
data instanceof DiscoverPathCommandData) {
138 doDiscoverPathCommand(
data);
139 }
else if (
data instanceof InstallIngressFlow) {
140 doProcessIngressFlow(message, replyToTopic, replyDestination);
141 }
else if (
data instanceof InstallEgressFlow) {
142 doProcessEgressFlow(message, replyToTopic, replyDestination);
143 }
else if (
data instanceof InstallTransitFlow) {
144 doProcessTransitFlow(message, replyToTopic, replyDestination);
145 }
else if (
data instanceof InstallOneSwitchFlow) {
146 doProcessOneSwitchFlow(message, replyToTopic, replyDestination);
147 }
else if (
data instanceof RemoveFlow) {
148 doDeleteFlow(message, replyToTopic, replyDestination);
149 }
else if (
data instanceof NetworkCommandData) {
150 doNetworkDump(message);
151 }
else if (
data instanceof SwitchRulesDeleteRequest) {
152 doDeleteSwitchRules(message, replyToTopic, replyDestination);
153 }
else if (
data instanceof SwitchRulesInstallRequest) {
154 doInstallSwitchRules(message, replyToTopic, replyDestination);
155 }
else if (
data instanceof ConnectModeRequest) {
156 doConnectMode(message, replyToTopic, replyDestination);
157 }
else if (
data instanceof DumpRulesRequest) {
158 doDumpRulesRequest(message, replyToTopic);
159 }
else if (
data instanceof BatchInstallRequest) {
160 doBatchInstall(message);
161 }
else if (
data instanceof PortsCommandData) {
162 doPortsCommandDataRequest(message);
163 }
else if (
data instanceof UniFlowVerificationRequest) {
164 doFlowVerificationRequest(context, (UniFlowVerificationRequest)
data);
165 }
else if (
data instanceof DeleteMeterRequest) {
166 doDeleteMeter(message, replyToTopic, replyDestination);
167 }
else if (
data instanceof PortConfigurationRequest) {
168 doConfigurePort(message, replyToTopic, replyDestination);
170 logger.error(
"unknown data type: {}",
data.toString());
174 private Destination getDestinationForTopic(String replyToTopic) {
176 if (context.getKafkaNorthboundTopic().equals(replyToTopic)) {
177 return Destination.NORTHBOUND;
179 return Destination.WFM_TRANSACTION;
183 private void doDiscoverIslCommand(CommandMessage message) {
184 DiscoverIslCommandData
command = (DiscoverIslCommandData) message.getData();
185 SwitchId switchId =
command.getSwitchId();
186 context.getPathVerificationService().sendDiscoveryMessage(
187 DatapathId.of(switchId.toLong()), OFPort.of(
command.getPortNumber()));
189 DiscoPacketSendingConfirmation confirmation =
new DiscoPacketSendingConfirmation(
190 new NetworkEndpoint(
command.getSwitchId(),
command.getPortNumber()));
191 context.getKafkaProducer().postMessage(context.getKafkaTopoDiscoTopic(),
192 new InfoMessage(confirmation, System.currentTimeMillis(), message.getCorrelationId()));
195 private void doDiscoverPathCommand(CommandData
data) {
196 DiscoverPathCommandData
command = (DiscoverPathCommandData)
data;
197 logger.warn(
"NOT IMPLEMENTED: sending discover Path to {}",
command);
205 private void doProcessIngressFlow(
final CommandMessage message, String replyToTopic, Destination replyDestination)
206 throws FlowCommandException {
207 InstallIngressFlow
command = (InstallIngressFlow) message.getData();
211 message.setDestination(replyDestination);
212 context.getKafkaProducer().postMessage(replyToTopic, message);
213 }
catch (SwitchOperationException e) {
214 throw new FlowCommandException(
command.getId(), ErrorType.CREATION_FAILURE, e);
223 private void installIngressFlow(
final InstallIngressFlow
command)
throws SwitchOperationException {
228 meterId = allocateMeterId(
231 context.getSwitchManager().installMeter(
232 DatapathId.of(
command.getSwitchId().toLong()),
233 command.getBandwidth(), 1024, meterId);
235 logger.debug(
"Installing unmetered ingress flow. Switch: {}, cookie: {}",
239 context.getSwitchManager().installIngressFlow(
240 DatapathId.of(
command.getSwitchId().toLong()),
256 private void doProcessEgressFlow(
final CommandMessage message, String replyToTopic, Destination replyDestination)
257 throws FlowCommandException {
258 InstallEgressFlow
command = (InstallEgressFlow) message.getData();
262 message.setDestination(replyDestination);
263 context.getKafkaProducer().postMessage(replyToTopic, message);
264 }
catch (SwitchOperationException e) {
265 throw new FlowCommandException(
command.getId(), ErrorType.CREATION_FAILURE, e);
274 private void installEgressFlow(InstallEgressFlow
command)
throws SwitchOperationException {
277 context.getSwitchManager().installEgressFlow(
278 DatapathId.of(
command.getSwitchId().toLong()),
293 private void doProcessTransitFlow(
final CommandMessage message, String replyToTopic, Destination replyDestination)
294 throws FlowCommandException {
295 InstallTransitFlow
command = (InstallTransitFlow) message.getData();
299 message.setDestination(replyDestination);
300 context.getKafkaProducer().postMessage(replyToTopic, message);
301 }
catch (SwitchOperationException e) {
302 throw new FlowCommandException(
command.getId(), ErrorType.CREATION_FAILURE, e);
311 private void installTransitFlow(
final InstallTransitFlow
command)
throws SwitchOperationException {
314 context.getSwitchManager().installTransitFlow(
315 DatapathId.of(
command.getSwitchId().toLong()),
328 private void doProcessOneSwitchFlow(
final CommandMessage message, String replyToTopic, Destination replyDestination)
329 throws FlowCommandException {
330 InstallOneSwitchFlow
command = (InstallOneSwitchFlow) message.getData();
331 logger.debug(
"creating a flow through one switch: {}",
command);
335 message.setDestination(replyDestination);
336 context.getKafkaProducer().postMessage(replyToTopic, message);
337 }
catch (SwitchOperationException e) {
338 throw new FlowCommandException(
command.getId(), ErrorType.CREATION_FAILURE, e);
347 private void installOneSwitchFlow(InstallOneSwitchFlow
command)
throws SwitchOperationException {
350 meterId = allocateMeterId(
353 context.getSwitchManager().installMeter(
354 DatapathId.of(
command.getSwitchId().toLong()),
355 command.getBandwidth(), 1024, meterId);
357 logger.debug(
"Installing unmetered one switch flow. Switch: {}, cookie: {}",
361 OutputVlanType directOutputVlanType =
command.getOutputVlanType();
362 context.getSwitchManager().installOneSwitchFlow(
363 DatapathId.of(
command.getSwitchId().toLong()),
370 directOutputVlanType,
379 private void doDeleteFlow(
final CommandMessage message, String replyToTopic, Destination replyDestination)
380 throws FlowCommandException {
381 RemoveFlow
command = (RemoveFlow) message.getData();
384 DatapathId dpid = DatapathId.of(
command.getSwitchId().toLong());
385 ISwitchManager switchManager = context.getSwitchManager();
387 logger.info(
"Deleting flow {} from switch {}",
command.getId(), dpid);
389 DeleteRulesCriteria criteria = Optional.ofNullable(
command.getCriteria())
390 .orElseGet(() -> DeleteRulesCriteria.builder().cookie(
command.getCookie()).
build());
391 List<Long> cookiesOfRemovedRules = switchManager.deleteRulesByCriteria(dpid, criteria);
392 if (cookiesOfRemovedRules.isEmpty()) {
393 logger.warn(
"No rules were removed by criteria {} for flow {} from switch {}",
394 criteria,
command.getId(), dpid);
398 Long meterId =
command.getMeterId();
399 if (meterId != null) {
400 switchManager.deleteMeter(dpid, meterId);
403 message.setDestination(replyDestination);
404 context.getKafkaProducer().postMessage(replyToTopic, message);
405 }
catch (SwitchOperationException e) {
406 throw new FlowCommandException(
command.getId(), ErrorType.DELETION_FAILURE, e);
415 private void doNetworkDump(
final CommandMessage message) {
417 String correlationId = message.getCorrelationId();
418 KafkaMessageProducer kafkaProducer = context.getKafkaProducer();
419 String outputDiscoTopic = context.getKafkaTopoDiscoTopic();
421 logger.debug(
"Processing request from WFM to dump switches. {}", correlationId);
423 kafkaProducer.getProducer().enableGuaranteedOrder(outputDiscoTopic);
426 kafkaProducer.postMessage(outputDiscoTopic,
427 new InfoMessage(
new NetworkDumpBeginMarker(), System.currentTimeMillis(), correlationId));
429 Map<DatapathId, IOFSwitch> allSwitchMap = context.getSwitchManager().getAllSwitchMap();
431 allSwitchMap.values().stream()
432 .map(this::buildNetworkDumpSwitchData)
434 kafkaProducer.postMessage(outputDiscoTopic,
435 new InfoMessage(sw, System.currentTimeMillis(), correlationId)));
437 allSwitchMap.values().stream()
438 .flatMap(sw -> sw.getEnabledPorts().stream()
439 .filter(
port -> SwitchEventCollector.isPhysicalPort(
port.getPortNo()))
440 .map(
port -> buildNetworkDumpPortData(sw,
port)))
442 kafkaProducer.postMessage(outputDiscoTopic,
443 new InfoMessage(
port, System.currentTimeMillis(), correlationId)));
445 kafkaProducer.postMessage(
448 new NetworkDumpEndMarker(), System.currentTimeMillis(),
451 kafkaProducer.getProducer().disableGuaranteedOrder(outputDiscoTopic);
458 protected NetworkDumpSwitchData buildNetworkDumpSwitchData(IOFSwitch sw) {
459 return new NetworkDumpSwitchData(
new SwitchId(sw.getId().getLong()));
465 private NetworkDumpPortData buildNetworkDumpPortData(IOFSwitch sw, OFPortDesc
port) {
466 return new NetworkDumpPortData(
new SwitchId(sw.getId().getLong()),
port.getPortNo().getPortNumber());
469 private void doInstallSwitchRules(
final CommandMessage message, String replyToTopic, Destination replyDestination) {
470 SwitchRulesInstallRequest request = (SwitchRulesInstallRequest) message.getData();
471 logger.debug(
"Installing rules on '{}' switch: action={}",
472 request.getSwitchId(), request.getInstallRulesAction());
474 DatapathId dpid = DatapathId.of(request.getSwitchId().toLong());
475 ISwitchManager switchManager = context.getSwitchManager();
476 InstallRulesAction installAction = request.getInstallRulesAction();
477 List<Long> installedRules =
new ArrayList<>();
479 if (installAction == InstallRulesAction.INSTALL_DROP) {
480 switchManager.installDropFlow(dpid);
481 installedRules.add(ISwitchManager.DROP_RULE_COOKIE);
482 }
else if (installAction == InstallRulesAction.INSTALL_BROADCAST) {
483 switchManager.installVerificationRule(dpid,
true);
484 installedRules.add(ISwitchManager.VERIFICATION_BROADCAST_RULE_COOKIE);
485 }
else if (installAction == InstallRulesAction.INSTALL_UNICAST) {
487 switchManager.installVerificationRule(dpid,
false);
488 installedRules.add(ISwitchManager.VERIFICATION_UNICAST_RULE_COOKIE);
490 switchManager.installDefaultRules(dpid);
491 installedRules.addAll(asList(
492 ISwitchManager.DROP_RULE_COOKIE,
493 ISwitchManager.VERIFICATION_BROADCAST_RULE_COOKIE,
494 ISwitchManager.VERIFICATION_UNICAST_RULE_COOKIE
498 SwitchRulesResponse response =
new SwitchRulesResponse(installedRules);
499 InfoMessage infoMessage =
new InfoMessage(response,
500 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
501 context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
503 }
catch (SwitchOperationException e) {
504 ErrorData errorData =
new ErrorData(ErrorType.CREATION_FAILURE, e.getMessage(),
505 request.getSwitchId().toString());
506 ErrorMessage error =
new ErrorMessage(errorData,
507 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
508 context.getKafkaProducer().postMessage(replyToTopic, error);
512 private void doDeleteSwitchRules(
final CommandMessage message, String replyToTopic, Destination replyDestination) {
513 SwitchRulesDeleteRequest request = (SwitchRulesDeleteRequest) message.getData();
514 logger.debug(
"Deleting rules from '{}' switch: action={}, criteria={}", request.getSwitchId(),
515 request.getDeleteRulesAction(), request.getCriteria());
517 DatapathId dpid = DatapathId.of(request.getSwitchId().toLong());
518 DeleteRulesAction deleteAction = request.getDeleteRulesAction();
519 DeleteRulesCriteria criteria = request.getCriteria();
521 ISwitchManager switchManager = context.getSwitchManager();
524 List<Long> removedRules =
new ArrayList<>();
526 if (deleteAction != null) {
527 switch (deleteAction) {
529 criteria = DeleteRulesCriteria.builder()
530 .cookie(ISwitchManager.DROP_RULE_COOKIE).build();
532 case REMOVE_BROADCAST:
533 criteria = DeleteRulesCriteria.builder()
534 .cookie(ISwitchManager.VERIFICATION_BROADCAST_RULE_COOKIE).build();
537 criteria = DeleteRulesCriteria.builder()
538 .cookie(ISwitchManager.VERIFICATION_UNICAST_RULE_COOKIE).build();
544 if (deleteAction.nonDefaultRulesToBeRemoved()) {
545 removedRules.addAll(switchManager.deleteAllNonDefaultRules(dpid));
549 if (deleteAction.defaultRulesToBeRemoved()) {
550 removedRules.addAll(switchManager.deleteDefaultRules(dpid));
555 if (criteria != null) {
556 removedRules.addAll(switchManager.deleteRulesByCriteria(dpid, criteria));
560 if (deleteAction != null && deleteAction.defaultRulesToBeInstalled()) {
561 switchManager.installDefaultRules(dpid);
564 SwitchRulesResponse response =
new SwitchRulesResponse(removedRules);
565 InfoMessage infoMessage =
new InfoMessage(response,
566 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
567 context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
569 }
catch (SwitchOperationException e) {
570 ErrorData errorData =
new ErrorData(ErrorType.DELETION_FAILURE, e.getMessage(),
571 request.getSwitchId().toString());
572 ErrorMessage error =
new ErrorMessage(errorData,
573 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
574 context.getKafkaProducer().postMessage(replyToTopic, error);
578 private void doConnectMode(
final CommandMessage message, String replyToTopic, Destination replyDestination) {
579 ConnectModeRequest request = (ConnectModeRequest) message.getData();
580 if (request.getMode() != null) {
581 logger.debug(
"Setting CONNECT MODE to '{}'", request.getMode());
583 logger.debug(
"Getting CONNECT MODE");
586 ISwitchManager switchManager = context.getSwitchManager();
587 ConnectModeRequest.Mode
result = switchManager.connectMode(request.getMode());
590 ConnectModeResponse response =
new ConnectModeResponse(
result);
591 InfoMessage infoMessage =
new InfoMessage(response,
592 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
593 context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
597 private void doDumpRulesRequest(
final CommandMessage message, String replyToTopic) {
598 DumpRulesRequest request = (DumpRulesRequest) message.getData();
599 SwitchId switchId = request.getSwitchId();
600 logger.debug(
"Loading installed rules for switch {}", switchId);
602 List<OFFlowStatsEntry> flowEntries =
603 context.getSwitchManager().dumpFlowTable(DatapathId.of(switchId.toLong()));
604 List<FlowEntry> flows = flowEntries.stream()
606 .collect(Collectors.toList());
608 SwitchFlowEntries response = SwitchFlowEntries.builder()
612 InfoMessage infoMessage =
new InfoMessage(response, message.getTimestamp(),
613 message.getCorrelationId());
614 context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
622 private void doBatchInstall(
final CommandMessage message)
throws FlowCommandException {
623 BatchInstallRequest request = (BatchInstallRequest) message.getData();
624 final String switchId = request.getSwitchId();
625 logger.debug(
"Processing flow commands for switch {}", switchId);
627 for (BaseInstallFlow
command : request.getFlowCommands()) {
628 logger.debug(
"Processing command for switch {} {}", switchId,
command);
630 if (
command instanceof InstallIngressFlow) {
631 installIngressFlow((InstallIngressFlow)
command);
632 }
else if (
command instanceof InstallEgressFlow) {
633 installEgressFlow((InstallEgressFlow)
command);
634 }
else if (
command instanceof InstallTransitFlow) {
635 installTransitFlow((InstallTransitFlow)
command);
636 }
else if (
command instanceof InstallOneSwitchFlow) {
637 installOneSwitchFlow((InstallOneSwitchFlow)
command);
639 throw new FlowCommandException(
command.getId(), ErrorType.REQUEST_INVALID,
640 "Unsupported command for batch install.");
642 }
catch (SwitchOperationException e) {
643 logger.error(
"Error during flow installation", e);
648 private void doPortsCommandDataRequest(CommandMessage message) {
650 PortsCommandData request = (PortsCommandData) message.getData();
651 Map<DatapathId, IOFSwitch> allSwitchMap = context.getSwitchManager().getAllSwitchMap();
652 for (Map.Entry<DatapathId, IOFSwitch> entry : allSwitchMap.entrySet()) {
653 SwitchId switchId =
new SwitchId(entry.getKey().toString());
655 IOFSwitch sw = entry.getValue();
656 Collection<OFPort> enabledPortNumbers = sw.getEnabledPortNumbers();
658 Set<PortStatus> portsStatus = sw.getPorts().stream()
659 .filter(
port -> SwitchEventCollector.isPhysicalPort(
port.getPortNo()))
660 .map(
port -> PortStatus.builder()
661 .id(
port.getPortNo().getPortNumber())
662 .
status(enabledPortNumbers.contains(
port.getPortNo())
663 ? PortChangeType.UP : PortChangeType.DOWN)
665 ).collect(Collectors.toSet());
666 SwitchPortStatusData response = SwitchPortStatusData.builder()
669 .requester(request.getRequester())
672 InfoMessage infoMessage =
new InfoMessage(response, message.getTimestamp(),
673 message.getCorrelationId());
674 context.getKafkaProducer().postMessage(context.getKafkaStatsTopic(), infoMessage);
675 }
catch (Exception e) {
676 logger.error(
"Could not get port stats data for switch {} with error {}",
677 switchId, e.getMessage(), e);
680 }
catch (Exception e) {
681 logger.error(
"Could not get port data for stats {}", e.getMessage(), e);
685 private void doFlowVerificationRequest(CommandContext context, UniFlowVerificationRequest request) {
686 VerificationDispatchCommand verification =
new VerificationDispatchCommand(context, request);
690 private void doDeleteMeter(CommandMessage message, String replyToTopic, Destination replyDestination) {
691 DeleteMeterRequest request = (DeleteMeterRequest) message.getData();
693 DatapathId dpid = DatapathId.of(request.getSwitchId().toLong());
694 long txId = context.getSwitchManager().deleteMeter(dpid, request.getMeterId());
696 DeleteMeterResponse response =
new DeleteMeterResponse(txId != 0L);
697 InfoMessage infoMessage =
new InfoMessage(response, System.currentTimeMillis(), message.getCorrelationId(),
699 context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
700 }
catch (SwitchOperationException e) {
701 logger.info(
"Meter deletion is unsuccessful. Switch {} not found", request.getSwitchId());
702 ErrorData errorData =
703 new ErrorData(ErrorType.DATA_INVALID, e.getMessage(), request.getSwitchId().toString());
704 ErrorMessage error =
new ErrorMessage(errorData,
705 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
706 context.getKafkaProducer().postMessage(replyToTopic, error);
710 private void doConfigurePort(
final CommandMessage message,
final String replyToTopic,
711 final Destination replyDestination) {
712 PortConfigurationRequest request = (PortConfigurationRequest) message.getData();
714 logger.info(
"Port configuration request. Switch '{}', Port '{}'", request.getSwitchId(),
715 request.getPortNumber());
717 ISwitchManager switchManager = context.getSwitchManager();
719 DatapathId dpId = DatapathId.of(request.getSwitchId().toLong());
720 switchManager.configurePort(dpId, request.getPortNumber(), request.getAdminDown());
722 InfoMessage infoMessage =
new InfoMessage(
723 new PortConfigurationResponse(request.getSwitchId(), request.getPortNumber()),
724 message.getTimestamp(),
725 message.getCorrelationId());
726 context.getKafkaProducer().postMessage(replyToTopic, infoMessage);
727 }
catch (SwitchOperationException e) {
728 logger.error(
"Port configuration request failed. " + e.getMessage(), e);
729 ErrorData errorData =
new ErrorData(ErrorType.DATA_INVALID, e.getMessage(),
730 "Port configuration request failed");
731 ErrorMessage error =
new ErrorMessage(errorData,
732 System.currentTimeMillis(), message.getCorrelationId(), replyDestination);
733 context.getKafkaProducer().postMessage(replyToTopic, error);
737 private long allocateMeterId(Long meterId, SwitchId switchId, String flowId, Long cookie) {
740 if (meterId == null) {
741 logger.error(
"Meter_id should be passed within one switch flow command. Cookie is {}", cookie);
742 allocatedId = (long) meterPool.allocate(switchId, flowId);
743 logger.error(
"Allocated meter_id {} for cookie {}", allocatedId, cookie);
745 allocatedId = meterPool.allocate(switchId, flowId, Math.toIntExact(meterId));
750 private void parseRecord(ConsumerRecord<String, String> record) {
751 CommandMessage message;
753 String
value = record.value();
758 message = MAPPER.readValue(
value, CommandMessage.class);
759 }
catch (Exception exception) {
760 logger.error(
"error parsing record={}", record.value(), exception);
765 try (CorrelationContextClosable closable = CorrelationContext.create(message.getCorrelationId())) {
766 doControllerMsg(message);
767 }
catch (Exception exception) {
768 logger.error(
"error processing message={}", message, exception);
777 public static class Factory {
778 private final ConsumerContext context;
779 private final MeterPool meterPool =
new MeterPool();
781 public Factory(ConsumerContext context) {
782 this.context = context;
785 public RecordHandler produce(ConsumerRecord<String, String> record) {
786 return new RecordHandler(context, record, meterPool);
static final ObjectMapper MAPPER
static FlowEntry toFlowEntry(final OFFlowStatsEntry entry)
def command(payload, fields)