Open Kilda Java Documentation
StatisticsService.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.statistics;
17 
18 import static java.util.stream.Collectors.toList;
19 
36 
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.Futures;
39 import net.floodlightcontroller.core.IFloodlightProviderService;
40 import net.floodlightcontroller.core.IOFSwitch;
41 import net.floodlightcontroller.core.internal.IOFSwitchService;
42 import net.floodlightcontroller.core.module.FloodlightModuleContext;
43 import net.floodlightcontroller.core.module.FloodlightModuleException;
44 import net.floodlightcontroller.core.module.IFloodlightModule;
45 import net.floodlightcontroller.core.module.IFloodlightService;
46 import net.floodlightcontroller.threadpool.IThreadPoolService;
47 import org.projectfloodlight.openflow.protocol.OFFactory;
48 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
49 import org.projectfloodlight.openflow.protocol.OFPortStatsProp;
50 import org.projectfloodlight.openflow.protocol.OFPortStatsPropEthernet;
51 import org.projectfloodlight.openflow.protocol.OFPortStatsRequest;
52 import org.projectfloodlight.openflow.protocol.OFStatsReply;
53 import org.projectfloodlight.openflow.protocol.OFVersion;
54 import org.projectfloodlight.openflow.types.OFGroup;
55 import org.projectfloodlight.openflow.types.OFPort;
56 import org.projectfloodlight.openflow.types.U64;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 
60 import java.util.ArrayList;
61 import java.util.Collection;
62 import java.util.Collections;
63 import java.util.List;
64 import java.util.Map;
65 import java.util.concurrent.TimeUnit;
66 import java.util.function.Function;
67 
71 public class StatisticsService implements IStatisticsService, IFloodlightModule {
72  private static final Logger logger = LoggerFactory.getLogger(StatisticsService.class);
73  private static final U64 SYSTEM_MASK = U64.of(0x8000000000000000L);
74  private static final long OFPM_ALL = 0xffffffffL;
75 
76  private IOFSwitchService switchService;
77  private KafkaMessageProducer kafkaProducer;
78  private IThreadPoolService threadPoolService;
79  private int interval;
80  private String statisticsTopic;
81 
82  @Override
83  public Collection<Class<? extends IFloodlightService>> getModuleServices() {
84  return Collections.singletonList(IStatisticsService.class);
85  }
86 
87  @Override
88  public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
89  return Collections.singletonMap(IStatisticsService.class, this);
90  }
91 
92  @Override
93  public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
94  Collection<Class<? extends IFloodlightService>> services = new ArrayList<>(3);
95  services.add(IFloodlightProviderService.class);
96  services.add(IOFSwitchService.class);
97  services.add(IThreadPoolService.class);
98  return services;
99  }
100 
101  @Override
102  public void init(FloodlightModuleContext context) throws FloodlightModuleException {
103  switchService = context.getServiceImpl(IOFSwitchService.class);
104  threadPoolService = context.getServiceImpl(IThreadPoolService.class);
105  kafkaProducer = context.getServiceImpl(KafkaMessageProducer.class);
106 
107  ConfigurationProvider provider = ConfigurationProvider.of(context, this);
108  KafkaTopicsConfig topicsConfig = provider.getConfiguration(KafkaTopicsConfig.class);
109  statisticsTopic = topicsConfig.getStatsTopic();
110 
111  StatisticsServiceConfig serviceConfig = provider.getConfiguration(StatisticsServiceConfig.class);
112  interval = serviceConfig.getInterval();
113  }
114 
115  @Override
116  public void startUp(FloodlightModuleContext context) throws FloodlightModuleException {
117  if (interval > 0) {
118  threadPoolService.getScheduledExecutor().scheduleAtFixedRate(
119  () -> switchService.getAllSwitchMap().values().forEach(iofSwitch -> {
120  gatherPortStats(iofSwitch);
121  gatherFlowStats(iofSwitch);
122  }), interval, interval, TimeUnit.SECONDS);
123  }
124  }
125 
127  private void gatherPortStats(IOFSwitch iofSwitch) {
128  OFFactory factory = iofSwitch.getOFFactory();
129  SwitchId switchId = new SwitchId(iofSwitch.getId().toString());
130 
131  OFPortStatsRequest portStatsRequest = factory
132  .buildPortStatsRequest()
133  .setPortNo(OFPort.ANY)
134  .build();
135 
136  logger.trace("Getting port stats for switch={}", iofSwitch.getId());
137 
138  Futures.addCallback(iofSwitch.writeStatsRequest(portStatsRequest),
139  new RequestCallback<>(data -> {
140  List<PortStatsReply> replies = data.stream().map(reply -> {
141  List<PortStatsEntry> entries = reply.getEntries().stream()
142  .map(entry -> {
143  if (entry.getVersion().compareTo(OFVersion.OF_13) > 0) {
144  long rxFrameErr = 0L;
145  long rxOverErr = 0L;
146  long rxCrcErr = 0L;
147  long collisions = 0L;
148 
149  for (OFPortStatsProp property : entry.getProperties()) {
150  if (property.getType() == 0x0) {
151  OFPortStatsPropEthernet etherProps =
152  (OFPortStatsPropEthernet) property;
153  rxFrameErr = etherProps.getRxFrameErr().getValue();
154  rxOverErr = etherProps.getRxOverErr().getValue();
155  rxCrcErr = etherProps.getRxCrcErr().getValue();
156  collisions = etherProps.getCollisions().getLength();
157  }
158  }
159 
160  return new PortStatsEntry(
161  entry.getPortNo().getPortNumber(),
162  entry.getRxPackets().getValue(),
163  entry.getTxPackets().getValue(),
164  entry.getRxBytes().getValue(),
165  entry.getTxBytes().getValue(),
166  entry.getRxDropped().getValue(),
167  entry.getTxDropped().getValue(),
168  entry.getRxErrors().getValue(),
169  entry.getTxErrors().getValue(),
170  rxFrameErr,
171  rxOverErr,
172  rxCrcErr,
173  collisions);
174  } else {
175  return new PortStatsEntry(
176  entry.getPortNo().getPortNumber(),
177  entry.getRxPackets().getValue(),
178  entry.getTxPackets().getValue(),
179  entry.getRxBytes().getValue(),
180  entry.getTxBytes().getValue(),
181  entry.getRxDropped().getValue(),
182  entry.getTxDropped().getValue(),
183  entry.getRxErrors().getValue(),
184  entry.getTxErrors().getValue(),
185  entry.getRxFrameErr().getValue(),
186  entry.getRxOverErr().getValue(),
187  entry.getRxCrcErr().getValue(),
188  entry.getCollisions().getValue());
189  }
190  })
191  .collect(toList());
192  return new PortStatsReply(reply.getXid(), entries);
193  }).collect(toList());
194  return new PortStatsData(switchId, replies);
195  }, "port", CorrelationContext.getId()));
196  }
197 
198  @NewCorrelationContextRequired
199  private void gatherFlowStats(IOFSwitch iofSwitch) {
200  OFFactory factory = iofSwitch.getOFFactory();
201  final SwitchId switchId = new SwitchId(iofSwitch.getId().toString());
202 
203  OFFlowStatsRequest flowStatsRequest = factory
204  .buildFlowStatsRequest()
205  .setOutGroup(OFGroup.ANY)
206  .setCookieMask(SYSTEM_MASK)
207  .build();
208 
209  if (factory.getVersion().compareTo(OFVersion.OF_15) != 0) {
210  // skip flow stats for OF 1.5 protocol version
211  logger.trace("Getting flow stats for switch={}", iofSwitch.getId());
212 
213  Futures.addCallback(iofSwitch.writeStatsRequest(flowStatsRequest),
214  new RequestCallback<>(data -> {
215  List<FlowStatsReply> replies = data.stream().map(reply -> {
216  List<FlowStatsEntry> entries = reply.getEntries().stream()
217  .map(entry -> new FlowStatsEntry(
218  entry.getTableId().getValue(),
219  entry.getCookie().getValue(),
220  entry.getPacketCount().getValue(),
221  entry.getByteCount().getValue()))
222  .collect(toList());
223  return new FlowStatsReply(reply.getXid(), entries);
224  }).collect(toList());
225  return new FlowStatsData(switchId, replies);
226  }, "flow", CorrelationContext.getId()));
227  }
228  }
229 
230  private class RequestCallback<T extends OFStatsReply> implements FutureCallback<List<T>> {
231  private Function<List<T>, InfoData> transform;
232  private String type;
233  private final String correlationId;
234 
235  RequestCallback(Function<List<T>, InfoData> transform, String type, String correlationId) {
236  this.transform = transform;
237  this.type = type;
238  this.correlationId = correlationId;
239  }
240 
241  @Override
242  public void onSuccess(List<T> data) {
243  // Restore the correlation context used for the request.
244  try (CorrelationContextClosable closable = CorrelationContext.create(correlationId)) {
245 
246  InfoMessage infoMessage = new InfoMessage(transform.apply(data),
247  System.currentTimeMillis(), correlationId, Destination.WFM_STATS);
248  kafkaProducer.postMessage(statisticsTopic, infoMessage);
249  }
250  }
251 
252  @Override
253  public void onFailure(Throwable throwable) {
254  // Restore the correlation context used for the request.
255  try (CorrelationContextClosable closable = CorrelationContext.create(correlationId)) {
256 
257  logger.error("Exception reading {} stats", type, throwable);
258  }
259  }
260  }
261 }
Collection< Class<? extends IFloodlightService > > getModuleDependencies()
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
Collection< Class<? extends IFloodlightService > > getModuleServices()
Map< Class<? extends IFloodlightService >, IFloodlightService > getServiceImpls()
void postMessage(final String topic, final Message message)
net
Definition: plan-b.py:46