Open Kilda Java Documentation
StatsTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats;
17 
25 
38 
39 import org.apache.storm.generated.StormTopology;
40 import org.apache.storm.kafka.spout.KafkaSpout;
41 import org.apache.storm.topology.TopologyBuilder;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 
45 
/**
 * Storm topology definition for statistics processing. {@link #createTopology()} wires two
 * Kafka spouts, a speaker bolt, cache bolts, three metric-generating bolts and a Kafka
 * output bolt into a single {@code StormTopology}.
 *
 * <p>NOTE(review): this listing prefixes every line with its original source line number,
 * and numbers 50, 56 and 119 are absent. The constructor signature and two statements are
 * therefore not visible here - consult the real source file before changing this class.
 */
46 public class StatsTopology extends AbstractTopology<StatsTopologyConfig> {
47 
48  private static final Logger logger = LoggerFactory.getLogger(StatsTopology.class);
49 
  // NOTE(review): the constructor signature (listing line 50) is missing from this extract;
  // only its body, which delegates to the superclass with StatsTopologyConfig.class, is shown.
51  super(env, StatsTopologyConfig.class);
52  }
53 
  /**
   * Entry point: constructs a {@code StatsTopology} and calls {@code setup()} on it.
   * Any exception is handed to {@code handleLaunchException}, whose return value becomes
   * the process exit code.
   *
   * @param args command-line arguments (not referenced in the visible lines; presumably
   *             used to build {@code env} on the missing listing line 56 - TODO confirm)
   * @throws Exception declared by the signature; the visible body catches Exception itself
   */
54  public static void main(String[] args) throws Exception {
55  try {
57  (new StatsTopology(env)).setup();
58  } catch (Exception e) {
59  System.exit(handleLaunchException(e));
60  }
61  }
62 
  /**
   * Assembles the stats topology with a {@code TopologyBuilder}.
   *
   * <p>Data flow as wired below:
   * stats-topic spout -> SpeakerBolt -> (PORT_STATS | METER_CONFIG_STATS | FLOW_STATS);
   * FLOW_STATS passes through CacheBolt before reaching FlowMetricGenBolt, and CacheBolt
   * also receives CACHE_UPDATE tuples originating from the speaker-topic spout via
   * CacheFilterBolt. All three metric generators feed the "stats-opentsdb" Kafka bolt.
   *
   * @return the topology produced by {@code builder.createTopology()}
   */
63  @Override
64  public StormTopology createTopology() {
65  logger.info("Creating StatsTopology - {}", topologyName);
66 
  // The same parallelism hint is applied to every spout and bolt below except the
  // final "stats-opentsdb" bolt, which is registered without one.
67  final Integer parallelism = topologyConfig.getParallelism();
68  TopologyBuilder builder = new TopologyBuilder();
69 
70 
  // Spout reading the stats topic; its tuples are shuffled to SpeakerBolt.
71  final String kafkaSpoutId = StatsComponentType.STATS_OFS_KAFKA_SPOUT.toString();
72  KafkaSpout kafkaSpout = createKafkaSpout(topologyConfig.getKafkaStatsTopic(), kafkaSpoutId);
73  builder.setSpout(kafkaSpoutId, kafkaSpout, parallelism);
74 
75  SpeakerBolt speakerBolt = new SpeakerBolt();
76  final String statsOfsBolt = StatsComponentType.STATS_OFS_BOLT.toString();
77  builder.setBolt(statsOfsBolt, speakerBolt, parallelism)
78  .shuffleGrouping(kafkaSpoutId);
79 
80  // Spout that listens to the kilda.speaker topic to collect changes for the cache
81  KafkaSpout kafkaSpeakerSpout = createKafkaSpout(topologyConfig.getKafkaSpeakerTopic(),
82  STATS_KILDA_SPEAKER_SPOUT.name());
83  builder.setSpout(STATS_KILDA_SPEAKER_SPOUT.name(), kafkaSpeakerSpout, parallelism);
84 
85  // CacheFilterBolt receives data from the kilda.speaker spout, looks for InstallEgressFlow
86  // or InstallOneSwitchFlow, and forwards the tuple to CacheBolt
87  builder.setBolt(STATS_CACHE_FILTER_BOLT.name(), new CacheFilterBolt(),
88  parallelism)
89  .shuffleGrouping(STATS_KILDA_SPEAKER_SPOUT.name());
90 
91  // CacheBolt loads data from Neo4j on start
92  Neo4jConfig neo4jConfig = configurationProvider.getConfiguration(Neo4jConfig.class);
93  AuthNeo4j pathComputerAuth = new PathComputerAuth(neo4jConfig.getHost(),
94  neo4jConfig.getLogin(), neo4jConfig.getPassword());
  // allGrouping: each CACHE_UPDATE tuple is delivered to every CacheBolt task, while
  // FLOW_STATS tuples from SpeakerBolt are partitioned by the fieldMessage fields.
95  builder.setBolt(STATS_CACHE_BOLT.name(), new CacheBolt(pathComputerAuth), parallelism)
96  .allGrouping(STATS_CACHE_FILTER_BOLT.name(), CACHE_UPDATE.name())
97  .fieldsGrouping(statsOfsBolt, StatsStreamType.FLOW_STATS.toString(), fieldMessage);
98 
  // Port and meter-config stats go directly from SpeakerBolt to their metric generators.
99  builder.setBolt(PORT_STATS_METRIC_GEN.name(), new PortMetricGenBolt(), parallelism)
100  .fieldsGrouping(statsOfsBolt, StatsStreamType.PORT_STATS.toString(), fieldMessage);
101  builder.setBolt(METER_CFG_STATS_METRIC_GEN.name(), new MeterConfigMetricGenBolt(),
102  parallelism)
103  .fieldsGrouping(statsOfsBolt, StatsStreamType.METER_CONFIG_STATS.toString(),
104  fieldMessage);
105 
  // Unlike the two generators above, the flow metric generator consumes the FLOW_STATS
  // stream emitted by CacheBolt rather than the one emitted by SpeakerBolt.
106  logger.debug("starting flow_stats_metric_gen");
107  builder.setBolt(FLOW_STATS_METRIC_GEN.name(),
108  new FlowMetricGenBolt(),
109  parallelism)
110  .fieldsGrouping(STATS_CACHE_BOLT.name(), StatsStreamType.FLOW_STATS.toString(), fieldMessage);
111 
  // Single Kafka bolt for the OpenTSDB topic, fed by all three metric generators.
  // checkAndCreateTopic is called on the topic name before the bolt is registered.
112  String openTsdbTopic = topologyConfig.getKafkaOtsdbTopic();
113  checkAndCreateTopic(openTsdbTopic);
114  builder.setBolt("stats-opentsdb", createKafkaBolt(openTsdbTopic))
115  .shuffleGrouping(PORT_STATS_METRIC_GEN.name())
116  .shuffleGrouping(METER_CFG_STATS_METRIC_GEN.name())
117  .shuffleGrouping(FLOW_STATS_METRIC_GEN.name());
118 
  // NOTE(review): listing line 119 is missing from this extract; a statement may sit
  // between the OpenTSDB wiring above and the return below - verify against the source.
120 
121  return builder.createTopology();
122  }
123 }
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
final ConfigurationProvider configurationProvider