16 package org.openkilda.wfm.topology.event;
25 import com.google.common.annotations.VisibleForTesting;
26 import org.apache.storm.generated.StormTopology;
27 import org.apache.storm.topology.BoltDeclarer;
28 import org.apache.storm.topology.IStatefulBolt;
29 import org.apache.storm.topology.TopologyBuilder;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import java.util.ArrayList;
34 import java.util.List;
59 private static final String DISCO_BOLT_ID =
OfeLinkBolt.class.getSimpleName();
60 private static final String TOPO_ENG_BOLT_ID =
"topo.eng-bolt";
61 private static final String SPEAKER_BOLT_ID =
"speaker-bolt";
75 logger.info(
"Building OfEventWfmTopology - {}",
topologyName);
77 String kafkaTopoDiscoTopic =
topologyConfig.getKafkaTopoDiscoTopic();
83 TopologyBuilder builder =
new TopologyBuilder();
94 BoltDeclarer bd = builder.setBolt(DISCO_BOLT_ID, bolt,
topologyConfig.getParallelism())
102 List<CtrlBoltRef> ctrlTargets =
new ArrayList<>();
109 return builder.createTopology();
129 }
catch (Exception e) {
OfEventWfmTopology(LaunchEnvironment env)
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
static final String DISCO_SPOUT_ID
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
static void main(String[] args)
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)
StormTopology createTopology()
final String topologyName
void checkAndCreateTopic(final String topic)