16 package org.openkilda.wfm.topology.opentsdb;
24 import com.google.common.annotations.VisibleForTesting;
25 import org.apache.storm.generated.StormTopology;
26 import org.apache.storm.kafka.spout.KafkaSpout;
27 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
28 import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
29 import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
30 import org.apache.storm.opentsdb.client.OpenTsdbClient;
31 import org.apache.storm.topology.TopologyBuilder;
32 import org.apache.storm.tuple.Fields;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import java.util.Collections;
43 private static final Logger LOGGER = LoggerFactory.getLogger(
OpenTSDBTopology.class);
50 static final String OTSDB_SPOUT_ID =
"kilda.otsdb-spout";
51 private static final String OTSDB_BOLT_ID =
"otsdb-bolt";
52 private static final String OTSDB_FILTER_BOLT_ID =
OpenTSDBFilterBolt.class.getSimpleName();
53 private static final String OTSDB_PARSE_BOLT_ID =
DatapointParseBolt.class.getSimpleName();
57 LOGGER.info(
"Creating OpenTSDBTopology - {}",
topologyName);
59 TopologyBuilder tb =
new TopologyBuilder();
67 .shuffleGrouping(OTSDB_SPOUT_ID);
70 .fieldsGrouping(OTSDB_PARSE_BOLT_ID,
new Fields(
"hash"));
72 OpenTsdbClient.Builder tsdbBuilder = OpenTsdbClient
73 .newBuilder(openTsdbConfig.
getHosts())
77 tsdbBuilder.enableChunkedEncoding();
80 OpenTsdbBolt openTsdbBolt =
new OpenTsdbBolt(tsdbBuilder,
81 Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
86 .shuffleGrouping(OTSDB_FILTER_BOLT_ID);
88 return tb.createTopology();
91 private void attachInput(TopologyBuilder
topology) {
98 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
100 KafkaSpout kafkaSpout =
new KafkaSpout<>(spoutConfig);
111 }
catch (Exception e) {
int getFilterBoltExecutors()
OpenTSDBTopology(LaunchEnvironment env)
StormTopology createTopology()
KafkaSpoutConfig.Builder< String, String > makeKafkaSpoutConfigBuilder(String spoutId, String topic)
static int handleLaunchException(Exception error)
int getDatapointParseBoltWorkers()
int getDatapointParseBoltExecutors()
static void main(String[] args)
final String topologyName
boolean getClientChunkedRequestsEnabled()
void checkAndCreateTopic(final String topic)