16 package org.openkilda.wfm.topology.flow;
36 import org.apache.storm.generated.ComponentObject;
37 import org.apache.storm.generated.StormTopology;
38 import org.apache.storm.kafka.bolt.KafkaBolt;
39 import org.apache.storm.kafka.spout.KafkaSpout;
40 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
41 import org.apache.storm.topology.BoltDeclarer;
42 import org.apache.storm.topology.TopologyBuilder;
43 import org.apache.storm.tuple.Fields;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import java.util.ArrayList;
48 import java.util.List;
65 private static final Logger logger = LoggerFactory.getLogger(
FlowTopology.class);
80 this.pathComputerAuth = pathComputerAuth;
87 TopologyBuilder builder =
new TopologyBuilder();
88 final List<CtrlBoltRef> ctrlTargets =
new ArrayList<>();
109 KafkaSpout<String, String> kafkaSpout =
new KafkaSpout<>(kafkaSpoutConfig);
125 ComponentObject.serialized_java(
org.apache.storm.utils.Utils.javaSerialize(pathComputerAuth));
259 return builder.createTopology();
269 }
catch (Exception e) {
static final String ERROR_TYPE_FIELD
static final String STATUS_FIELD
static final Fields fieldsMessageErrorType
static void main(String[] args)
static final String SWITCH_ID_FIELD
static final String STREAM_RESPONSE_ID
static final Fields fieldsMessageSwitchIdFlowIdTransactionId
static final Fields fieldsMessageFlowId
static final String MESSAGE_FIELD
void createHealthCheckHandler(TopologyBuilder builder, String prefix)
KafkaBolt createKafkaBolt(final String topic)
static final Fields fieldFlowId
KafkaSpoutConfig.Builder< String, String > makeKafkaSpoutConfigBuilder(String spoutId, String topic)
FlowTopology(LaunchEnvironment env, PathComputerAuth pathComputerAuth)
FlowTopology(LaunchEnvironment env)
static int handleLaunchException(Exception error)
static final String TRANSACTION_ID
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
StormTopology createTopology()
static final String STREAM_ID_PROXY
void createCtrlBranch(TopologyBuilder builder, List< CtrlBoltRef > targets)
static final String STREAM_VERIFICATION_ID
TOPOLOGY_ENGINE_KAFKA_SPOUT
final ConfigurationProvider configurationProvider
final String topologyName
static final Fields fieldSwitchId
static final String FLOW_ID
static final String STREAM_SPEAKER_ID
static final Fields fieldsFlowIdStatus