16 package org.openkilda.wfm.topology.nbworker;
29 import org.apache.storm.generated.StormTopology;
30 import org.apache.storm.kafka.bolt.KafkaBolt;
31 import org.apache.storm.kafka.spout.KafkaSpout;
32 import org.apache.storm.topology.TopologyBuilder;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
// Class-level SLF4J logger; used below to report the topology name when the topology is being created.
51 private static final Logger LOGGER = LoggerFactory.getLogger(
NbWorkerTopology.class);
// Storm component IDs. Each constant is the name a component is registered under via
// TopologyBuilder.setSpout/setBolt and the name downstream components pass to shuffleGrouping.

// ID of the router bolt; it subscribes to NB_SPOUT_ID, and the links bolt subscribes to its ISL stream.
53 private static final String ROUTER_BOLT_NAME =
"router-bolt";
// ID of the switches operations bolt; the response splitter bolt subscribes to it.
54 private static final String SWITCHES_BOLT_NAME =
"switches-operations-bolt";
// ID of the links operations bolt; the response splitter bolt subscribes to it.
55 private static final String LINKS_BOLT_NAME =
"links-operations-bolt";
// ID of the flows operations bolt; the response splitter bolt subscribes to it.
56 private static final String FLOWS_BOLT_NAME =
"flows-operations-bolt";
// ID of the response splitter bolt; the NB Kafka bolt subscribes to it.
57 private static final String SPLITTER_BOLT_NAME =
"response-splitter-bolt";
// ID of the Kafka bolt (kafkaNbBolt) registered last in the topology, downstream of the splitter.
58 private static final String NB_KAFKA_BOLT_NAME =
"nb-kafka-bolt";
// ID of the Kafka spout that feeds the router bolt.
59 private static final String NB_SPOUT_ID =
"nb-spout";
67 LOGGER.info(
"Creating NbWorkerTopology - {}",
topologyName);
69 TopologyBuilder tb =
new TopologyBuilder();
74 tb.setSpout(NB_SPOUT_ID, kafkaSpout, parallelism);
77 tb.setBolt(ROUTER_BOLT_NAME, router, parallelism)
78 .shuffleGrouping(NB_SPOUT_ID);
85 tb.setBolt(SWITCHES_BOLT_NAME, switchesBolt, parallelism)
89 tb.setBolt(LINKS_BOLT_NAME, linksBolt, parallelism)
90 .shuffleGrouping(ROUTER_BOLT_NAME,
StreamType.
ISL.toString());
93 tb.setBolt(FLOWS_BOLT_NAME, flowsBolt, parallelism)
97 tb.setBolt(SPLITTER_BOLT_NAME, splitterBolt, parallelism)
98 .shuffleGrouping(SWITCHES_BOLT_NAME)
99 .shuffleGrouping(LINKS_BOLT_NAME)
100 .shuffleGrouping(FLOWS_BOLT_NAME);
103 tb.setBolt(NB_KAFKA_BOLT_NAME, kafkaNbBolt, parallelism)
104 .shuffleGrouping(SPLITTER_BOLT_NAME);
106 return tb.createTopology();
113 }
catch (Exception e) {
NbWorkerTopology(LaunchEnvironment env)
static void main(String[] args)
KafkaBolt createKafkaBolt(final String topic)
StormTopology createTopology()
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
final ConfigurationProvider configurationProvider
final String topologyName