Open Kilda Java Documentation
NbWorkerTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.nbworker;
17 
28 
29 import org.apache.storm.generated.StormTopology;
30 import org.apache.storm.kafka.bolt.KafkaBolt;
31 import org.apache.storm.kafka.spout.KafkaSpout;
32 import org.apache.storm.topology.TopologyBuilder;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 
49 public class NbWorkerTopology extends AbstractTopology<NbWorkerTopologyConfig> {
50 
51  private static final Logger LOGGER = LoggerFactory.getLogger(NbWorkerTopology.class);
52 
53  private static final String ROUTER_BOLT_NAME = "router-bolt";
54  private static final String SWITCHES_BOLT_NAME = "switches-operations-bolt";
55  private static final String LINKS_BOLT_NAME = "links-operations-bolt";
56  private static final String FLOWS_BOLT_NAME = "flows-operations-bolt";
57  private static final String SPLITTER_BOLT_NAME = "response-splitter-bolt";
58  private static final String NB_KAFKA_BOLT_NAME = "nb-kafka-bolt";
59  private static final String NB_SPOUT_ID = "nb-spout";
60 
62  super(env, NbWorkerTopologyConfig.class);
63  }
64 
65  @Override
66  public StormTopology createTopology() {
67  LOGGER.info("Creating NbWorkerTopology - {}", topologyName);
68 
69  TopologyBuilder tb = new TopologyBuilder();
70 
71  final Integer parallelism = topologyConfig.getParallelism();
72 
73  KafkaSpout kafkaSpout = createKafkaSpout(topologyConfig.getKafkaTopoNbTopic(), NB_SPOUT_ID);
74  tb.setSpout(NB_SPOUT_ID, kafkaSpout, parallelism);
75 
76  RouterBolt router = new RouterBolt();
77  tb.setBolt(ROUTER_BOLT_NAME, router, parallelism)
78  .shuffleGrouping(NB_SPOUT_ID);
79 
80  Neo4jConfig neo4jConfig = configurationProvider.getConfiguration(Neo4jConfig.class);
81  Auth pathComputerAuth = new PathComputerAuth(neo4jConfig.getHost(),
82  neo4jConfig.getLogin(), neo4jConfig.getPassword());
83 
84  SwitchOperationsBolt switchesBolt = new SwitchOperationsBolt(pathComputerAuth);
85  tb.setBolt(SWITCHES_BOLT_NAME, switchesBolt, parallelism)
86  .shuffleGrouping(ROUTER_BOLT_NAME, StreamType.SWITCH.toString());
87 
88  LinkOperationsBolt linksBolt = new LinkOperationsBolt(pathComputerAuth);
89  tb.setBolt(LINKS_BOLT_NAME, linksBolt, parallelism)
90  .shuffleGrouping(ROUTER_BOLT_NAME, StreamType.ISL.toString());
91 
92  FlowOperationsBolt flowsBolt = new FlowOperationsBolt(pathComputerAuth);
93  tb.setBolt(FLOWS_BOLT_NAME, flowsBolt, parallelism)
94  .shuffleGrouping(ROUTER_BOLT_NAME, StreamType.FLOW.toString());
95 
96  ResponseSplitterBolt splitterBolt = new ResponseSplitterBolt();
97  tb.setBolt(SPLITTER_BOLT_NAME, splitterBolt, parallelism)
98  .shuffleGrouping(SWITCHES_BOLT_NAME)
99  .shuffleGrouping(LINKS_BOLT_NAME)
100  .shuffleGrouping(FLOWS_BOLT_NAME);
101 
102  KafkaBolt kafkaNbBolt = createKafkaBolt(topologyConfig.getKafkaNorthboundTopic());
103  tb.setBolt(NB_KAFKA_BOLT_NAME, kafkaNbBolt, parallelism)
104  .shuffleGrouping(SPLITTER_BOLT_NAME);
105 
106  return tb.createTopology();
107  }
108 
109  public static void main(String[] args) {
110  try {
112  new NbWorkerTopology(env).setup();
113  } catch (Exception e) {
114  System.exit(handleLaunchException(e));
115  }
116  }
117 
118 }
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
final ConfigurationProvider configurationProvider