Open Kilda Java Documentation
StatsIntegrationService.java
Go to the documentation of this file.
1 package org.openkilda.integration.service;
2 
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.http.HttpMethod;
8 import org.springframework.stereotype.Service;
9 
10 import com.fasterxml.jackson.core.JsonProcessingException;
11 
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.regex.Matcher;
18 import java.util.regex.Pattern;
19 
20 import org.apache.http.HttpResponse;
33 
39 @Service
41 
42  private static final Logger LOGGER = LoggerFactory.getLogger(StatsIntegrationService.class);
43 
44 
45  @Autowired
46  private RestClientManager restClientManager;
47 
48  @Autowired
49  private ApplicationProperties applicationProperties;
50 
51  public String getStats(final String startDate, final String endDate, final String downsample,
52  final List<String> switchId, final String port, final String flowId,
53  final String srcSwitch, final String srcPort, final String dstSwitch,
54  final String dstPort, final StatsType statsType, final String metric,
55  final String direction) throws IntegrationException {
56 
57  LOGGER.info("Inside getStats: switchId: " + switchId);
58  try {
59  String payload = getOpenTsdbRequestBody(startDate, endDate, downsample, switchId, port,
60  flowId, srcSwitch, srcPort, dstSwitch, dstPort, statsType, metric, direction);
61 
62  LOGGER.info("Inside getStats: startDate: " + startDate + ": endDate: " + endDate
63  + ": payload: " + payload);
64 
65  HttpResponse response =
66  restClientManager.invoke(applicationProperties.getOpenTsdbQuery(),
67  HttpMethod.POST, payload, "application/json", "");
68  if (RestClientManager.isValidResponse(response)) {
69  return IoUtil.toString(response.getEntity().getContent());
70  }
71  } catch (IOException ex) {
72  LOGGER.error("Inside getStats Exception is: " + ex.getMessage());
73  throw new IntegrationException(ex);
74  }
75  return null;
76  }
77 
78  private String populateFiltersAndReturnDownsample(final List<Filter> filters,
79  final Map<String, String[]> params,final Integer index, final StatsType statsType) {
80  String downsample = "";
81  if (params != null) {
82  for (Map.Entry<String, String[]> param : params.entrySet()) {
83  if (param.getKey().equalsIgnoreCase("averageOf")) {
84  downsample = param.getValue().toString();
85  } else if (param.getValue() != null) {
86  Filter filter = new Filter();
87  filter.setGroupBy(Boolean.valueOf(OpenTsDB.GROUP_BY));
88  if (statsType.equals(StatsType.SWITCH_PORT) && param.getKey().equals("port")) {
89  filter.setType(OpenTsDB.TYPE_WILDCARD);
90  } else {
91  filter.setType(OpenTsDB.TYPE);
92  }
93  filter.setTagk(param.getKey());
94  if(index == 0 && param.getKey().equals("direction")) {
95  filter.setFilter("forward");
96  } else if(index == 1 && param.getKey().equals("direction")) {
97  filter.setFilter("reverse");
98  } else {
99  filter.setFilter(param.getValue()[0]);
100  }
101  filters.add(filter);
102  }
103  }
104  }
105  return downsample;
106  }
107 
108  private Query getQuery(final String downsample, final String metric,
109  final Map<String, String[]> params, final Integer index,final StatsType statsType) {
110  List<Filter> filters = new ArrayList<Filter>();
111  String paramDownSample = "";
112  if (params != null) {
113  paramDownSample = populateFiltersAndReturnDownsample(filters, params, index, statsType);
114  }
115 
116  if (!StringUtil.isNullOrEmpty(downsample)) {
117  paramDownSample = downsample + "-avg";
118  } else if (!StringUtil.isNullOrEmpty(paramDownSample)) {
119  paramDownSample = paramDownSample + "-avg";
120  }
121  Query query = new Query();
122  query.setAggregator(OpenTsDB.AGGREGATOR);
123  if (!statsType.equals(StatsType.ISL)) {
124  query.setRate(Boolean.valueOf(OpenTsDB.RATE));
125  }
126  if (statsType.equals(StatsType.SWITCH_PORT)
127  && Metrics.PEN_SWITCH_STATE.getDisplayTag().equals(metric)) {
128  query.setRate(false);
129  } else {
130  if(validateDownSample(paramDownSample, query)) {
131  query.setDownsample(paramDownSample);
132  }
133  }
134  query.setMetric(metric);
135  query.setFilters(filters);
136  return query;
137  }
138 
139  private boolean validateDownSample(String paramDownSample, final Query query) {
140  boolean isValidDownsample = false;
141  paramDownSample = paramDownSample.replaceFirst("^0+(?!$)", "");
142  if(Character.isDigit(paramDownSample.charAt(0))) {
143  String[] downSampleArr = paramDownSample.split("-");
144  if(downSampleArr != null && downSampleArr.length > 0){
145  String dwnSample = downSampleArr[0];
146  Pattern pattern = Pattern.compile("[msh]");
147  Matcher matcher = pattern.matcher(dwnSample);
148  if (matcher.find()) {
149  isValidDownsample = true;
150  }
151  }
152  }
153  return isValidDownsample;
154  }
155 
162  private String formatDate(final String date) {
163  return date.replaceFirst("-", "/").replaceFirst("-", "/");
164  }
165 
166  private String getOpenTsdbRequestBody(final String startDate, final String endDate,
167  final String downsample, final List<String> switchIds, final String port,
168  final String flowId, final String srcSwitch, final String srcPort,
169  final String dstSwitch, final String dstPort, final StatsType statsType,
170  final String metric, final String direction) throws JsonProcessingException {
171  LOGGER.info("Inside getOpenTsdbRequestBody :");
172 
173  List<Query> queries = getQueries(startDate, endDate, downsample, switchIds, port, flowId,
174  srcSwitch, srcPort, dstSwitch, dstPort, statsType, metric, direction);
175  return getRequest(startDate, endDate, queries);
176  }
177 
178  private String getRequest(final String startDate, final String endDate,
179  final List<Query> queryList) throws JsonProcessingException {
180  IslStats islStatsRequest = new IslStats();
181  islStatsRequest.setStart(formatDate(startDate));
182  islStatsRequest.setEnd(formatDate(endDate));
183  islStatsRequest.setQueries(queryList);
184  return JsonUtil.toString(islStatsRequest);
185  }
186 
194  private List<String> getMetircs(StatsType statsType, String metric){
195  List<String> metricList = new ArrayList<String>();
196  if (statsType.equals(StatsType.PORT)) {
197  metricList = Metrics.switchValue(metric);
198  } else if (statsType.equals(StatsType.FLOW)) {
199  metricList = Metrics.flowValue(metric, true);
200  } else if (statsType.equals(StatsType.ISL)) {
201  metricList = Metrics.switchValue(metric);
202  } else if (statsType.equals(StatsType.ISL_LOSS_PACKET)) {
203  metricList = Metrics.switchValue(metric);
204  } else if (statsType.equals(StatsType.FLOW_LOSS_PACKET)) {
205  metricList = Metrics.flowValue("packets", false);
206  metricList.addAll(Metrics.flowValue(metric, false));
207  } else if (statsType.equals(StatsType.FLOW_RAW_PACKET)) {
208  metricList = Metrics.flowValue("packets", false);
209  metricList.addAll(Metrics.flowValue(metric, false));
210  } else if (statsType.equals(StatsType.SWITCH_PORT)) {
211  metricList = Metrics.getStartsWith("Switch_");
212  }
213  return metricList;
214  }
215 
234  private List<Query> getQueries(final String startDate, final String endDate,
235  final String downsample, final List<String> switchIds, final String port,
236  final String flowId, final String srcSwitch, final String srcPort,
237  final String dstSwitch, final String dstPort, final StatsType statsType,
238  final String metric, final String direction){
239 
240  List<String> metricList = getMetircs(statsType, metric);
241 
242  List<Query> queries = new ArrayList<Query>();
243  if (statsType.equals(StatsType.FLOW_LOSS_PACKET)) {
244  queries = getFlowLossPacketsQueries(queries, downsample, flowId, srcSwitch, srcPort,
245  statsType, metricList, direction);
246  } else if (statsType.equals(StatsType.ISL_LOSS_PACKET)) {
247  queries = getIslLossPacketsQueries(queries, downsample, flowId, srcSwitch, srcPort,
248  dstSwitch, dstPort, statsType, metricList);
249  } else if (statsType.equals(StatsType.FLOW_RAW_PACKET)) {
250  queries = getFlowRawPacketsQueries(queries, downsample, switchIds, flowId, statsType,
251  metricList, direction);
252  } else if (statsType.equals(StatsType.SWITCH_PORT)) {
253  queries = getSwitchPortQueries(queries, switchIds, metricList, statsType, downsample);
254  } else {
255  String switchId = (switchIds == null || switchIds.isEmpty()) ? null : switchIds.get(0);
256  Map<String, String[]> params = getParam(statsType, switchId, port, flowId, srcSwitch,
257  srcPort, dstSwitch, dstPort);
258  if (metricList != null && !metricList.isEmpty()) {
259  for (int index = 0; index < metricList.size(); index++) {
260  String metricName = metricList.get(index);
261  queries.add(getQuery(downsample, metricName, params, index, statsType));
262  }
263  }
264  }
265  return queries;
266  }
267 
281  private List<Query> getFlowLossPacketsQueries(List<Query> queries,final String downsample, final String flowId,
282  final String srcSwitch, final String srcPort, final StatsType statsType,
283  final List<String> metricList, final String direction) {
284  Map<String, String[]> params =
285  getParam(statsType, null, null, flowId, srcSwitch, srcPort, null, null);
286  int index = (direction.isEmpty() || "forward".equalsIgnoreCase(direction)) ? 0 : 1;
287  if (metricList != null && !metricList.isEmpty()) {
288  queries.add(getQuery(downsample, metricList.get(0), params, index, statsType));
289  queries.add(getQuery(downsample, metricList.get(1), params, index, statsType));
290  }
291  return queries;
292  }
293 
308  private List<Query> getIslLossPacketsQueries(List<Query> queries, final String downsample, final String flowId,
309  final String srcSwitch, final String srcPort, final String dstSwitch,
310  final String dstPort, final StatsType statsType, final List<String> metricList) {
311  Map<String, String[]> rx_params =
312  getParam(statsType, null, null, flowId, srcSwitch, srcPort, null, null);
313  Map<String, String[]> tx_params =
314  getParam(statsType, null, null, flowId, null, null, dstSwitch, dstPort);
315  if (metricList != null && !metricList.isEmpty()) {
316  queries.add(getQuery(downsample, metricList.get(0), rx_params, 0, statsType));
317  queries.add(getQuery(downsample, metricList.get(1), tx_params, 0, statsType));
318  }
319  return queries;
320  }
321 
333  private List<Query> getFlowRawPacketsQueries(List<Query> queries, final String downsample,
334  final List<String> switchIds, final String flowId, final StatsType statsType,
335  final List<String> metricList, final String direction) {
336  Map<String, String[]> flow_params =
337  getParam(StatsType.FLOW, null, null, flowId, null, null, null, null);
338  Map<String, String[]> params =
339  getParam(StatsType.FLOW_RAW_PACKET, null, null, flowId, null, null, null, null);
340  if (metricList != null && !metricList.isEmpty()) {
341  int index = (direction.isEmpty() || "forward".equalsIgnoreCase(direction)) ? 0 : 1;
342  Query query = getQuery(downsample, metricList.get(1), params, index, statsType);
343  for (String switchId : switchIds) {
344  params = getParam(StatsType.SWITCH, switchId, null, null, null, null, null, null);
345  populateFiltersAndReturnDownsample(query.getFilters(), params, 0, statsType);
346  }
347  queries.add(query);
348  Query flow_query =
349  getQuery(downsample, metricList.get(0), flow_params, index, StatsType.FLOW);
350  queries.add(flow_query);
351  }
352  return queries;
353  }
354 
365  private List<Query> getSwitchPortQueries(List<Query> queries, final List<String> switchIds,
366  final List<String> metricList, final StatsType statsType, final String downsample) {
367  String switchId = switchIds.isEmpty() ? null : switchIds.get(0);
368  Map<String, String[]> params =
369  getParam(StatsType.PORT, switchId, "*", null, null, null, null, null);
370  if (metricList != null && !metricList.isEmpty()) {
371  for (int index = 0; index < metricList.size(); index++) {
372  String metricName = metricList.get(index);
373  queries.add(getQuery(downsample, metricName, params, 0, statsType));
374  }
375  }
376  return queries;
377  }
378 
379 
380  private Map<String, String[]> getParam(final StatsType statsType, final String switchId,
381  final String port, final String flowId, final String srcSwitch, final String srcPort,
382  final String dstSwitch, final String dstPort) {
383  Map<String, String[]> params = new HashMap<String, String[]>();
384 
385  if (statsType.equals(StatsType.SWITCH)) {
386  params.put("switchid", new String[] {switchId});
387  } else if (statsType.equals(StatsType.PORT)) {
388  params.put("switchid", new String[] {switchId});
389  params.put("port", new String[] {port});
390  } else if (statsType.equals(StatsType.FLOW)
391  || statsType.equals(StatsType.FLOW_LOSS_PACKET)) {
392  params.put("flowid", new String[] {flowId});
393  params.put("direction", new String[] {});
394  } else if (statsType.equals(StatsType.ISL)) {
395  params.put("src_switch", new String[] {srcSwitch});
396  params.put("src_port", new String[] {srcPort});
397  params.put("dst_switch", new String[] {dstSwitch});
398  params.put("dst_port", new String[] {dstPort});
399  } else if (statsType.equals(StatsType.ISL_LOSS_PACKET)) {
400  if (srcSwitch == null && srcPort == null) {
401  params.put("switchid", new String[] {dstSwitch});
402  params.put("port", new String[] {dstPort});
403  } else {
404  params.put("switchid", new String[] {srcSwitch});
405  params.put("port", new String[] {srcPort});
406  }
407  } else if (statsType.equals(StatsType.FLOW_RAW_PACKET)) {
408  params.put("flowid", new String[] {flowId});
409  }
410  return params;
411  }
412 
413 }
static boolean isValidResponse(final HttpResponse response)
String getStats(final String startDate, final String endDate, final String downsample, final List< String > switchId, final String port, final String flowId, final String srcSwitch, final String srcPort, final String dstSwitch, final String dstPort, final StatsType statsType, final String metric, final String direction)
static String toString(final InputStream inputStream)
Definition: IoUtil.java:29
def index()
Definition: login.py:30