Open Kilda Java Documentation
FlowTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow;
17 
35 
36 import org.apache.storm.generated.ComponentObject;
37 import org.apache.storm.generated.StormTopology;
38 import org.apache.storm.kafka.bolt.KafkaBolt;
39 import org.apache.storm.kafka.spout.KafkaSpout;
40 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
41 import org.apache.storm.topology.BoltDeclarer;
42 import org.apache.storm.topology.TopologyBuilder;
43 import org.apache.storm.tuple.Fields;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 
47 import java.util.ArrayList;
48 import java.util.List;
49 
53 public class FlowTopology extends AbstractTopology<FlowTopologyConfig> {
// Tuple field names used when building the Fields constants below.
54  public static final String SWITCH_ID_FIELD = "switch-id";
55  public static final String STATUS_FIELD = "status";
56  public static final String ERROR_TYPE_FIELD = "error-type";
// Predefined field sets. Within createTopology(), fieldFlowId and fieldSwitchId are the keys
// passed to fieldsGrouping(); use of the remaining constants is not visible in this listing.
57  public static final Fields fieldFlowId = new Fields(Utils.FLOW_ID);
58  public static final Fields fieldSwitchId = new Fields(SWITCH_ID_FIELD);
59  public static final Fields fieldsFlowIdStatus = new Fields(Utils.FLOW_ID, STATUS_FIELD);
60  public static final Fields fieldsMessageFlowId = new Fields(MESSAGE_FIELD, Utils.FLOW_ID);
61  public static final Fields fieldsMessageErrorType = new Fields(MESSAGE_FIELD, ERROR_TYPE_FIELD);
62  public static final Fields fieldsMessageSwitchIdFlowIdTransactionId =
// NOTE(review): the initializer for the constant above (listing line 63) is absent from this
// extraction; consult the original FlowTopology.java for its value.
64 
65  private static final Logger logger = LoggerFactory.getLogger(FlowTopology.class);
66 
// Assigned in the constructors and handed to CrudBolt in createTopology().
67  private final PathComputerAuth pathComputerAuth;
68 
70  super(env, FlowTopologyConfig.class);
71 
72  Neo4jConfig neo4jConfig = configurationProvider.getConfiguration(Neo4jConfig.class);
73  pathComputerAuth = new PathComputerAuth(neo4jConfig.getHost(),
74  neo4jConfig.getLogin(), neo4jConfig.getPassword());
75  }
76 
/**
 * Creates a FlowTopology that uses a caller-supplied PathComputerAuth.
 *
 * @param env launch environment, forwarded to the superclass constructor together with
 *        FlowTopologyConfig.class
 * @param pathComputerAuth stored in the pathComputerAuth field, which createTopology()
 *        later passes to CrudBolt
 */
77  public FlowTopology(LaunchEnvironment env, PathComputerAuth pathComputerAuth) {
78  super(env, FlowTopologyConfig.class);
79 
80  this.pathComputerAuth = pathComputerAuth;
81  }
82 
/**
 * Assembles the Storm topology for flow handling.
 *
 * <p>Declares three Kafka spouts (northbound, topology-engine, speaker) that all read
 * topologyConfig.getKafkaFlowTopic(), the processing bolts (splitter, CRUD, topology-engine,
 * speaker, transaction, verification, verification-joint, error, northbound-reply) and the
 * Kafka output bolts (cache, speaker, northbound), all with the configured parallelism.
 * The CRUD and transaction bolts are additionally collected in ctrlTargets and handed to
 * createCtrlBranch().
 *
 * <p>NOTE(review): this listing omits several lines (listing lines 144, 183, 206, 208, 225,
 * 243 and 253), so some grouping calls below appear without arguments. The inline notes mark
 * each gap.
 *
 * @return the topology produced by builder.createTopology()
 * @throws NameCollisionException declared by the signature; NOTE(review): the call that raises
 *         it is not identifiable from the visible lines - presumably createCtrlBranch, confirm
 */
83  @Override
84  public StormTopology createTopology() throws NameCollisionException {
85  logger.info("Creating FlowTopology - {}", topologyName);
86 
87  TopologyBuilder builder = new TopologyBuilder();
88  final List<CtrlBoltRef> ctrlTargets = new ArrayList<>();
89  Integer parallelism = topologyConfig.getParallelism();
90 
91  // builder.setSpout(
92  // ComponentType.LCM_SPOUT.toString(),
93  // createKafkaSpout(config.getKafkaFlowTopic(), ComponentType.LCM_SPOUT.toString()), 1);
94  // builder.setBolt(
95  // ComponentType.LCM_FLOW_SYNC_BOLT.toString(),
96  // new LcmFlowCacheSyncBolt(ComponentType.NORTHBOUND_KAFKA_SPOUT.toString()),
97  // 1)
98  // .shuffleGrouping(ComponentType.NORTHBOUND_KAFKA_SPOUT.toString(), LcmKafkaSpout.STREAM_ID_LCM)
99  // .shuffleGrouping(ComponentType.LCM_SPOUT.toString());
100 
101  /*
102  * Spout receives all Northbound requests.
103  */
104 
105  KafkaSpoutConfig<String, String> kafkaSpoutConfig = makeKafkaSpoutConfigBuilder(
106  ComponentType.NORTHBOUND_KAFKA_SPOUT.toString(), topologyConfig.getKafkaFlowTopic()).build();
107  // (crimi) - commenting out LcmKafkaSpout here due to dying worker
108  //kafkaSpout = new LcmKafkaSpout<>(kafkaSpoutConfig);
109  KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
110  builder.setSpout(ComponentType.NORTHBOUND_KAFKA_SPOUT.toString(), kafkaSpout, parallelism);
111 
112  /*
113  * Bolt splits requests on streams.
114  * It groups requests by flow-id.
115  */
116  SplitterBolt splitterBolt = new SplitterBolt();
117  builder.setBolt(ComponentType.SPLITTER_BOLT.toString(), splitterBolt, parallelism)
118  .shuffleGrouping(ComponentType.NORTHBOUND_KAFKA_SPOUT.toString());
119 
120  /*
121  * Bolt handles flow CRUD operations.
122  * It groups requests by flow-id.
123  */
124  CrudBolt crudBolt = new CrudBolt(pathComputerAuth);
    // NOTE(review): the value returned by serialized_java(...) is discarded, so this statement
    // does not feed into the builder. Presumably it only checks that pathComputerAuth can be
    // Java-serialized - confirm the intent or remove.
125  ComponentObject.serialized_java(org.apache.storm.utils.Utils.javaSerialize(pathComputerAuth));
126 
    // The CRUD bolt subscribes to the splitter's per-operation streams and to STATUS streams of
    // the transaction and speaker bolts, which are declared further down in this method.
127  BoltDeclarer boltSetup = builder.setBolt(ComponentType.CRUD_BOLT.toString(), crudBolt, parallelism)
128  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.CREATE.toString(), fieldFlowId)
129  // TODO: this READ is used for single and for all flows. But all flows shouldn't be fieldsGrouping.
130  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.READ.toString(), fieldFlowId)
131  .shuffleGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.DUMP.toString())
132  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.UPDATE.toString(), fieldFlowId)
133  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.DELETE.toString(), fieldFlowId)
134  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.PUSH.toString(), fieldFlowId)
135  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.UNPUSH.toString(), fieldFlowId)
136  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.REROUTE.toString(), fieldFlowId)
137  // TODO: this CACHE_SYNC shouldn't be fields-grouping - there is no field - it should be all - but
138  // tackle during multi instance testing
139  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.CACHE_SYNC.toString(), fieldFlowId)
140  .fieldsGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.VERIFICATION.toString(), fieldFlowId)
141  .fieldsGrouping(ComponentType.TRANSACTION_BOLT.toString(), StreamType.STATUS.toString(), fieldFlowId)
142  .fieldsGrouping(ComponentType.SPEAKER_BOLT.toString(), StreamType.STATUS.toString(), fieldFlowId)
143  .fieldsGrouping(
    // NOTE(review): listing line 144 (the arguments and terminator of the call above) is absent
    // from this extraction.
145  // .shuffleGrouping(
146  // ComponentType.LCM_FLOW_SYNC_BOLT.toString(), LcmFlowCacheSyncBolt.STREAM_ID_SYNC_FLOW_CACHE);
147  ctrlTargets.add(new CtrlBoltRef(ComponentType.CRUD_BOLT.toString(), crudBolt, boltSetup));
148 
149  /*
150  * Bolt sends cache updates.
151  */
152  KafkaBolt cacheKafkaBolt = createKafkaBolt(topologyConfig.getKafkaTopoCacheTopic());
153  builder.setBolt(ComponentType.CACHE_KAFKA_BOLT.toString(), cacheKafkaBolt, parallelism)
154  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.CREATE.toString())
155  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.UPDATE.toString())
156  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.DELETE.toString())
157  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.STATUS.toString())
158  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.CACHE_SYNC.toString());
159 
160  /*
161  * Spout receives Topology Engine response
162  */
163  // FIXME(surabujin): can be replaced with NORTHBOUND_KAFKA_SPOUT (same topic)
164  KafkaSpout topologyKafkaSpout = createKafkaSpout(
165  topologyConfig.getKafkaFlowTopic(), ComponentType.TOPOLOGY_ENGINE_KAFKA_SPOUT.toString());
166  builder.setSpout(ComponentType.TOPOLOGY_ENGINE_KAFKA_SPOUT.toString(), topologyKafkaSpout, parallelism);
167 
168  /*
169  * Bolt processes Topology Engine responses, groups by flow-id field
170  */
171  TopologyEngineBolt topologyEngineBolt = new TopologyEngineBolt();
172  builder.setBolt(ComponentType.TOPOLOGY_ENGINE_BOLT.toString(), topologyEngineBolt, parallelism)
173  .shuffleGrouping(ComponentType.TOPOLOGY_ENGINE_KAFKA_SPOUT.toString());
174 
175  /*
176  * Bolt sends Speaker requests
177  */
178  KafkaBolt speakerKafkaBolt = createKafkaBolt(topologyConfig.getKafkaSpeakerTopic());
179  builder.setBolt(ComponentType.SPEAKER_KAFKA_BOLT.toString(), speakerKafkaBolt, parallelism)
180  .shuffleGrouping(ComponentType.TRANSACTION_BOLT.toString(), StreamType.CREATE.toString())
181  .shuffleGrouping(ComponentType.TRANSACTION_BOLT.toString(), StreamType.DELETE.toString())
182  .shuffleGrouping(
    // NOTE(review): listing line 183 (the arguments and terminator of the call above) is absent
    // from this extraction.
184 
185  /*
186  * Spout receives Speaker responses
187  */
188  // FIXME(surabujin): can be replaced with NORTHBOUND_KAFKA_SPOUT (same topic)
189  KafkaSpout speakerKafkaSpout = createKafkaSpout(
190  topologyConfig.getKafkaFlowTopic(), ComponentType.SPEAKER_KAFKA_SPOUT.toString());
191  builder.setSpout(ComponentType.SPEAKER_KAFKA_SPOUT.toString(), speakerKafkaSpout, parallelism);
192 
193  /*
194  * Bolt processes Speaker responses, groups by flow-id field
195  */
196  SpeakerBolt speakerBolt = new SpeakerBolt();
197  builder.setBolt(ComponentType.SPEAKER_BOLT.toString(), speakerBolt, parallelism)
198  .shuffleGrouping(ComponentType.SPEAKER_KAFKA_SPOUT.toString());
199 
200  /*
201  * Transaction bolt.
202  */
203  TransactionBolt transactionBolt = new TransactionBolt();
204  boltSetup = builder.setBolt(ComponentType.TRANSACTION_BOLT.toString(), transactionBolt, parallelism)
205  .fieldsGrouping(
    // NOTE(review): listing line 206 (arguments of the call above) is absent from this extraction.
207  .fieldsGrouping(
    // NOTE(review): listing line 208 (arguments of the call above) is absent from this extraction.
209  .fieldsGrouping(ComponentType.SPEAKER_BOLT.toString(), StreamType.CREATE.toString(), fieldSwitchId)
210  .fieldsGrouping(ComponentType.SPEAKER_BOLT.toString(), StreamType.DELETE.toString(), fieldSwitchId);
211  ctrlTargets.add(new CtrlBoltRef(ComponentType.TRANSACTION_BOLT.toString(), transactionBolt, boltSetup));
212 
213  /*
214  * Verification
215  */
216  builder.setBolt(ComponentType.VERIFICATION_BOLT.toString(), new VerificationBolt(), parallelism)
217  // CrudBolt in chain required to acquire BiFlow object
218  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.VERIFICATION.toString())
219  // Match FL responses and extract flowId into separate tuple field
220  .shuffleGrouping(ComponentType.SPEAKER_BOLT.toString(), SpeakerBolt.STREAM_VERIFICATION_ID);
221 
222  builder.setBolt(ComponentType.VERIFICATION_JOINT_BOLT.toString(), new VerificationJointBolt(), parallelism)
223  // proxy all tuples via VerificationBolt to be able to use fieldsGrouping by flowId field
224  .fieldsGrouping(
    // NOTE(review): listing line 225 (the arguments and terminator of the call above) is absent
    // from this extraction.
226 
227  /*
228  * Error processing bolt
229  */
230  ErrorBolt errorProcessingBolt = new ErrorBolt();
231  builder.setBolt(ComponentType.ERROR_BOLT.toString(), errorProcessingBolt, parallelism)
232  .shuffleGrouping(ComponentType.SPLITTER_BOLT.toString(), StreamType.ERROR.toString())
233  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.ERROR.toString());
234 
235  /*
236  * Bolt forms Northbound responses
237  */
238  NorthboundReplyBolt northboundReplyBolt = new NorthboundReplyBolt();
239  builder.setBolt(ComponentType.NORTHBOUND_REPLY_BOLT.toString(), northboundReplyBolt, parallelism)
240  .shuffleGrouping(ComponentType.CRUD_BOLT.toString(), StreamType.RESPONSE.toString())
241  .shuffleGrouping(ComponentType.ERROR_BOLT.toString(), StreamType.RESPONSE.toString())
242  .shuffleGrouping(
    // NOTE(review): listing line 243 (the arguments and terminator of the call above) is absent
    // from this extraction.
244 
245  /*
246  * Bolt sends Northbound responses
247  */
248  KafkaBolt northboundKafkaBolt = createKafkaBolt(topologyConfig.getKafkaNorthboundTopic());
249  builder.setBolt(ComponentType.NORTHBOUND_KAFKA_BOLT.toString(), northboundKafkaBolt, parallelism)
250  .shuffleGrouping(ComponentType.NORTHBOUND_REPLY_BOLT.toString(), StreamType.RESPONSE.toString());
251 
    // Hands the bolts collected in ctrlTargets (CRUD and transaction) to createCtrlBranch.
252  createCtrlBranch(builder, ctrlTargets);
    // NOTE(review): listing line 253 is absent from this extraction; its content cannot be
    // determined from the visible lines.
254 
255  // builder.setBolt(
256  // ComponentType.TOPOLOGY_ENGINE_OUTPUT.toString(), createKafkaBolt(config.getKafkaTopoEngTopic()), 1)
257  // .shuffleGrouping(ComponentType.LCM_FLOW_SYNC_BOLT.toString(), LcmFlowCacheSyncBolt.STREAM_ID_TPE);
258 
259  return builder.createTopology();
260  }
261 
/**
 * Command-line entry point: constructs a FlowTopology from env and calls setup() on it.
 * Any Exception thrown on the way is passed to handleLaunchException, and the value it
 * returns becomes the process exit code.
 *
 * @param args command-line arguments; NOTE(review): how they are consumed is not visible
 *        here because listing line 267 is missing from this extraction
 */
265  public static void main(String[] args) {
266  try {
    // NOTE(review): listing line 267, which presumably declares env, is absent from this
    // extraction - confirm against the original file.
268  (new FlowTopology(env)).setup();
269  } catch (Exception e) {
270  System.exit(handleLaunchException(e));
271  }
272  }
273 }
static final Fields fieldsMessageSwitchIdFlowIdTransactionId
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
KafkaSpoutConfig.Builder< String, String > makeKafkaSpoutConfigBuilder(String spoutId, String topic)
FlowTopology(LaunchEnvironment env, PathComputerAuth pathComputerAuth)
static int handleLaunchException(Exception error)
static final String TRANSACTION_ID
Definition: Utils.java:39
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
def build()
Definition: plan-e.py:73
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)
final ConfigurationProvider configurationProvider
static final String FLOW_ID
Definition: Utils.java:61