Open Kilda Java Documentation
IslStatsTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.islstats;
17 
21 
22 import org.apache.storm.generated.StormTopology;
23 import org.apache.storm.kafka.bolt.KafkaBolt;
24 import org.apache.storm.topology.TopologyBuilder;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 
28 public class IslStatsTopology extends AbstractTopology<IslStatsTopologyConfig> {
29  private static final Logger logger = LoggerFactory.getLogger(IslStatsTopology.class);
30 
// Component IDs used when wiring the topology in createTopology().
// ID of the Kafka spout that reads the topology-discovery topic.
31  private static final String ISL_STATS_SPOUT_ID = "islstats-spout";
// ID of the Kafka bolt that writes to the OpenTSDB topic.
32  private static final String ISL_STATS_OTSDB_BOLT_ID = "islstats-otsdb-bolt";
// ID of the IslStatsBolt stage; derived from the bolt's simple class name.
33  private static final String ISL_STATS_BOLT_ID = IslStatsBolt.class.getSimpleName();
34 
36  super(env, IslStatsTopologyConfig.class);
37  }
38 
/**
 * Builds the ISL stats Storm topology as a three-stage linear pipeline:
 * a Kafka spout on the topology-discovery topic, an {@code IslStatsBolt},
 * and a Kafka bolt that publishes to the OpenTSDB topic.
 *
 * <p>Both Kafka topics are passed to {@code checkAndCreateTopic} before the
 * components that use them are registered. Both bolts use the parallelism
 * returned by {@code topologyConfig.getParallelism()} and are connected to
 * their upstream component with shuffle grouping.
 *
 * @return the topology produced by the {@link TopologyBuilder}
 */
39  public StormTopology createTopology() {
40  logger.info("Creating IslStatsTopology - {}", topologyName);
41 
42  TopologyBuilder builder = new TopologyBuilder();
43 
    // Stage 1: spout consuming the topology-discovery Kafka topic.
44  String topoDiscoTopic = topologyConfig.getKafkaTopoDiscoTopic();
45  checkAndCreateTopic(topoDiscoTopic);
46 
47  logger.debug("connecting to {} topic", topoDiscoTopic);
48  builder.setSpout(ISL_STATS_SPOUT_ID, createKafkaSpout(topoDiscoTopic, ISL_STATS_SPOUT_ID));
49 
    // Stage 2: IslStatsBolt fed by the spout via shuffle grouping.
50  IslStatsBolt verifyIslStatsBolt = new IslStatsBolt();
51  logger.debug("starting {} bolt", ISL_STATS_BOLT_ID);
52  builder.setBolt(ISL_STATS_BOLT_ID, verifyIslStatsBolt, topologyConfig.getParallelism())
53  .shuffleGrouping(ISL_STATS_SPOUT_ID);
54 
    // Stage 3: Kafka bolt that forwards IslStatsBolt output to the OpenTSDB topic.
55  String openTsdbTopic = topologyConfig.getKafkaOtsdbTopic();
56  checkAndCreateTopic(openTsdbTopic);
57  KafkaBolt openTsdbBolt = createKafkaBolt(openTsdbTopic);
58  builder.setBolt(ISL_STATS_OTSDB_BOLT_ID, openTsdbBolt, topologyConfig.getParallelism())
59  .shuffleGrouping(ISL_STATS_BOLT_ID);
60 
61  return builder.createTopology();
62  }
63 
/**
 * Command-line entry point: constructs an {@code IslStatsTopology} and calls
 * {@code setup()} on it. Any exception raised while doing so is passed to
 * {@code handleLaunchException}, whose int result becomes the process exit code.
 *
 * @param args command-line arguments
 */
64  public static void main(String[] args) {
65  try {
    // NOTE(review): the declaration of `env` (listing line 66) is missing from this
    // listing; presumably it is built from `args` - confirm against the full source.
67  (new IslStatsTopology(env)).setup();
68  } catch (Exception e) {
    // Broad catch at the process boundary: turn the failure into an exit status.
69  System.exit(handleLaunchException(e));
70  }
71  }
72 }
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)