Open Kilda Java Documentation
IslStatsBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.islstats.bolts;
17 
18 import org.apache.storm.task.OutputCollector;
19 import org.apache.storm.task.TopologyContext;
20 import org.apache.storm.topology.OutputFieldsDeclarer;
21 import org.apache.storm.topology.base.BaseRichBolt;
22 import org.apache.storm.tuple.Tuple;
31 
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 
35 import java.io.IOException;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 
41 public class IslStatsBolt extends BaseRichBolt {
42 
43  private OutputCollector collector;
44  private static final Logger logger = LoggerFactory.getLogger(IslStatsBolt.class);
45 
46  private static List<Object> tsdbTuple(String metric, long timestamp, Number value, Map<String, String> tag)
47  throws IOException{
48  Datapoint datapoint = new Datapoint(metric, timestamp, tag, value);
49  return Collections.singletonList(Utils.MAPPER.writeValueAsString(datapoint));
50  }
51 
52  @Override
53  public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
54  this.collector = collector;
55  }
56 
57  public List<Object> buildTsdbTuple(IslInfoData data, long timestamp) throws IOException {
58  List<PathNode> path = data.getPath();
59  Map<String, String> tags = new HashMap<>();
60  tags.put("src_switch", path.get(0).getSwitchId().toOtsdFormat());
61  tags.put("src_port", String.valueOf(path.get(0).getPortNo()));
62  tags.put("dst_switch", path.get(1).getSwitchId().toOtsdFormat());
63  tags.put("dst_port", String.valueOf(path.get(1).getPortNo()));
64 
65  return tsdbTuple("pen.isl.latency", timestamp, data.getLatency(), tags);
66  }
67 
68  public String getJson(Tuple tuple) {
69  return tuple.getString(0);
70  }
71 
72  public Message getMessage(String json) throws IOException {
73  return Utils.MAPPER.readValue(json, Message.class);
74  }
75 
76  public InfoData getInfoData(Message message) throws Exception {
77  if (!(message instanceof InfoMessage)) {
78  throw new Exception(message.getClass().getName() + " is not an InfoMessage");
79  }
80  InfoData data = ((InfoMessage) message).getData();
81  return data;
82  }
83 
84  public IslInfoData getIslInfoData(InfoData data) throws Exception {
85  if (!(data instanceof IslInfoData)) {
86  throw new Exception(data.getClass().getName() + " is not an IslInfoData");
87  }
88  return (IslInfoData) data;
89  }
90 
91  @Override
92  public void execute(Tuple tuple) {
93  logger.debug("tuple: " + tuple);
94  String json = getJson(tuple);
95  try {
96  Message message = getMessage(json);
98  List<Object> results = buildTsdbTuple(data, message.getTimestamp());
99  logger.debug("emit: " + results);
100  collector.emit(results);
101  } catch(IOException e) {
102  logger.error("Could not deserialize message={}", json, e);
103  } catch(Exception e) {
104  // TODO: has to be a cleaner way to do this?
105  } finally {
106  collector.ack(tuple);
107  }
108  }
109 
110  @Override
111  public void declareOutputFields(OutputFieldsDeclarer declarer) {
112  declarer.declare(AbstractTopology.fieldMessage);
113  }
114 
115 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
value
Definition: nodes.py:62
void declareOutputFields(OutputFieldsDeclarer declarer)
List< Object > buildTsdbTuple(IslInfoData data, long timestamp)
void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)