16 package org.openkilda.wfm.topology.stats;
39 import org.apache.storm.generated.StormTopology;
40 import org.apache.storm.kafka.spout.KafkaSpout;
41 import org.apache.storm.topology.TopologyBuilder;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
48 private static final Logger logger = LoggerFactory.getLogger(
StatsTopology.class);
54 public static void main(String[]
args)
throws Exception {
58 }
catch (Exception e) {
65 logger.info(
"Creating StatsTopology - {}",
topologyName);
68 TopologyBuilder builder =
new TopologyBuilder();
73 builder.setSpout(kafkaSpoutId, kafkaSpout, parallelism);
77 builder.setBolt(statsOfsBolt, speakerBolt, parallelism)
78 .shuffleGrouping(kafkaSpoutId);
82 STATS_KILDA_SPEAKER_SPOUT.name());
83 builder.setSpout(STATS_KILDA_SPEAKER_SPOUT.name(), kafkaSpeakerSpout, parallelism);
89 .shuffleGrouping(STATS_KILDA_SPEAKER_SPOUT.name());
95 builder.setBolt(STATS_CACHE_BOLT.name(),
new CacheBolt(pathComputerAuth), parallelism)
96 .allGrouping(STATS_CACHE_FILTER_BOLT.name(), CACHE_UPDATE.name())
99 builder.setBolt(PORT_STATS_METRIC_GEN.name(),
new PortMetricGenBolt(), parallelism)
106 logger.debug(
"starting flow_stats_metric_gen");
107 builder.setBolt(FLOW_STATS_METRIC_GEN.name(),
115 .shuffleGrouping(PORT_STATS_METRIC_GEN.name())
116 .shuffleGrouping(METER_CFG_STATS_METRIC_GEN.name())
117 .shuffleGrouping(FLOW_STATS_METRIC_GEN.name());
121 return builder.createTopology();
METER_CFG_STATS_METRIC_GEN
static void main(String[] args)
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static final Fields fieldMessage
static int handleLaunchException(Exception error)
StormTopology createTopology()
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
StatsTopology(LaunchEnvironment env)
STATS_KILDA_SPEAKER_SPOUT
final ConfigurationProvider configurationProvider
final String topologyName
void checkAndCreateTopic(final String topic)