16 package org.openkilda.wfm.topology;
18 import static java.lang.String.format;
34 import com.google.common.annotations.VisibleForTesting;
35 import org.apache.kafka.clients.producer.ProducerConfig;
36 import org.apache.kafka.common.serialization.StringDeserializer;
37 import org.apache.storm.Config;
38 import org.apache.storm.LocalCluster;
39 import org.apache.storm.StormSubmitter;
40 import org.apache.storm.kafka.bolt.KafkaBolt;
41 import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
42 import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
43 import org.apache.storm.kafka.spout.KafkaSpout;
44 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
45 import org.apache.storm.thrift.TException;
46 import org.apache.storm.topology.BoltDeclarer;
47 import org.apache.storm.topology.TopologyBuilder;
48 import org.apache.storm.tuple.Fields;
49 import org.kohsuke.args4j.CmdLineException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 import java.util.List;
54 import java.util.Optional;
55 import java.util.Properties;
61 private static final Logger logger = LoggerFactory.getLogger(
AbstractTopology.class);
94 logger.debug(
"Topology built {}: kafka={}, parallelism={}, workers={}",
100 return getClass().getSimpleName().toLowerCase();
119 private void setupLocal() throws NameCollisionException {
123 LocalCluster cluster =
new LocalCluster();
126 logger.info(
"Start Topology: {} (local)",
topologyName);
137 }
catch (CmdLineException e) {
138 System.err.println(e.getMessage());
139 System.err.println();
140 System.err.println(
"Allowed options and arguments:");
141 e.getParser().printUsage(System.err);
144 System.err.println(e.getMessage());
146 }
catch (TException e) {
147 logger.error(
"Unable to complete topology setup: {}", e.getMessage());
149 }
catch (Exception e) {
150 logger.error(
"Unhandled exception", e);
157 private Properties getKafkaProducerProperties() {
158 Properties kafka =
new Properties();
160 kafka.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
161 "org.apache.kafka.common.serialization.StringSerializer");
162 kafka.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
163 "org.apache.kafka.common.serialization.StringSerializer");
164 kafka.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.
getHosts());
165 kafka.setProperty(
"request.required.acks",
"1");
171 Config stormConfig =
new Config();
175 stormConfig.setMaxTaskParallelism(
topologyConfig.getParallelism());
182 logger.info(
"Sleep while local topology is executing");
185 }
catch (InterruptedException e) {
186 logger.warn(
"Execution process have been interrupted.");
219 return new KafkaSpout<>(
config);
229 return new KafkaBolt<String, String>()
230 .withProducerProperties(getKafkaProducerProperties())
231 .withTopicSelector(
new DefaultTopicSelector(
topic))
232 .withTupleToKafkaMapper(
new FieldNameBasedTupleToKafkaMapper<>());
241 KafkaSpout kafkaSpout;
254 String boltId = ref.getBoltId();
256 outputSetup.shuffleGrouping(boltId, ref.getBolt().getCtrlStreamId());
267 String healthCheckTopic =
topologyConfig.getKafkaHealthCheckTopic();
271 KafkaSpout healthCheckKafkaSpout =
createKafkaSpout(healthCheckTopic, prefix);
272 builder.setSpout(prefix +
"HealthCheckKafkaSpout", healthCheckKafkaSpout, 1);
274 builder.setBolt(prefix +
"HealthCheckBolt", healthCheckBolt, 1)
275 .shuffleGrouping(prefix +
"HealthCheckKafkaSpout");
277 builder.setBolt(prefix +
"HealthCheckKafkaBolt", healthCheckKafkaBolt, 1)
278 .shuffleGrouping(prefix +
"HealthCheckBolt", healthCheckTopic);
282 return new KafkaSpoutConfig.Builder<>(
283 kafkaConfig.
getHosts(), StringDeserializer.class, StringDeserializer.class,
286 .setGroupId(makeKafkaGroupName(spoutId))
293 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST);
296 private String makeKafkaGroupName(String spoutId) {
String registerEndpoint(String boltId)
static final String SPOUT_ID_CTRL
final KafkaNamingStrategy kafkaNamingStrategy
String kafkaConsumerGroupName(String consumerGroup)
static final String MESSAGE_FIELD
static final String BOLT_ID_CTRL_OUTPUT
static final String BOLT_ID_CTRL_ROUTE
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static final Fields fieldMessage
KafkaSpoutConfig.Builder< String, String > makeKafkaSpoutConfigBuilder(String spoutId, String topic)
void localExecutionMainLoop()
final String getTopologyName()
static int handleLaunchException(Exception error)
String stormTopologyName(String topology)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
final String STREAM_ID_ERROR
String getDefaultTopologyName()
AbstractTopology(LaunchEnvironment env, Class< T > topologyConfigClass)
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)
String TOPOLOGY_PROPERTIES_DEFAULTS_PREFIX
final ConfigurationProvider configurationProvider
final String topologyName
final TopologyNamingStrategy topoNamingStrategy
StormTopology createTopology()
void checkAndCreateTopic(final String topic)