Open Kilda Java Documentation
FlowMetricGenBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats.metrics;
17 
21 
34 
35 import org.apache.storm.task.OutputCollector;
36 import org.apache.storm.task.TopologyContext;
37 import org.apache.storm.tuple.Tuple;
38 import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 
42 import java.util.HashMap;
43 import java.util.Map;
44 import javax.annotation.Nonnull;
45 import javax.annotation.Nullable;
46 
50 public class FlowMetricGenBolt extends MetricGenBolt {
51 
52  private static final Logger LOGGER = LoggerFactory.getLogger(FlowMetricGenBolt.class);
53 
54 
55  @Override
56  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
57  this.collector = collector;
58  }
59 
60  @Override
61  public void execute(Tuple input) {
62  StatsComponentType componentId = StatsComponentType.valueOf(input.getSourceComponent());
63  InfoMessage message = (InfoMessage) input.getValueByField(MESSAGE_FIELD);
64 
65  Map<Long, CacheFlowEntry> dataCache =
66  (Map<Long, CacheFlowEntry>) input.getValueByField(CACHE_FIELD);
67 
68  LOGGER.debug("dataCache in FlowMetricGenBolt {}", dataCache);
69 
70  if (!Destination.WFM_STATS.equals(message.getDestination())) {
71  collector.ack(input);
72  return;
73  }
74 
75  LOGGER.debug("Flow stats message: {}={}, component={}, stream={}",
76  CORRELATION_ID, message.getCorrelationId(), componentId,
77  StatsStreamType.valueOf(input.getSourceStreamId()));
79  long timestamp = message.getTimestamp();
80  SwitchId switchId = data.getSwitchId();
81 
82  try {
83  for (FlowStatsReply reply : data.getStats()) {
84  for (FlowStatsEntry entry : reply.getEntries()) {
85  @Nullable CacheFlowEntry flowEntry = dataCache.get(entry.getCookie());
86  emit(entry, timestamp, switchId, flowEntry);
87  }
88  }
89  collector.ack(input);
90  } catch (ServiceUnavailableException e) {
91  LOGGER.error("Error process: {}", input.toString(), e);
92  collector.ack(input); // If we can't connect to Neo then don't know if valid input,
93  // but if NEO is down puts a loop to kafka, so fail the request.
94  } catch (Exception e) {
95  collector.ack(input); // We tried, no need to try again
96  }
97  }
98 
99  private void emit(FlowStatsEntry entry, long timestamp, @Nonnull SwitchId switchId,
100  @Nullable CacheFlowEntry flowEntry) throws Exception {
101  String flowId = "unknown";
102  if (flowEntry != null) {
103  flowId = flowEntry.getFlowId();
104  } else {
105  LOGGER.warn("missed cache for sw {} cookie {}", switchId, entry.getCookie());
106  }
107 
108  emitAnySwitchMetrics(entry, timestamp, switchId, flowId);
109 
110  if (flowEntry != null) {
111  Map<String, String> flowTags = makeFlowTags(entry, flowEntry.getFlowId());
112 
113  boolean isMatch = false;
114  if (switchId.toOtsdFormat().equals(flowEntry.getIngressSwitch())) {
115  emitIngressMetrics(entry, timestamp, flowTags);
116  isMatch = true;
117  }
118  if (switchId.toOtsdFormat().equals(flowEntry.getEgressSwitch())) {
119  emitEgressMetrics(entry, timestamp, flowTags);
120  isMatch = true;
121  }
122 
123  if (!isMatch && LOGGER.isDebugEnabled()) {
124  LOGGER.debug("FlowStatsEntry with cookie {} and flow {} is not ingress not egress bc switch {} "
125  + "is not any of {}, {}", entry.getCookie(), flowId, switchId,
126  flowEntry.getIngressSwitch(), flowEntry.getEgressSwitch());
127  }
128  }
129  }
130 
131  private void emitAnySwitchMetrics(FlowStatsEntry entry, long timestamp, SwitchId switchId, String flowId)
132  throws JsonEncodeException {
133  Map<String, String> tags = new HashMap<>();
134  tags.put("switchid", switchId.toOtsdFormat());
135  tags.put("cookie", String.valueOf(entry.getCookie()));
136  tags.put("tableid", String.valueOf(entry.getTableId()));
137  tags.put("flowid", flowId);
138 
139  collector.emit(tuple("pen.flow.raw.packets", timestamp, entry.getPacketCount(), tags));
140  collector.emit(tuple("pen.flow.raw.bytes", timestamp, entry.getByteCount(), tags));
141  collector.emit(tuple("pen.flow.raw.bits", timestamp, entry.getByteCount() * 8, tags));
142  }
143 
144  private void emitIngressMetrics(FlowStatsEntry entry, long timestamp, Map<String, String> tags)
145  throws JsonEncodeException {
146  collector.emit(tuple("pen.flow.ingress.packets", timestamp, entry.getPacketCount(), tags));
147  collector.emit(tuple("pen.flow.ingress.bytes", timestamp, entry.getByteCount(), tags));
148  collector.emit(tuple("pen.flow.ingress.bits", timestamp, entry.getByteCount() * 8, tags));
149  }
150 
151  private void emitEgressMetrics(FlowStatsEntry entry, long timestamp, Map<String, String> tags)
152  throws JsonEncodeException {
153  collector.emit(tuple("pen.flow.packets", timestamp, entry.getPacketCount(), tags));
154  collector.emit(tuple("pen.flow.bytes", timestamp, entry.getByteCount(), tags));
155  collector.emit(tuple("pen.flow.bits", timestamp, entry.getByteCount() * 8, tags));
156  }
157 
158  private Map<String, String> makeFlowTags(FlowStatsEntry entry, String flowId) throws FlowCookieException {
159  Map<String, String> tags = new HashMap<>();
160  tags.put("flowid", flowId);
161  tags.put("direction", FlowDirectionHelper.findDirection(entry.getCookie()).name().toLowerCase());
162 
163  return tags;
164  }
165 }
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
name
Definition: setup.py:24
static List< Object > tuple(String metric, long timestamp, Number value, Map< String, String > tag)
static final String CORRELATION_ID
Definition: Utils.java:43