Open Kilda Java Documentation
PortStateTopology.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.portstate;
2 
10 
11 import org.apache.storm.generated.StormTopology;
12 import org.apache.storm.kafka.bolt.KafkaBolt;
13 import org.apache.storm.topology.TopologyBuilder;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 
/**
 * Storm topology definition for port-state processing.
 *
 * <p>{@link #createTopology()} wires three lines:
 * <ul>
 *   <li>a Kafka spout on the topo-disco topic, parsed by {@code TopoDiscoParseBolt};</li>
 *   <li>a Kafka spout on the stats topic, parsed by {@code WfmStatsParseBolt};</li>
 *   <li>a {@code SwitchPortsSpout} whose output goes to a Kafka bolt on the speaker topic.</li>
 * </ul>
 * Both parse bolts feed {@code ParsePortInfoBolt}, whose output goes to a Kafka bolt on the
 * OpenTSDB topic. Topic names and the bolt parallelism come from {@code topologyConfig}.
 */
17 public class PortStateTopology extends AbstractTopology<PortStateTopologyConfig> {
18  private static final Logger logger = LoggerFactory.getLogger(PortStateTopology.class);
19 
20  public static final String TOPO_DISCO_SPOUT = "topo.disco.spout"; // component id, also passed to createKafkaSpout as spoutId
21  private static final String WFM_STATS_SPOUT = "wfm.stats.spout"; // component id, also passed to createKafkaSpout as spoutId
22  private static final int JANITOR_REFRESH = 600; // handed to SwitchPortsSpout below; unit presumably seconds - TODO confirm
23  private static final String PARSE_PORT_INFO_BOLT_NAME = ParsePortInfoBolt.class.getSimpleName();
24  private static final String TOPO_DISCO_PARSE_BOLT_NAME = TopoDiscoParseBolt.class.getSimpleName();
25  private static final String SWITCH_PORTS_SPOUT_NAME = SwitchPortsSpout.class.getSimpleName();
26  private static final String WFM_STATS_PARSE_BOLT_NAME = WfmStatsParseBolt.class.getSimpleName();
27  private static final String SPEAKER_KAFKA_BOLT_NAME = "speaker.kafka.bolt";
28  private static final String OTSDB_KAFKA_BOLT_NAME = "otsdb.kafka.bolt";
29 
  // NOTE(review): listing line 30 (the constructor signature) is absent from this extract;
  // only the constructor body, which forwards env and the config class to the superclass, survives.
31  super(env, PortStateTopologyConfig.class);
32  }
33 
  /**
   * Builds the Storm topology described in the diagram below.
   *
   * <p>Each Kafka topic is passed to {@code checkAndCreateTopic} before a spout or bolt is
   * attached to it. Every bolt is registered with {@code topologyConfig.getParallelism()} and
   * subscribes via shuffle grouping; the spouts are registered without a parallelism hint.
   *
   * @return the topology produced by the {@code TopologyBuilder}
   * @throws NameCollisionException declared by this method; the call that raises it is not
   *     visible in this listing
   */
34  @Override
35  public StormTopology createTopology() throws NameCollisionException {
36  logger.info("Creating PortStateTopology - {}", topologyName);
37 
38  TopologyBuilder builder = new TopologyBuilder();
39 
40  /*
41  * Topology:
42  *
43  * TOPO_DISCO_SPOUT ---> TopoDiscoParseBolt ---> ParsePortInfoBolt ---> OtsdbKafkaBolt(kilda.otsdb topic)
44  *                                                       ^
45  *                                                       |
46  * WFM_STATS_SPOUT ---> WfmStatsParseBolt -----------------
47  *
48  *
49  * SwitchPortsSpout ---> SpeakerKafkaBolt(kilda.speaker topic)
50  *
51  */
52 
53  // Set up the spout and bolts for the TOPO_DISCO_SPOUT line
54  String topoDiscoTopic = topologyConfig.getKafkaTopoDiscoTopic();
55  checkAndCreateTopic(topoDiscoTopic);
56  logger.debug("connecting to {} topic", topoDiscoTopic);
57  builder.setSpout(TOPO_DISCO_SPOUT, createKafkaSpout(topoDiscoTopic, TOPO_DISCO_SPOUT));
58 
59  TopoDiscoParseBolt topoDiscoParseBolt = new TopoDiscoParseBolt();
60  builder.setBolt(TOPO_DISCO_PARSE_BOLT_NAME, topoDiscoParseBolt, topologyConfig.getParallelism())
61  .shuffleGrouping(TOPO_DISCO_SPOUT);
62 
  // ParsePortInfoBolt is the merge point: it reads a named stream from each parse bolt.
  // The WfmStatsParseBolt component it subscribes to is registered further down in this method.
63  ParsePortInfoBolt parsePortInfoBolt = new ParsePortInfoBolt();
64  builder.setBolt(PARSE_PORT_INFO_BOLT_NAME, parsePortInfoBolt, topologyConfig.getParallelism())
65  .shuffleGrouping(TOPO_DISCO_PARSE_BOLT_NAME, TopoDiscoParseBolt.TOPO_TO_PORT_INFO_STREAM)
66  .shuffleGrouping(WFM_STATS_PARSE_BOLT_NAME, WfmStatsParseBolt.WFM_TO_PARSE_PORT_INFO_STREAM);
67 
  // Output of ParsePortInfoBolt is forwarded to a Kafka bolt on the OpenTSDB topic
68  String openTsdbTopic = topologyConfig.getKafkaOtsdbTopic();
69  checkAndCreateTopic(openTsdbTopic);
70  KafkaBolt openTsdbBolt = createKafkaBolt(openTsdbTopic);
71  builder.setBolt(OTSDB_KAFKA_BOLT_NAME, openTsdbBolt, topologyConfig.getParallelism())
72  .shuffleGrouping(PARSE_PORT_INFO_BOLT_NAME);
73 
74  // Set up the spout and bolt for the WFM_STATS_SPOUT line
75  String wfmStatsTopic = topologyConfig.getKafkaStatsTopic();
76  checkAndCreateTopic(wfmStatsTopic);
77  logger.debug("connecting to {} topic", wfmStatsTopic);
78  builder.setSpout(WFM_STATS_SPOUT, createKafkaSpout(wfmStatsTopic, WFM_STATS_SPOUT));
79 
80  WfmStatsParseBolt wfmStatsParseBolt = new WfmStatsParseBolt();
81  builder.setBolt(WFM_STATS_PARSE_BOLT_NAME, wfmStatsParseBolt, topologyConfig.getParallelism())
82  .shuffleGrouping(WFM_STATS_SPOUT);
83 
84  // Set up the spout and bolt for periodically sending SwitchPortsCommand; JANITOR_REFRESH is passed as the spout's frequency (presumably seconds - confirm in SwitchPortsSpout)
85  SwitchPortsSpout switchPortsSpout = new SwitchPortsSpout(topologyConfig, JANITOR_REFRESH);
86  builder.setSpout(SWITCH_PORTS_SPOUT_NAME, switchPortsSpout);
87 
88  String speakerTopic = topologyConfig.getKafkaSpeakerTopic();
89  checkAndCreateTopic(speakerTopic);
90  KafkaBolt speakerBolt = createKafkaBolt(speakerTopic);
91  builder.setBolt(SPEAKER_KAFKA_BOLT_NAME, speakerBolt, topologyConfig.getParallelism())
92  .shuffleGrouping(SWITCH_PORTS_SPOUT_NAME);
93 
94  return builder.createTopology();
95  }
96 
  /**
   * Launcher entry point: constructs the topology and calls {@code setup()} on it.
   * Any exception is passed to {@code handleLaunchException} and its return value is used
   * as the process exit code.
   *
   * @param args command-line arguments
   */
97  public static void main(String[] args) {
98  try {
  // NOTE(review): listing line 99 is absent from this extract; `env` is presumably
  // declared there - confirm against the original file.
100  (new PortStateTopology(env)).setup();
101  } catch (Exception e) {
102  System.exit(handleLaunchException(e));
103  }
104  }
105 }
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)