16 package org.openkilda.floodlight.pathverification;
33 import com.auth0.jwt.JWT;
34 import com.auth0.jwt.JWTVerifier;
35 import com.auth0.jwt.algorithms.Algorithm;
36 import com.auth0.jwt.exceptions.JWTVerificationException;
37 import com.auth0.jwt.interfaces.DecodedJWT;
38 import com.fasterxml.jackson.core.JsonProcessingException;
39 import com.google.common.annotations.VisibleForTesting;
40 import net.floodlightcontroller.core.FloodlightContext;
41 import net.floodlightcontroller.core.IFloodlightProviderService;
42 import net.floodlightcontroller.core.IListener;
43 import net.floodlightcontroller.core.IOFMessageListener;
44 import net.floodlightcontroller.core.IOFSwitch;
45 import net.floodlightcontroller.core.internal.IOFSwitchService;
46 import net.floodlightcontroller.core.module.FloodlightModuleContext;
47 import net.floodlightcontroller.core.module.FloodlightModuleException;
48 import net.floodlightcontroller.core.module.IFloodlightModule;
49 import net.floodlightcontroller.core.module.IFloodlightService;
50 import net.floodlightcontroller.packet.Data;
51 import net.floodlightcontroller.packet.Ethernet;
52 import net.floodlightcontroller.packet.IPv4;
53 import net.floodlightcontroller.packet.LLDPTLV;
54 import net.floodlightcontroller.packet.UDP;
55 import net.floodlightcontroller.restserver.IRestApiService;
56 import net.floodlightcontroller.util.OFMessageUtils;
57 import org.apache.commons.codec.binary.Hex;
58 import org.apache.kafka.clients.producer.KafkaProducer;
59 import org.apache.kafka.clients.producer.ProducerRecord;
60 import org.projectfloodlight.openflow.protocol.OFMessage;
61 import org.projectfloodlight.openflow.protocol.OFPacketIn;
62 import org.projectfloodlight.openflow.protocol.OFPacketOut;
63 import org.projectfloodlight.openflow.protocol.OFPortDesc;
64 import org.projectfloodlight.openflow.protocol.OFPortDescProp;
65 import org.projectfloodlight.openflow.protocol.OFPortDescPropEthernet;
66 import org.projectfloodlight.openflow.protocol.OFType;
67 import org.projectfloodlight.openflow.protocol.OFVersion;
68 import org.projectfloodlight.openflow.protocol.action.OFAction;
69 import org.projectfloodlight.openflow.protocol.match.MatchField;
70 import org.projectfloodlight.openflow.types.DatapathId;
71 import org.projectfloodlight.openflow.types.EthType;
72 import org.projectfloodlight.openflow.types.IPv4Address;
73 import org.projectfloodlight.openflow.types.IpProtocol;
74 import org.projectfloodlight.openflow.types.MacAddress;
75 import org.projectfloodlight.openflow.types.OFBufferId;
76 import org.projectfloodlight.openflow.types.OFPort;
77 import org.projectfloodlight.openflow.types.TransportPort;
78 import org.projectfloodlight.openflow.types.U64;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
82 import java.io.UnsupportedEncodingException;
83 import java.net.InetSocketAddress;
84 import java.nio.ByteBuffer;
85 import java.nio.charset.Charset;
86 import java.util.ArrayList;
87 import java.util.Arrays;
88 import java.util.Collection;
89 import java.util.HashMap;
90 import java.util.List;
95 private static final Logger logIsl = LoggerFactory.getLogger(
102 private String topoDiscoTopic;
103 private IFloodlightProviderService floodlightProvider;
104 private IOFSwitchService switchService;
105 private IRestApiService restApiService;
106 private boolean isAlive =
false;
107 private KafkaProducer<String, String> producer;
108 private double islBandwidthQuotient = 1.0;
109 private Algorithm algorithm;
110 private JWTVerifier verifier;
117 Collection<Class<? extends IFloodlightService>> services =
new ArrayList<>(3);
118 services.add(IFloodlightProviderService.class);
119 services.add(IOFSwitchService.class);
120 services.add(IRestApiService.class);
126 Collection<Class<? extends IFloodlightService>> services =
new ArrayList<>();
132 public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
133 Map<Class<? extends IFloodlightService>, IFloodlightService> map =
new HashMap<>();
139 public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
140 logger.debug(
"main pathverification service: " +
this);
145 initConfiguration(topicsConfig, serviceConfig);
147 initServices(context);
154 throws FloodlightModuleException {
155 topoDiscoTopic = topicsConfig.getTopoDiscoTopic();
156 islBandwidthQuotient = serviceConfig.getIslBandwidthQuotient();
158 initAlgorithm(serviceConfig.getHmac256Secret());
162 void initServices(FloodlightModuleContext context) {
163 floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
164 switchService = context.getServiceImpl(IOFSwitchService.class);
165 restApiService = context.getServiceImpl(IRestApiService.class);
169 void initAlgorithm(String secret)
throws FloodlightModuleException {
171 algorithm = Algorithm.HMAC256(secret);
172 verifier = JWT.require(algorithm).build();
173 }
catch (UnsupportedEncodingException e) {
174 logger.error(
"Ivalid secret", e);
175 throw new FloodlightModuleException(
"Invalid secret for HMAC256");
180 void setKafkaProducer(KafkaProducer<String, String> mockProducer) {
185 public void startUp(FloodlightModuleContext context)
throws FloodlightModuleException {
187 floodlightProvider.addOFMessageListener(OFType.PACKET_IN,
this);
212 public Command
receive(IOFSwitch sw, OFMessage msg, FloodlightContext context) {
213 logger.debug(
"PathVerificationService received new message of type {}: {}", msg.getType(), msg.toString());
214 switch (msg.getType()) {
216 return handlePacketIn(sw, (OFPacketIn) msg, context);
220 return Command.CONTINUE;
233 List<OFAction> actions =
new ArrayList<>();
234 actions.add(sw.getOFFactory().actions().buildOutput().setPort(
port).build());
248 IOFSwitch srcSwitch = switchService.getSwitch(srcSwId);
249 if (srcSwitch != null && srcSwitch.getPort(
port) != null) {
250 IOFSwitch dstSwitch = (dstSwId == null) ? null : switchService.getSwitch(dstSwId);
253 if (ofPacketOut != null) {
254 logger.debug(
"==> Sending verification packet out {}/{}: {}", srcSwitch.getId().toString(),
255 port.getPortNumber(),
256 Hex.encodeHexString(ofPacketOut.getData()));
257 result = srcSwitch.write(ofPacketOut);
259 logger.error(
"<== Received null from generateVerificationPacket, inputs where: " 260 +
"srcSwitch: {}, port: {}, dstSwitch: {}", srcSwitch,
port, dstSwitch);
264 logIsl.info(
"push discovery package via: {}-{}", srcSwitch.getId(),
port.getPortNumber());
267 "Failed to send PACKET_OUT(ISL discovery packet) via {}-{}",
268 srcSwitch.getId(),
port.getPortNumber());
271 }
catch (Exception exception) {
272 logger.error(String.format(
"Unhandled exception in %s", getClass().
getName()), exception);
294 byte[] dpidArray =
new byte[8];
295 ByteBuffer dpidBb = ByteBuffer.wrap(dpidArray);
297 DatapathId dpid = srcSw.getId();
298 dpidBb.putLong(dpid.getLong());
299 byte[] chassisId =
new byte[]{4, 0, 0, 0, 0, 0, 0};
300 System.arraycopy(dpidArray, 2, chassisId, 1, 6);
302 byte[] dpidTlvValue =
new byte[]{0x0, 0x26, (byte) 0xe1, 0, 0, 0, 0, 0, 0, 0, 0, 0};
303 System.arraycopy(dpidArray, 0, dpidTlvValue, 4, 8);
305 OFPortDesc ofPortDesc = srcSw.getPort(
port);
306 byte[] zeroMac = {0, 0, 0, 0, 0, 0};
307 byte[] srcMac = ofPortDesc.getHwAddr().getBytes();
308 if (Arrays.equals(srcMac, zeroMac)) {
309 int portVal = ofPortDesc.getPortNo().getPortNumber();
311 logger.debug(
"Port {}/{} has zero hardware address: overwrite with lower 6 bytes of dpid",
312 dpid.toString(), portVal);
313 System.arraycopy(dpidArray, 2, srcMac, 0, 6);
316 byte[] portId =
new byte[]{2, 0, 0};
317 ByteBuffer portBb = ByteBuffer.wrap(portId, 1, 2);
318 portBb.putShort(
port.getShortPortNumber());
322 new LLDPTLV().setType((byte) 1).setLength((
short) chassisId.length).setValue(chassisId));
324 vp.
setPortId(
new LLDPTLV().setType((byte) 2).setLength((
short) portId.length).setValue(portId));
326 byte[] ttlValue =
new byte[]{0, 0x78};
328 new LLDPTLV().setType((byte) 3).setLength((
short) ttlValue.length).setValue(ttlValue));
330 LLDPTLV dpidTlv =
new LLDPTLV().setType((byte) 127).setLength((
short) dpidTlvValue.length)
331 .setValue(dpidTlvValue);
337 long time = System.currentTimeMillis();
338 long swLatency = srcSw.getLatency().getValue();
339 byte[] timestampTlvValue = ByteBuffer.allocate(Long.SIZE / 8 + 4).put((byte) 0x00)
340 .put((byte) 0x26).put((byte) 0xe1)
342 .putLong(time + swLatency )
345 LLDPTLV timestampTlv =
new LLDPTLV().setType((byte) 127)
346 .setLength((
short) timestampTlvValue.length).setValue(timestampTlvValue);
351 byte[] typeTlvValue = ByteBuffer.allocate(Integer.SIZE / 8 + 4).put((byte) 0x00)
352 .put((byte) 0x26).put((byte) 0xe1)
355 LLDPTLV typeTlv =
new LLDPTLV().setType((byte) 127)
356 .setLength((
short) typeTlvValue.length).setValue(typeTlvValue);
360 String token = JWT.create()
361 .withClaim(
"dpid", dpid.getLong())
362 .withClaim(
"ts", time + swLatency)
365 byte[] tokenBytes = token.getBytes(Charset.forName(
"UTF-8"));
367 byte[] tokenTlvValue = ByteBuffer.allocate(4 + tokenBytes.length).put((byte) 0x00)
368 .put((byte) 0x26).put((byte) 0xe1)
370 .put(tokenBytes).array();
371 LLDPTLV tokenTlv =
new LLDPTLV().setType((byte) 127)
372 .setLength((
short) tokenTlvValue.length).setValue(tokenTlvValue);
379 OFPortDesc sw2OfPortDesc = dstSw.getPort(
port);
380 dstMac = sw2OfPortDesc.getHwAddr();
386 .of(((InetSocketAddress) dstSw.getInetAddress()).getAddress().getAddress());
390 IPv4Address.of(((InetSocketAddress) srcSw.getInetAddress()).getAddress().getAddress()))
391 .setDestinationAddress(dstIp).setTtl((byte) 64).setProtocol(IpProtocol.UDP);
398 Ethernet l2 =
new Ethernet().setSourceMACAddress(MacAddress.of(srcMac))
399 .setDestinationMACAddress(dstMac).setEtherType(EthType.IPv4);
404 byte[]
data = l2.serialize();
405 OFPacketOut.Builder pob = srcSw.getOFFactory().buildPacketOut()
408 OFMessageUtils.setInPort(pob, OFPort.CONTROLLER);
411 }
catch (Exception exception) {
412 logger.error(
"error generating verification packet: {}", exception);
418 if (eth.getPayload() instanceof IPv4) {
419 IPv4 ip = (IPv4) eth.getPayload();
421 if (ip.getPayload() instanceof UDP) {
422 UDP udp = (UDP) ip.getPayload();
425 && (udp.getDestinationPort()
432 throw new Exception(
"Ethernet packet was not a verification packet: " + eth);
435 private IListener.Command handlePacketIn(IOFSwitch sw, OFPacketIn pkt, FloodlightContext context) {
436 long time = System.currentTimeMillis();
437 logger.debug(
"packet_in {} received from {}", pkt.getXid(), sw.getId());
439 VerificationPacket verificationPacket = null;
441 Ethernet eth = IFloodlightProviderService.bcStore.get(context, IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
444 verificationPacket = deserialize(eth);
445 }
catch (Exception exception) {
446 logger.trace(
"Deserialization failure: {}, exception: {}", exception.getMessage(), exception);
447 return Command.CONTINUE;
451 ByteBuffer portBb = ByteBuffer.wrap(verificationPacket.getPortId().getValue());
455 int pathOrdinal = 10;
456 IOFSwitch remoteSwitch = null;
457 boolean signed =
false;
458 for (LLDPTLV lldptlv : verificationPacket.getOptionalTLVList()) {
459 if (lldptlv.getType() == 127 && lldptlv.getLength() == 12
460 && lldptlv.getValue()[0] == 0x0
461 && lldptlv.getValue()[1] == 0x26
462 && lldptlv.getValue()[2] == (byte) 0xe1
463 && lldptlv.getValue()[3] == 0x0) {
464 ByteBuffer dpidBb = ByteBuffer.wrap(lldptlv.getValue());
465 remoteSwitch = switchService.getSwitch(DatapathId.of(dpidBb.getLong(4)));
466 }
else if (lldptlv.getType() == 127 && lldptlv.getLength() == 12
467 && lldptlv.getValue()[0] == 0x0
468 && lldptlv.getValue()[1] == 0x26
469 && lldptlv.getValue()[2] == (byte) 0xe1
470 && lldptlv.getValue()[3] == 0x01) {
471 ByteBuffer tsBb = ByteBuffer.wrap(lldptlv.getValue());
472 long swLatency = sw.getLatency().getValue();
473 timestamp = tsBb.getLong(4);
474 timestamp = timestamp + swLatency;
475 }
else if (lldptlv.getType() == 127 && lldptlv.getLength() == 8
476 && lldptlv.getValue()[0] == 0x0
477 && lldptlv.getValue()[1] == 0x26
478 && lldptlv.getValue()[2] == (byte) 0xe1
479 && lldptlv.getValue()[3] == 0x02) {
480 ByteBuffer typeBb = ByteBuffer.wrap(lldptlv.getValue());
481 pathOrdinal = typeBb.getInt(4);
482 }
else if (lldptlv.getType() == 127
483 && lldptlv.getValue()[0] == 0x0
484 && lldptlv.getValue()[1] == 0x26
485 && lldptlv.getValue()[2] == (byte) 0xe1
486 && lldptlv.getValue()[3] == 0x03) {
487 ByteBuffer bb = ByteBuffer.wrap(lldptlv.getValue());
489 byte[] tokenArray =
new byte[lldptlv.getLength() - 4];
490 bb.get(tokenArray, 0, tokenArray.length);
491 String token =
new String(tokenArray);
494 DecodedJWT jwt = verifier.verify(token);
496 }
catch (JWTVerificationException e) {
497 logger.error(
"Packet verification failed", e);
509 if (remoteSwitch == null) {
514 logger.warn(
"verification packet without sign");
518 U64 latency = (timestamp != 0 && (time - timestamp) > 0) ? U64.of(time - timestamp) : U64.ZERO;
520 OFPort inPort = pkt.getVersion().compareTo(OFVersion.OF_12) < 0 ? pkt.getInPort()
521 : pkt.getMatch().get(MatchField.IN_PORT);
522 OFPort remotePort = OFPort.of(portBb.getShort());
523 logIsl.info(
"link discovered: {}-{} ===( {} ms )===> {}-{}",
524 remoteSwitch.getId(), remotePort, latency.getValue(), sw.getId(), inPort);
528 List<PathNode>
nodes = Arrays.asList(
529 new PathNode(
new SwitchId(remoteSwitch.getId().toString()), remotePort.getPortNumber(), 0,
531 new PathNode(
new SwitchId(sw.getId().toString()), inPort.getPortNumber(), 1));
533 OFPortDesc
port = sw.getPort(inPort);
534 long speed = Integer.MAX_VALUE;
536 if (
port.getVersion().compareTo(OFVersion.OF_13) > 0) {
537 for (OFPortDescProp prop :
port.getProperties()) {
538 if (prop.getType() == 0x0) {
539 speed = ((OFPortDescPropEthernet) prop).getCurrSpeed();
543 speed =
port.getCurrSpeed();
546 IslInfoData
path =
new IslInfoData(latency.getValue(),
nodes, speed, IslChangeType.DISCOVERED,
547 getAvailableBandwidth(speed));
549 Message message =
new InfoMessage(
path, System.currentTimeMillis(), CorrelationContext.getId(), null);
551 final String json = MAPPER.writeValueAsString(message);
552 logger.debug(
"about to send {}", json);
553 producer.send(
new ProducerRecord<>(topoDiscoTopic, json));
554 logger.debug(
"packet_in processed for {}-{}", sw.getId(), inPort);
556 }
catch (JsonProcessingException exception) {
557 logger.error(
"could not create json for path packet_in: {}", exception.getMessage(), exception);
558 }
catch (UnsupportedOperationException exception) {
559 logger.error(
"could not parse packet_in message: {}", exception.getMessage(),
561 }
catch (Exception exception) {
562 logger.error(
"unknown error during packet_in message processing: {}", exception.getMessage(), exception);
569 private long getAvailableBandwidth(
long speed) {
570 return (
long) (speed * islBandwidthQuotient);
Map< Class<? extends IFloodlightService >, IFloodlightService > getServiceImpls()
void init(FloodlightModuleContext context)
static final ObjectMapper MAPPER
static final int VERIFICATION_PACKET_UDP_PORT
List< OFAction > getDiscoveryActions(IOFSwitch sw, OFPort port)
VerificationPacket setTtl(LLDPTLV ttl)
boolean isCallbackOrderingPostreq(OFType arg0, String arg1)
VerificationPacket setPortId(LLDPTLV portId)
VerificationPacket setChassisId(LLDPTLV chassisId)
Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext context)
Collection< Class<? extends IFloodlightService > > getModuleDependencies()
boolean isCallbackOrderingPrereq(OFType arg0, String arg1)
default Properties createKafkaProducerProperties()
OFPacketOut generateVerificationPacket(IOFSwitch srcSw, OFPort port, IOFSwitch dstSw, boolean sign)
List< LLDPTLV > getOptionalTLVList()
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
OFPacketOut generateVerificationPacket(IOFSwitch srcSw, OFPort port)
static final String VERIFICATION_PACKET_IP_DST
void startUp(FloodlightModuleContext context)
boolean sendDiscoveryMessage(DatapathId srcSwId, OFPort port)
Collection< Class<? extends IFloodlightService > > getModuleServices()
boolean sendDiscoveryMessage(DatapathId srcSwId, OFPort port, DatapathId dstSwId)
static final String VERIFICATION_BCAST_PACKET_DST