1 package org.openkilda.wfm.topology.opentsdb.bolts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
21 private static final Logger LOGGER = LoggerFactory.getLogger(
DatapointParseBolt.class);
22 private OutputCollector collector;
25 public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
26 this.collector = collector;
31 final String
data = tuple.getString(0);
32 LOGGER.debug(
"Processing datapoint: " +
data);
35 List<Object> stream = Stream.of(datapoint.
simpleHashCode(), datapoint)
36 .collect(Collectors.toList());
37 collector.emit(stream);
38 }
catch (Exception e) {
39 LOGGER.error(
"Failed reading data: " +
data, e);
47 declarer.declare(
new Fields(
"hash",
"datapoint"));
static final ObjectMapper MAPPER
void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
void declareOutputFields(OutputFieldsDeclarer declarer)
void execute(Tuple tuple)