Open Kilda Java Documentation
AbstractTopology.java
/* Copyright 2017 Telstra Open Source
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openkilda.wfm.topology;

import static java.lang.String.format;

// Note: imports of org.openkilda.* classes (configuration, Kafka helpers, ctrl bolts) are omitted in this listing.

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.thrift.TException;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.kohsuke.args4j.CmdLineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.Properties;

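/**
 * Base class for Kilda Storm topologies. It loads the topology and Kafka configuration from the
 * launch environment, provides factory helpers for Kafka spouts/bolts and the ctrl/health-check
 * branches, and submits the resulting topology to either a local or a remote Storm cluster.
 */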
public abstract class AbstractTopology<T extends AbstractTopologyConfig> implements Topology {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTopology.class);

    public static final String SPOUT_ID_CTRL = "ctrl.in";
    public static final String BOLT_ID_CTRL_ROUTE = "ctrl.route";
    public static final String BOLT_ID_CTRL_OUTPUT = "ctrl.out";

    public static final String MESSAGE_FIELD = "message";
    public static final Fields fieldMessage = new Fields(MESSAGE_FIELD);

    protected final String topologyName;

    private final KafkaNamingStrategy kafkaNamingStrategy;   // field type assumed from env.getKafkaNamingStrategy()
    private final TopologyNamingStrategy topoNamingStrategy;
    protected final ConfigurationProvider configurationProvider;

    protected final T topologyConfig;
    private final KafkaConfig kafkaConfig;

    protected AbstractTopology(LaunchEnvironment env, Class<T> topologyConfigClass) {
        kafkaNamingStrategy = env.getKafkaNamingStrategy();
        topoNamingStrategy = env.getTopologyNamingStrategy();

        String defaultTopologyName = getDefaultTopologyName();
        // Use the default topology name with naming strategy applied only if no specific name provided via CLI.
        topologyName = Optional.ofNullable(env.getTopologyName())
                .orElse(topoNamingStrategy.stormTopologyName(defaultTopologyName));

        configurationProvider = env.getConfigurationProvider(topologyName,
                TOPOLOGY_PROPERTIES_DEFAULTS_PREFIX + defaultTopologyName);

        topologyConfig = configurationProvider.getConfiguration(topologyConfigClass);
        kafkaConfig = configurationProvider.getConfiguration(KafkaConfig.class);

        logger.debug("Topology built {}: kafka={}, parallelism={}, workers={}",
                topologyName, kafkaConfig.getHosts(), topologyConfig.getParallelism(),
                topologyConfig.getWorkers());
    }

    protected String getDefaultTopologyName() {
        return getClass().getSimpleName().toLowerCase();
    }

    protected void setup() throws TException, NameCollisionException {
        if (topologyConfig.getUseLocalCluster()) {
            setupLocal();
        } else {
            setupRemote();
        }
    }

    private void setupRemote() throws TException, NameCollisionException {
        Config config = makeStormConfig();
        config.setDebug(false);

        logger.info("Submit Topology: {}", topologyName);
        StormSubmitter.submitTopology(topologyName, config, createTopology());
    }

    private void setupLocal() throws NameCollisionException {
        Config config = makeStormConfig();
        config.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, config, createTopology());

        logger.info("Start Topology: {} (local)", topologyName);
        localExecutionMainLoop();

        cluster.shutdown();
    }

    protected static int handleLaunchException(Exception error) {
        int errorCode;

        try {
            throw error;
        } catch (CmdLineException e) {
            System.err.println(e.getMessage());
            System.err.println();
            System.err.println("Allowed options and arguments:");
            e.getParser().printUsage(System.err);
            errorCode = 2;
        } catch (ConfigurationException e) {
            System.err.println(e.getMessage());
            errorCode = 3;
        } catch (TException e) {
            logger.error("Unable to complete topology setup: {}", e.getMessage());
            errorCode = 4;
        } catch (Exception e) {
            logger.error("Unhandled exception", e);
            errorCode = 1;
        }

        return errorCode;
    }

    private Properties getKafkaProducerProperties() {
        Properties kafka = new Properties();

        kafka.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafka.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafka.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getHosts());
        kafka.setProperty("request.required.acks", "1");

        return kafka;
    }

    protected Config makeStormConfig() {
        Config stormConfig = new Config();

        stormConfig.setNumWorkers(topologyConfig.getWorkers());
        if (topologyConfig.getUseLocalCluster()) {
            stormConfig.setMaxTaskParallelism(topologyConfig.getParallelism());
        }

        return stormConfig;
    }

    protected void localExecutionMainLoop() {
        logger.info("Sleep while local topology is executing");
        try {
            Thread.sleep(topologyConfig.getLocalExecutionTime());
        } catch (InterruptedException e) {
            logger.warn("Execution process has been interrupted.");
        }
    }

    @Override
    public final String getTopologyName() {
        return topologyName;
    }

    @VisibleForTesting
    public final T getConfig() {
        return topologyConfig;
    }

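    /**
     * Ensures that the given Kafka topic exists (see the FIXME below: currently a no-op).
     *
     * @param topic Kafka topic name to check/create
     */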
    protected void checkAndCreateTopic(final String topic) {
        // FIXME(nmarchenko): do we need this? need check
    }

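    /**
     * Creates a Kafka spout that consumes String keys and values from the given topic.
     *
     * @param topic   Kafka topic to read from
     * @param spoutId spout identifier, also used to derive the Kafka consumer group name
     * @return a configured {@link KafkaSpout}
     */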
    protected KafkaSpout<String, String> createKafkaSpout(String topic, String spoutId) {
        KafkaSpoutConfig<String, String> config = makeKafkaSpoutConfigBuilder(spoutId, topic)
                .build();

        return new KafkaSpout<>(config);
    }

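    /**
     * Creates a Kafka bolt that writes tuples to the given topic using String serialization.
     *
     * @param topic destination Kafka topic
     * @return a configured {@link KafkaBolt}
     */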
    protected KafkaBolt createKafkaBolt(final String topic) {
        return new KafkaBolt<String, String>()
                .withProducerProperties(getKafkaProducerProperties())
                .withTopicSelector(new DefaultTopicSelector(topic))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());
    }

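    /**
     * Wires the ctrl branch into the topology: a spout that reads the ctrl topic, a routing bolt
     * that dispatches requests to the registered target bolts, and a Kafka bolt that publishes
     * the responses (and routing errors) back to the ctrl topic.
     *
     * @param builder topology builder to extend
     * @param targets bolts that should receive ctrl requests
     */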
    protected void createCtrlBranch(TopologyBuilder builder, List<CtrlBoltRef> targets) {
        String ctrlTopic = topologyConfig.getKafkaCtrlTopic();

        checkAndCreateTopic(ctrlTopic);

        KafkaSpout kafkaSpout;
        kafkaSpout = createKafkaSpout(ctrlTopic, SPOUT_ID_CTRL);
        builder.setSpout(SPOUT_ID_CTRL, kafkaSpout);

        RouteBolt route = new RouteBolt(topologyName);
        builder.setBolt(BOLT_ID_CTRL_ROUTE, route)
                .shuffleGrouping(SPOUT_ID_CTRL);

        KafkaBolt kafkaBolt = createKafkaBolt(ctrlTopic);
        BoltDeclarer outputSetup = builder.setBolt(BOLT_ID_CTRL_OUTPUT, kafkaBolt)
                .shuffleGrouping(BOLT_ID_CTRL_ROUTE, route.STREAM_ID_ERROR);

        for (CtrlBoltRef ref : targets) {
            String boltId = ref.getBoltId();
            ref.getDeclarer().allGrouping(BOLT_ID_CTRL_ROUTE, route.registerEndpoint(boltId));
            outputSetup.shuffleGrouping(boltId, ref.getBolt().getCtrlStreamId());
        }
    }

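    /**
     * Wires a health-check round trip into the topology: a spout that reads the health-check
     * topic, a HealthCheckBolt that handles the requests, and a Kafka bolt that writes the
     * responses back to the same topic.
     *
     * @param builder topology builder to extend
     * @param prefix  prefix used for the component ids of the created spout and bolts
     */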
    protected void createHealthCheckHandler(TopologyBuilder builder, String prefix) {
        String healthCheckTopic = topologyConfig.getKafkaHealthCheckTopic();

        checkAndCreateTopic(healthCheckTopic);

        KafkaSpout healthCheckKafkaSpout = createKafkaSpout(healthCheckTopic, prefix);
        builder.setSpout(prefix + "HealthCheckKafkaSpout", healthCheckKafkaSpout, 1);
        HealthCheckBolt healthCheckBolt = new HealthCheckBolt(prefix, healthCheckTopic);
        builder.setBolt(prefix + "HealthCheckBolt", healthCheckBolt, 1)
                .shuffleGrouping(prefix + "HealthCheckKafkaSpout");
        KafkaBolt healthCheckKafkaBolt = createKafkaBolt(healthCheckTopic);
        builder.setBolt(prefix + "HealthCheckKafkaBolt", healthCheckKafkaBolt, 1)
                .shuffleGrouping(prefix + "HealthCheckBolt", healthCheckTopic);
    }

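    /**
     * Builds the common Kafka spout configuration: String deserialization, a per-spout consumer
     * group, the project's record translator and the LATEST first-poll offset strategy.
     */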
    protected KafkaSpoutConfig.Builder<String, String> makeKafkaSpoutConfigBuilder(String spoutId, String topic) {
        return new KafkaSpoutConfig.Builder<>(
                kafkaConfig.getHosts(), StringDeserializer.class, StringDeserializer.class, topic)

                .setGroupId(makeKafkaGroupName(spoutId))
                .setRecordTranslator(new KafkaRecordTranslator<>())

                // NB: There is an issue with using the default of "earliest uncommitted message" -
                // if we erase the topics, then the committed offset will be > the latest .. and so
                // we won't process any messages.
                // NOW: we'll miss any messages generated while the topology is down.
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST);
    }

    private String makeKafkaGroupName(String spoutId) {
        // (method body elided in this listing)
    }
}
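For orientation, here is a minimal sketch of how a concrete topology typically builds on this base class. The ExampleTopology class, the topic and component names, and the LaunchEnvironment(args) entry point are illustrative assumptions rather than part of this file; imports are omitted for brevity.

public class ExampleTopology extends AbstractTopology<AbstractTopologyConfig> {

    public ExampleTopology(LaunchEnvironment env) {
        super(env, AbstractTopologyConfig.class);
    }

    @Override
    public StormTopology createTopology() {
        TopologyBuilder builder = new TopologyBuilder();

        // Forward messages from an input topic to an output topic (topic names are illustrative).
        builder.setSpout("example.in", createKafkaSpout("example-input-topic", "example.in"));
        builder.setBolt("example.out", createKafkaBolt("example-output-topic"))
                .shuffleGrouping("example.in");

        return builder.createTopology();
    }

    public static void main(String[] args) {
        try {
            LaunchEnvironment env = new LaunchEnvironment(args);  // assumed CLI entry point
            new ExampleTopology(env).setup();
        } catch (Exception e) {
            System.exit(handleLaunchException(e));
        }
    }
}

Wiring the ctrl or health-check branches is optional; a subclass can call createCtrlBranch(builder, targets) or createHealthCheckHandler(builder, prefix) inside createTopology() when it needs them, and handleLaunchException maps CLI, configuration and Storm submission failures to distinct exit codes.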