16 package org.openkilda.wfm.topology.cache;
27 import org.apache.storm.generated.ComponentObject;
28 import org.apache.storm.generated.StormTopology;
29 import org.apache.storm.kafka.bolt.KafkaBolt;
30 import org.apache.storm.kafka.spout.KafkaSpout;
31 import org.apache.storm.topology.BoltDeclarer;
32 import org.apache.storm.topology.TopologyBuilder;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import java.util.ArrayList;
37 import java.util.List;
40 private static final Logger logger = LoggerFactory.getLogger(
CacheTopology.class);
42 private static final String BOLT_ID_COMMON_OUTPUT =
"common.out";
43 private static final String BOLT_ID_OFE =
"event.out";
44 private static final String BOLT_ID_TOPOLOGY_OUTPUT =
"topology.out";
45 static final String BOLT_ID_CACHE =
"cache";
46 private static final String SPOUT_ID_COMMON =
"generic";
58 logger.info(
"Creating CacheTopology - {}",
topologyName);
64 TopologyBuilder builder =
new TopologyBuilder();
65 List<CtrlBoltRef> ctrlTargets =
new ArrayList<>();
71 builder.setSpout(SPOUT_ID_COMMON, kafkaSpout, parallelism);
87 ComponentObject.serialized_java(
org.apache.storm.utils.Utils.javaSerialize(pathComputerAuth));
88 BoltDeclarer boltSetup = builder.setBolt(BOLT_ID_CACHE, cacheBolt, parallelism)
89 .shuffleGrouping(SPOUT_ID_COMMON)
93 ctrlTargets.add(
new CtrlBoltRef(BOLT_ID_CACHE, cacheBolt, boltSetup));
99 builder.setBolt(BOLT_ID_COMMON_OUTPUT, kafkaTopoEngBolt, parallelism)
106 builder.setBolt(BOLT_ID_TOPOLOGY_OUTPUT, kafkaFlowBolt, parallelism)
114 builder.setBolt(BOLT_ID_OFE, ofeKafkaBolt, parallelism)
120 return builder.createTopology();
123 private void initKafkaTopics() {
133 }
catch (Exception e) {
static void main(String[] args)
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
CacheTopology(LaunchEnvironment env)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
StormTopology createTopology()
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)
final ConfigurationProvider configurationProvider
final String topologyName
void checkAndCreateTopic(final String topic)