1 package org.openkilda.wfm.topology.portstate;
11 import org.apache.storm.generated.StormTopology;
12 import org.apache.storm.kafka.bolt.KafkaBolt;
13 import org.apache.storm.topology.TopologyBuilder;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
18 private static final Logger logger = LoggerFactory.getLogger(
PortStateTopology.class);
21 private static final String WFM_STATS_SPOUT =
"wfm.stats.spout";
22 private static final int JANITOR_REFRESH = 600;
23 private static final String PARSE_PORT_INFO_BOLT_NAME =
ParsePortInfoBolt.class.getSimpleName();
24 private static final String TOPO_DISCO_PARSE_BOLT_NAME =
TopoDiscoParseBolt.class.getSimpleName();
25 private static final String SWITCH_PORTS_SPOUT_NAME =
SwitchPortsSpout.class.getSimpleName();
26 private static final String WFM_STATS_PARSE_BOLT_NAME =
WfmStatsParseBolt.class.getSimpleName();
27 private static final String SPEAKER_KAFKA_BOLT_NAME =
"speaker.kafka.bolt";
28 private static final String OTSDB_KAFKA_BOLT_NAME =
"otsdb.kafka.bolt";
36 logger.info(
"Creating PortStateTopology - {}",
topologyName);
38 TopologyBuilder builder =
new TopologyBuilder();
56 logger.debug(
"connecting to {} topic", topoDiscoTopic);
60 builder.setBolt(TOPO_DISCO_PARSE_BOLT_NAME, topoDiscoParseBolt,
topologyConfig.getParallelism())
64 builder.setBolt(PARSE_PORT_INFO_BOLT_NAME, parsePortInfoBolt,
topologyConfig.getParallelism())
71 builder.setBolt(OTSDB_KAFKA_BOLT_NAME, openTsdbBolt,
topologyConfig.getParallelism())
72 .shuffleGrouping(PARSE_PORT_INFO_BOLT_NAME);
77 logger.debug(
"connecting to {} topic", wfmStatsTopic);
78 builder.setSpout(WFM_STATS_SPOUT,
createKafkaSpout(wfmStatsTopic, WFM_STATS_SPOUT));
81 builder.setBolt(WFM_STATS_PARSE_BOLT_NAME, wfmStatsParseBolt,
topologyConfig.getParallelism())
82 .shuffleGrouping(WFM_STATS_SPOUT);
86 builder.setSpout(SWITCH_PORTS_SPOUT_NAME, switchPortsSpout);
91 builder.setBolt(SPEAKER_KAFKA_BOLT_NAME, speakerBolt,
topologyConfig.getParallelism())
92 .shuffleGrouping(SWITCH_PORTS_SPOUT_NAME);
94 return builder.createTopology();
101 }
catch (Exception e) {
static final String WFM_TO_PARSE_PORT_INFO_STREAM
static final String TOPO_DISCO_SPOUT
KafkaBolt createKafkaBolt(final String topic)
PortStateTopology(LaunchEnvironment env)
static final String TOPO_TO_PORT_INFO_STREAM
static int handleLaunchException(Exception error)
static void main(String[] args)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
StormTopology createTopology()
final String topologyName
void checkAndCreateTopic(final String topic)