Open Kilda Java Documentation
OfEventWfmTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.event;
17 
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import org.apache.storm.generated.StormTopology;
27 import org.apache.storm.topology.BoltDeclarer;
28 import org.apache.storm.topology.IStatefulBolt;
29 import org.apache.storm.topology.TopologyBuilder;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 
33 import java.util.ArrayList;
34 import java.util.List;
35 
43 public class OfEventWfmTopology extends AbstractTopology<OFEventWfmTopologyConfig> {
44  /*
45  * Progress Tracker - Phase 1: Simple message flow, a little bit of state, re-wire spkr/tpe
46  * (1) √ Switch UP - Simple pass through
47  * (2) ◊ Switch Down - Simple pass through (LinkBolt will stop Link Discovery / Health)
48  * (3) √ Port UP - Simple Pass through (will be picked up by Link bolts, Discovery started)
49  * (4) ◊ Port DOWN - Simple Pass through (LinkBolt will stop Link Discovery / Health)
50  * (5) ◊ Link UP - this will be a response from the Discovery packet.
51  * (6) ◊ Link DOWN - this will be a response from the Discovery packet
52  * (7) ◊ Add simple pass through for verification (w/ speaker) & validation (w/ TPE)
53  */
54 
55  private static Logger logger = LoggerFactory.getLogger(OfEventWfmTopology.class);
56 
57  @VisibleForTesting
58  public static final String DISCO_SPOUT_ID = "disco-spout";
59  private static final String DISCO_BOLT_ID = OfeLinkBolt.class.getSimpleName();
60  private static final String TOPO_ENG_BOLT_ID = "topo.eng-bolt";
61  private static final String SPEAKER_BOLT_ID = "speaker-bolt";
62 
64  super(env, OFEventWfmTopologyConfig.class);
65  }
66 
74  public StormTopology createTopology() throws StreamNameCollisionException {
75  logger.info("Building OfEventWfmTopology - {}", topologyName);
76 
77  String kafkaTopoDiscoTopic = topologyConfig.getKafkaTopoDiscoTopic();
78  String kafkaTopoEngTopic = topologyConfig.getKafkaTopoEngTopic();
79 
80  checkAndCreateTopic(kafkaTopoDiscoTopic);
81  checkAndCreateTopic(kafkaTopoEngTopic);
82 
83  TopologyBuilder builder = new TopologyBuilder();
84 
85  builder.setSpout(DISCO_SPOUT_ID, createKafkaSpout(kafkaTopoDiscoTopic, DISCO_SPOUT_ID));
86 
87  IStatefulBolt bolt = new OfeLinkBolt(topologyConfig);
88 
89  // TODO: resolve the comments below; are there any state issues?
90  // NB: with shuffleGrouping, we can't maintain state .. would need to parse first
91  // just to pull out switchID.
92  // (crimi) - not sure I agree here .. state can be maintained, albeit distributed.
93  //
94  BoltDeclarer bd = builder.setBolt(DISCO_BOLT_ID, bolt, topologyConfig.getParallelism())
95  .shuffleGrouping(DISCO_SPOUT_ID);
96 
97  builder.setBolt(TOPO_ENG_BOLT_ID, createKafkaBolt(kafkaTopoEngTopic),
98  topologyConfig.getParallelism()).shuffleGrouping(DISCO_BOLT_ID, OfeLinkBolt.TOPO_ENG_STREAM);
99  builder.setBolt(SPEAKER_BOLT_ID, createKafkaBolt(topologyConfig.getKafkaSpeakerTopic()),
100  topologyConfig.getParallelism()).shuffleGrouping(DISCO_BOLT_ID, OfeLinkBolt.SPEAKER_STREAM);
101 
102  List<CtrlBoltRef> ctrlTargets = new ArrayList<>();
103  // TODO: verify this ctrlTarget after refactoring.
104  ctrlTargets.add(new CtrlBoltRef(DISCO_BOLT_ID, (ICtrlBolt) bolt, bd));
105  createCtrlBranch(builder, ctrlTargets);
106  // TODO: verify WFM_TOPOLOGY health check
108 
109  return builder.createTopology();
110  }
111 
112  /*
113  * Progress Tracker - Phase 2: Speaker / TPE Integration; Cache Coherency Checks; Flapping
114  * (1) ◊ - Interact with Speaker (network element is / isn't there)
115  * (2) ◊ - Interact with TPE (graph element is / isn't there)
116  * (3) ◊ - Validate the Topology periodically - switches, ports, links
117  * - health checks should validate the known universe; what about missing stuff?
118  * (4) ◊ - See if flapping happens .. define window and if there are greater than 4 up/downs?
119  */
120 
125  public static void main(String[] args) {
126  try {
128  (new OfEventWfmTopology(env)).setup();
129  } catch (Exception e) {
130  System.exit(handleLaunchException(e));
131  }
132  }
133 }
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)