16 package org.openkilda.wfm.topology.stats.metrics;
35 import org.apache.storm.task.OutputCollector;
36 import org.apache.storm.task.TopologyContext;
37 import org.apache.storm.tuple.Tuple;
38 import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import java.util.HashMap;
44 import javax.annotation.Nonnull;
45 import javax.annotation.Nullable;
52 private static final Logger LOGGER = LoggerFactory.getLogger(
FlowMetricGenBolt.class);
56 public void prepare(Map stormConf, TopologyContext context, OutputCollector
collector) {
65 Map<Long, CacheFlowEntry> dataCache =
66 (Map<Long, CacheFlowEntry>) input.getValueByField(CACHE_FIELD);
68 LOGGER.debug(
"dataCache in FlowMetricGenBolt {}", dataCache);
75 LOGGER.debug(
"Flow stats message: {}={}, component={}, stream={}",
85 @Nullable
CacheFlowEntry flowEntry = dataCache.get(entry.getCookie());
86 emit(entry, timestamp, switchId, flowEntry);
90 }
catch (ServiceUnavailableException e) {
91 LOGGER.error(
"Error process: {}", input.toString(), e);
94 }
catch (Exception e) {
101 String flowId =
"unknown";
102 if (flowEntry != null) {
103 flowId = flowEntry.getFlowId();
105 LOGGER.warn(
"missed cache for sw {} cookie {}", switchId, entry.getCookie());
108 emitAnySwitchMetrics(entry, timestamp, switchId, flowId);
110 if (flowEntry != null) {
111 Map<String, String> flowTags = makeFlowTags(entry, flowEntry.getFlowId());
113 boolean isMatch =
false;
114 if (switchId.toOtsdFormat().equals(flowEntry.getIngressSwitch())) {
115 emitIngressMetrics(entry, timestamp, flowTags);
118 if (switchId.toOtsdFormat().equals(flowEntry.getEgressSwitch())) {
119 emitEgressMetrics(entry, timestamp, flowTags);
123 if (!isMatch && LOGGER.isDebugEnabled()) {
124 LOGGER.debug(
"FlowStatsEntry with cookie {} and flow {} is not ingress not egress bc switch {} " 125 +
"is not any of {}, {}", entry.getCookie(), flowId, switchId,
126 flowEntry.getIngressSwitch(), flowEntry.getEgressSwitch());
131 private void emitAnySwitchMetrics(FlowStatsEntry entry,
long timestamp, SwitchId switchId, String flowId)
132 throws JsonEncodeException {
133 Map<String, String> tags =
new HashMap<>();
134 tags.put(
"switchid", switchId.toOtsdFormat());
135 tags.put(
"cookie", String.valueOf(entry.getCookie()));
136 tags.put(
"tableid", String.valueOf(entry.getTableId()));
137 tags.put(
"flowid", flowId);
139 collector.emit(
tuple(
"pen.flow.raw.packets", timestamp, entry.getPacketCount(), tags));
140 collector.emit(
tuple(
"pen.flow.raw.bytes", timestamp, entry.getByteCount(), tags));
141 collector.emit(
tuple(
"pen.flow.raw.bits", timestamp, entry.getByteCount() * 8, tags));
144 private void emitIngressMetrics(FlowStatsEntry entry,
long timestamp, Map<String, String> tags)
145 throws JsonEncodeException {
146 collector.emit(
tuple(
"pen.flow.ingress.packets", timestamp, entry.getPacketCount(), tags));
147 collector.emit(
tuple(
"pen.flow.ingress.bytes", timestamp, entry.getByteCount(), tags));
148 collector.emit(
tuple(
"pen.flow.ingress.bits", timestamp, entry.getByteCount() * 8, tags));
151 private void emitEgressMetrics(FlowStatsEntry entry,
long timestamp, Map<String, String> tags)
152 throws JsonEncodeException {
153 collector.emit(
tuple(
"pen.flow.packets", timestamp, entry.getPacketCount(), tags));
154 collector.emit(
tuple(
"pen.flow.bytes", timestamp, entry.getByteCount(), tags));
155 collector.emit(
tuple(
"pen.flow.bits", timestamp, entry.getByteCount() * 8, tags));
158 private Map<String, String> makeFlowTags(FlowStatsEntry entry, String flowId)
throws FlowCookieException {
159 Map<String, String> tags =
new HashMap<>();
160 tags.put(
"flowid", flowId);
161 tags.put(
"direction", FlowDirectionHelper.findDirection(entry.getCookie()).
name().toLowerCase());
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static final String MESSAGE_FIELD
OutputCollector collector
static List< Object > tuple(String metric, long timestamp, Number value, Map< String, String > tag)
static final String CORRELATION_ID
Destination getDestination()
String getCorrelationId()
void execute(Tuple input)
static final String CACHE_FIELD