Open Kilda Java Documentation
SpeakerBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats.bolts;
17 
19 
29 
30 import org.apache.storm.task.OutputCollector;
31 import org.apache.storm.task.TopologyContext;
32 import org.apache.storm.topology.OutputFieldsDeclarer;
33 import org.apache.storm.topology.base.BaseRichBolt;
34 import org.apache.storm.tuple.Tuple;
35 import org.apache.storm.tuple.Values;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 
39 import java.io.IOException;
40 import java.util.Map;
41 
42 public class SpeakerBolt extends BaseRichBolt {
43  private static final Logger logger = LoggerFactory.getLogger(SpeakerBolt.class);
44  private static final String PORT_STATS_STREAM = StatsStreamType.PORT_STATS.toString();
45  private static final String METER_CFG_STATS_STREAM = StatsStreamType.METER_CONFIG_STATS.toString();
46  private static final String FLOW_STATS_STREAM = StatsStreamType.FLOW_STATS.toString();
47 
48  private OutputCollector outputCollector;
49 
53  @Override
54  public void execute(Tuple tuple) {
55  logger.debug("Ingoing tuple: {}", tuple);
56  String request = tuple.getString(0);
57  //String request = tuple.getStringByField("value");
58  try {
59  Message stats = Utils.MAPPER.readValue(request, Message.class);
60  if (!Destination.WFM_STATS.equals(stats.getDestination()) || !(stats instanceof InfoMessage)) {
61  return;
62  }
63  InfoMessage message = (InfoMessage) stats;
64  final InfoData data = message.getData();
65  if (data instanceof PortStatsData) {
66  logger.debug("Port stats message: {}", new Values(request));
67  outputCollector.emit(PORT_STATS_STREAM, tuple, new Values(message));
68  } else if (data instanceof MeterConfigStatsData) {
69  logger.debug("Meter config stats message: {}", new Values(request));
70  outputCollector.emit(METER_CFG_STATS_STREAM, tuple, new Values(message));
71  } else if (data instanceof FlowStatsData) {
72  logger.debug("Flow stats message: {}", new Values(request));
73  outputCollector.emit(FLOW_STATS_STREAM, tuple, new Values(message));
74  }
75  } catch (IOException exception) {
76  logger.error("Could not deserialize message={}", request, exception);
77  } finally {
78  outputCollector.ack(tuple);
79  logger.debug("Message ack: {}", request);
80  }
81  }
82 
86  @Override
87  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
88  outputFieldsDeclarer.declareStream(PORT_STATS_STREAM, fieldMessage);
89  outputFieldsDeclarer.declareStream(METER_CFG_STATS_STREAM, fieldMessage);
90  outputFieldsDeclarer.declareStream(FLOW_STATS_STREAM, fieldMessage);
91  }
92 
96  @Override
97  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
98  this.outputCollector = outputCollector;
99  }
100 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)