16 package org.openkilda.floodlight.statistics;
18 import static java.util.stream.Collectors.toList;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.Futures;
39 import net.floodlightcontroller.core.IFloodlightProviderService;
40 import net.floodlightcontroller.core.IOFSwitch;
41 import net.floodlightcontroller.core.internal.IOFSwitchService;
42 import net.floodlightcontroller.core.module.FloodlightModuleContext;
43 import net.floodlightcontroller.core.module.FloodlightModuleException;
44 import net.floodlightcontroller.core.module.IFloodlightModule;
45 import net.floodlightcontroller.core.module.IFloodlightService;
46 import net.floodlightcontroller.threadpool.IThreadPoolService;
47 import org.projectfloodlight.openflow.protocol.OFFactory;
48 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
49 import org.projectfloodlight.openflow.protocol.OFPortStatsProp;
50 import org.projectfloodlight.openflow.protocol.OFPortStatsPropEthernet;
51 import org.projectfloodlight.openflow.protocol.OFPortStatsRequest;
52 import org.projectfloodlight.openflow.protocol.OFStatsReply;
53 import org.projectfloodlight.openflow.protocol.OFVersion;
54 import org.projectfloodlight.openflow.types.OFGroup;
55 import org.projectfloodlight.openflow.types.OFPort;
56 import org.projectfloodlight.openflow.types.U64;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import java.util.ArrayList;
61 import java.util.Collection;
62 import java.util.Collections;
63 import java.util.List;
65 import java.util.concurrent.TimeUnit;
66 import java.util.function.Function;
72 private static final Logger logger = LoggerFactory.getLogger(
StatisticsService.class);
73 private static final U64 SYSTEM_MASK = U64.of(0x8000000000000000L);
74 private static final long OFPM_ALL = 0xffffffffL;
76 private IOFSwitchService switchService;
78 private IThreadPoolService threadPoolService;
80 private String statisticsTopic;
88 public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
94 Collection<Class<? extends IFloodlightService>> services =
new ArrayList<>(3);
95 services.add(IFloodlightProviderService.class);
96 services.add(IOFSwitchService.class);
97 services.add(IThreadPoolService.class);
102 public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
103 switchService = context.getServiceImpl(IOFSwitchService.class);
104 threadPoolService = context.getServiceImpl(IThreadPoolService.class);
116 public void startUp(FloodlightModuleContext context)
throws FloodlightModuleException {
118 threadPoolService.getScheduledExecutor().scheduleAtFixedRate(
119 () -> switchService.getAllSwitchMap().values().forEach(iofSwitch -> {
120 gatherPortStats(iofSwitch);
121 gatherFlowStats(iofSwitch);
122 }), interval, interval, TimeUnit.SECONDS);
127 private void gatherPortStats(IOFSwitch iofSwitch) {
128 OFFactory factory = iofSwitch.getOFFactory();
131 OFPortStatsRequest portStatsRequest = factory
132 .buildPortStatsRequest()
133 .setPortNo(OFPort.ANY)
136 logger.trace(
"Getting port stats for switch={}", iofSwitch.getId());
138 Futures.addCallback(iofSwitch.writeStatsRequest(portStatsRequest),
139 new RequestCallback<>(
data -> {
140 List<PortStatsReply> replies =
data.stream().map(reply -> {
141 List<PortStatsEntry> entries = reply.getEntries().stream()
143 if (entry.getVersion().compareTo(OFVersion.OF_13) > 0) {
144 long rxFrameErr = 0L;
147 long collisions = 0L;
149 for (OFPortStatsProp property : entry.getProperties()) {
150 if (property.getType() == 0x0) {
151 OFPortStatsPropEthernet etherProps =
152 (OFPortStatsPropEthernet) property;
153 rxFrameErr = etherProps.getRxFrameErr().getValue();
154 rxOverErr = etherProps.getRxOverErr().getValue();
155 rxCrcErr = etherProps.getRxCrcErr().getValue();
156 collisions = etherProps.getCollisions().getLength();
160 return new PortStatsEntry(
161 entry.getPortNo().getPortNumber(),
162 entry.getRxPackets().getValue(),
163 entry.getTxPackets().getValue(),
164 entry.getRxBytes().getValue(),
165 entry.getTxBytes().getValue(),
166 entry.getRxDropped().getValue(),
167 entry.getTxDropped().getValue(),
168 entry.getRxErrors().getValue(),
169 entry.getTxErrors().getValue(),
175 return new PortStatsEntry(
176 entry.getPortNo().getPortNumber(),
177 entry.getRxPackets().getValue(),
178 entry.getTxPackets().getValue(),
179 entry.getRxBytes().getValue(),
180 entry.getTxBytes().getValue(),
181 entry.getRxDropped().getValue(),
182 entry.getTxDropped().getValue(),
183 entry.getRxErrors().getValue(),
184 entry.getTxErrors().getValue(),
185 entry.getRxFrameErr().getValue(),
186 entry.getRxOverErr().getValue(),
187 entry.getRxCrcErr().getValue(),
188 entry.getCollisions().getValue());
192 return new PortStatsReply(reply.getXid(), entries);
193 }).collect(toList());
194 return new PortStatsData(switchId, replies);
195 },
"port", CorrelationContext.getId()));
198 @NewCorrelationContextRequired
199 private void gatherFlowStats(IOFSwitch iofSwitch) {
200 OFFactory factory = iofSwitch.getOFFactory();
201 final SwitchId switchId =
new SwitchId(iofSwitch.getId().toString());
203 OFFlowStatsRequest flowStatsRequest = factory
204 .buildFlowStatsRequest()
205 .setOutGroup(OFGroup.ANY)
206 .setCookieMask(SYSTEM_MASK)
209 if (factory.getVersion().compareTo(OFVersion.OF_15) != 0) {
211 logger.trace(
"Getting flow stats for switch={}", iofSwitch.getId());
213 Futures.addCallback(iofSwitch.writeStatsRequest(flowStatsRequest),
214 new RequestCallback<>(
data -> {
215 List<FlowStatsReply> replies =
data.stream().map(reply -> {
216 List<FlowStatsEntry> entries = reply.getEntries().stream()
217 .map(entry ->
new FlowStatsEntry(
218 entry.getTableId().getValue(),
219 entry.getCookie().getValue(),
220 entry.getPacketCount().getValue(),
221 entry.getByteCount().getValue()))
223 return new FlowStatsReply(reply.getXid(), entries);
224 }).collect(toList());
225 return new FlowStatsData(switchId, replies);
226 },
"flow", CorrelationContext.getId()));
230 private class RequestCallback<T
extends OFStatsReply> implements FutureCallback<List<T>> {
231 private Function<List<T>, InfoData> transform;
233 private final String correlationId;
235 RequestCallback(Function<List<T>, InfoData> transform, String
type, String correlationId) {
236 this.transform = transform;
238 this.correlationId = correlationId;
242 public void onSuccess(List<T>
data) {
244 try (CorrelationContextClosable closable = CorrelationContext.create(correlationId)) {
246 InfoMessage infoMessage =
new InfoMessage(transform.apply(
data),
247 System.currentTimeMillis(), correlationId, Destination.WFM_STATS);
248 kafkaProducer.
postMessage(statisticsTopic, infoMessage);
253 public void onFailure(Throwable throwable) {
255 try (CorrelationContextClosable closable = CorrelationContext.create(correlationId)) {
257 logger.error(
"Exception reading {} stats",
type, throwable);
void startUp(FloodlightModuleContext context)
void init(FloodlightModuleContext context)
Collection< Class<? extends IFloodlightService > > getModuleDependencies()
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
Collection< Class<? extends IFloodlightService > > getModuleServices()
Map< Class<? extends IFloodlightService >, IFloodlightService > getServiceImpls()
void postMessage(final String topic, final Message message)