16 package org.openkilda.wfm.topology.islstats;
22 import org.apache.storm.generated.StormTopology;
23 import org.apache.storm.kafka.bolt.KafkaBolt;
24 import org.apache.storm.topology.TopologyBuilder;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
// Class-wide SLF4J logger, obtained once for IslStatsTopology.class and used by
// the info/debug messages emitted while the topology is being assembled.
29 private static final Logger logger = LoggerFactory.getLogger(
IslStatsTopology.class);
// Component id under which the Kafka spout is registered with the
// TopologyBuilder; the same value is also passed to createKafkaSpout as the
// spout id.
31 private static final String ISL_STATS_SPOUT_ID =
"islstats-spout";
// Component id under which the openTsdbBolt instance is registered.
32 private static final String ISL_STATS_OTSDB_BOLT_ID =
"islstats-otsdb-bolt";
// Component id for the ISL stats bolt. Unlike the two ids above it is not a
// literal: it is taken from the bolt class's simple name.
33 private static final String ISL_STATS_BOLT_ID =
IslStatsBolt.class.getSimpleName();
// Wires the topology as a three-stage chain: Kafka spout -> ISL stats bolt ->
// openTsdbBolt, then returns the built topology.
// NOTE(review): topologyName, topoDiscoTopic, verifyIslStatsBolt, openTsdbBolt
// and topologyConfig are declared on lines not shown in this excerpt — confirm
// their definitions against the full file.
40 logger.info(
"Creating IslStatsTopology - {}",
topologyName);
42 TopologyBuilder builder =
new TopologyBuilder();
// Stage 1: register the spout produced by createKafkaSpout for topoDiscoTopic.
47 logger.debug(
"connecting to {} topic", topoDiscoTopic);
48 builder.setSpout(ISL_STATS_SPOUT_ID,
createKafkaSpout(topoDiscoTopic, ISL_STATS_SPOUT_ID));
// Stage 2: the ISL stats bolt subscribes to the spout with a shuffle grouping;
// its parallelism hint comes from topologyConfig.getParallelism().
51 logger.debug(
"starting {} bolt", ISL_STATS_BOLT_ID);
52 builder.setBolt(ISL_STATS_BOLT_ID, verifyIslStatsBolt,
topologyConfig.getParallelism())
53 .shuffleGrouping(ISL_STATS_SPOUT_ID);
// Stage 3: openTsdbBolt subscribes to the ISL stats bolt's output, again with a
// shuffle grouping and the same parallelism hint. No debug message is logged
// for this stage in the lines visible here.
58 builder.setBolt(ISL_STATS_OTSDB_BOLT_ID, openTsdbBolt,
topologyConfig.getParallelism())
59 .shuffleGrouping(ISL_STATS_BOLT_ID);
// Hand back the topology assembled by the builder.
61 return builder.createTopology();
68 }
catch (Exception e) {
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
StormTopology createTopology()
final String topologyName
IslStatsTopology(LaunchEnvironment env)
static void main(String[] args)
void checkAndCreateTopic(final String topic)