Open Kilda Java Documentation
DatapointParseBolt.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.opentsdb.bolts;
2 
3 import static org.openkilda.messaging.Utils.MAPPER;
4 
5 import org.apache.storm.task.OutputCollector;
6 import org.apache.storm.task.TopologyContext;
7 import org.apache.storm.topology.OutputFieldsDeclarer;
8 import org.apache.storm.topology.base.BaseRichBolt;
9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Tuple;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 
15 import java.util.List;
16 import java.util.Map;
17 import java.util.stream.Collectors;
18 import java.util.stream.Stream;
19 
20 public class DatapointParseBolt extends BaseRichBolt {
21  private static final Logger LOGGER = LoggerFactory.getLogger(DatapointParseBolt.class);
22  private OutputCollector collector;
23 
24  @Override
25  public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
26  this.collector = collector;
27  }
28 
29  @Override
30  public void execute(Tuple tuple) {
31  final String data = tuple.getString(0);
32  LOGGER.debug("Processing datapoint: " + data);
33  try {
34  Datapoint datapoint = MAPPER.readValue(data, Datapoint.class);
35  List<Object> stream = Stream.of(datapoint.simpleHashCode(), datapoint)
36  .collect(Collectors.toList());
37  collector.emit(stream);
38  } catch (Exception e) {
39  LOGGER.error("Failed reading data: " + data, e);
40  } finally {
41  collector.ack(tuple);
42  }
43  }
44 
45  @Override
46  public void declareOutputFields(OutputFieldsDeclarer declarer) {
47  declarer.declare(new Fields("hash", "datapoint"));
48  }
49 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)