Open Kilda Java Documentation
PortMetricGenBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats.metrics;
17 
20 
30 
31 import com.google.common.collect.ImmutableMap;
32 import org.apache.storm.tuple.Tuple;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 
36 import java.util.HashMap;
37 import java.util.Map;
38 
39 public class PortMetricGenBolt extends MetricGenBolt {
40  private static final Logger LOGGER = LoggerFactory.getLogger(PortMetricGenBolt.class);
41 
42  private Map<SwitchId, SwitchId> switchNameCache = new HashMap<>();
43 
44  @Override
45  public void execute(Tuple input) {
46  StatsComponentType componentId = StatsComponentType.valueOf(input.getSourceComponent());
47  InfoMessage message = (InfoMessage) input.getValueByField(MESSAGE_FIELD);
48 
49  if (!Destination.WFM_STATS.equals(message.getDestination())) {
50  collector.ack(input);
51  return;
52  }
53 
54  LOGGER.debug("Port stats message: {}={}, component={}, stream={}", CORRELATION_ID, message.getCorrelationId(),
55  componentId, StatsStreamType.valueOf(input.getSourceStreamId()));
57  long timestamp = message.getTimestamp();
58 
59  try {
60  SwitchId switchId = switchNameCache.get(data.getSwitchId());
61  if (switchId == null) {
62  switchId = data.getSwitchId();
63  switchNameCache.put(data.getSwitchId(), switchId);
64  }
65 
66  for (PortStatsReply reply : data.getStats()) {
67  for (PortStatsEntry entry : reply.getEntries()) {
68  emit(entry, timestamp, switchId);
69  }
70  }
71  } finally {
72  collector.ack(input);
73  }
74  }
75 
76  private void emit(PortStatsEntry entry, long timestamp, SwitchId switchId) {
77  try {
78  Map<String, String> tags = ImmutableMap.of(
79  "switchid", switchId.toOtsdFormat(),
80  "port", String.valueOf(entry.getPortNo())
81  );
82 
83  collector.emit(tuple("pen.switch.rx-packets", timestamp, entry.getRxPackets(), tags));
84  collector.emit(tuple("pen.switch.tx-packets", timestamp, entry.getTxPackets(), tags));
85  collector.emit(tuple("pen.switch.rx-bytes", timestamp, entry.getRxBytes(), tags));
86  collector.emit(tuple("pen.switch.rx-bits", timestamp, entry.getRxBytes() * 8, tags));
87  collector.emit(tuple("pen.switch.tx-bytes", timestamp, entry.getTxBytes(), tags));
88  collector.emit(tuple("pen.switch.tx-bits", timestamp, entry.getTxBytes() * 8, tags));
89  collector.emit(tuple("pen.switch.rx-dropped", timestamp, entry.getRxDropped(), tags));
90  collector.emit(tuple("pen.switch.tx-dropped", timestamp, entry.getTxDropped(), tags));
91  collector.emit(tuple("pen.switch.rx-errors", timestamp, entry.getRxErrors(), tags));
92  collector.emit(tuple("pen.switch.tx-errors", timestamp, entry.getTxErrors(), tags));
93  collector.emit(tuple("pen.switch.rx-frame-error", timestamp, entry.getRxFrameErr(), tags));
94  collector.emit(tuple("pen.switch.rx-over-error", timestamp, entry.getRxOverErr(), tags));
95  collector.emit(tuple("pen.switch.rx-crc-error", timestamp, entry.getRxCrcErr(), tags));
96  collector.emit(tuple("pen.switch.collisions", timestamp, entry.getCollisions(), tags));
97  } catch (JsonEncodeException e) {
98  LOGGER.error("Error during serialization of datapoint", e);
99  }
100  }
101 }
long getCollisions()
int getPortNo()
long getRxFrameErr()
long getTxPackets()
static List< Object > tuple(String metric, long timestamp, Number value, Map< String, String > tag)
long getTxErrors()
long getRxErrors()
long getRxCrcErr()
static final String CORRELATION_ID
Definition: Utils.java:43
long getTxBytes()
long getRxDropped()
long getRxBytes()
long getTxDropped()
long getRxOverErr()
long getRxPackets()