Open Kilda Java Documentation
CacheTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.cache;
17 
26 
27 import org.apache.storm.generated.ComponentObject;
28 import org.apache.storm.generated.StormTopology;
29 import org.apache.storm.kafka.bolt.KafkaBolt;
30 import org.apache.storm.kafka.spout.KafkaSpout;
31 import org.apache.storm.topology.BoltDeclarer;
32 import org.apache.storm.topology.TopologyBuilder;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 
36 import java.util.ArrayList;
37 import java.util.List;
38 
39 public class CacheTopology extends AbstractTopology<CacheTopologyConfig> {
40  private static final Logger logger = LoggerFactory.getLogger(CacheTopology.class);
41 
42  private static final String BOLT_ID_COMMON_OUTPUT = "common.out";
43  private static final String BOLT_ID_OFE = "event.out";
44  private static final String BOLT_ID_TOPOLOGY_OUTPUT = "topology.out";
45  static final String BOLT_ID_CACHE = "cache";
46  private static final String SPOUT_ID_COMMON = "generic";
47 // static final String SPOUT_ID_TOPOLOGY = "topology";
48 
50  super(env, CacheTopologyConfig.class);
51  }
52 
56  @Override
57  public StormTopology createTopology() throws NameCollisionException {
58  logger.info("Creating CacheTopology - {}", topologyName);
59 
60  initKafkaTopics();
61 
62  int parallelism = topologyConfig.getParallelism();
63 
64  TopologyBuilder builder = new TopologyBuilder();
65  List<CtrlBoltRef> ctrlTargets = new ArrayList<>();
66 
67  /*
68  * Receives cache from storage.
69  */
70  KafkaSpout kafkaSpout = createKafkaSpout(topologyConfig.getKafkaTopoCacheTopic(), SPOUT_ID_COMMON);
71  builder.setSpout(SPOUT_ID_COMMON, kafkaSpout, parallelism);
72 
73 // (carmine) - as part of 0.8 refactor, merged inputs to one topic, so this isn't neccessary
74 // /*
75 // * Receives cache updates from WFM topology.
76 // */
77 // kafkaSpout = createKafkaSpout(config.getKafkaTopoCacheTopic(), SPOUT_ID_TOPOLOGY);
78 // builder.setSpout(SPOUT_ID_TOPOLOGY, kafkaSpout, parallelism);
79 
80  /*
81  * Stores network cache.
82  */
83  Neo4jConfig neo4jConfig = configurationProvider.getConfiguration(Neo4jConfig.class);
84  Auth pathComputerAuth = new PathComputerAuth(neo4jConfig.getHost(),
85  neo4jConfig.getLogin(), neo4jConfig.getPassword());
86  CacheBolt cacheBolt = new CacheBolt(pathComputerAuth);
87  ComponentObject.serialized_java(org.apache.storm.utils.Utils.javaSerialize(pathComputerAuth));
88  BoltDeclarer boltSetup = builder.setBolt(BOLT_ID_CACHE, cacheBolt, parallelism)
89  .shuffleGrouping(SPOUT_ID_COMMON)
90 // (carmine) as per above comment, only a single input streamt
91 // .shuffleGrouping(SPOUT_ID_TOPOLOGY)
92  ;
93  ctrlTargets.add(new CtrlBoltRef(BOLT_ID_CACHE, cacheBolt, boltSetup));
94 
95  /*
96  * Sends network events to storage.
97  */
98  KafkaBolt kafkaTopoEngBolt = createKafkaBolt(topologyConfig.getKafkaTopoEngTopic());
99  builder.setBolt(BOLT_ID_COMMON_OUTPUT, kafkaTopoEngBolt, parallelism)
100  .shuffleGrouping(BOLT_ID_CACHE, StreamType.TPE.toString());
101 
102  /*
103  * Sends cache dump and reroute requests to `flow` topology.
104  */
105  KafkaBolt kafkaFlowBolt = createKafkaBolt(topologyConfig.getKafkaFlowTopic());
106  builder.setBolt(BOLT_ID_TOPOLOGY_OUTPUT, kafkaFlowBolt, parallelism)
107  .shuffleGrouping(BOLT_ID_CACHE, StreamType.WFM_DUMP.toString());
108 
109  /*
110  * Sends requests for ISL to OFE topology.
111  */
112  // FIXME(surabjin): 2 kafka bold with same topic (see previous bolt)
113  KafkaBolt ofeKafkaBolt = createKafkaBolt(topologyConfig.getKafkaFlowTopic());
114  builder.setBolt(BOLT_ID_OFE, ofeKafkaBolt, parallelism)
115  .shuffleGrouping(BOLT_ID_CACHE, StreamType.OFE.toString());
116 
117  createCtrlBranch(builder, ctrlTargets);
119 
120  return builder.createTopology();
121  }
122 
123  private void initKafkaTopics() {
124  checkAndCreateTopic(topologyConfig.getKafkaFlowTopic());
125  checkAndCreateTopic(topologyConfig.getKafkaTopoEngTopic());
126  checkAndCreateTopic(topologyConfig.getKafkaTopoCacheTopic());
127  }
128 
129  public static void main(String[] args) {
130  try {
132  (new CacheTopology(env)).setup();
133  } catch (Exception e) {
134  System.exit(handleLaunchException(e));
135  }
136  }
137 }
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)
final ConfigurationProvider configurationProvider