1 package org.openkilda.simulator;
9 import org.apache.storm.generated.StormTopology;
10 import org.apache.storm.topology.TopologyBuilder;
11 import org.apache.storm.tuple.Fields;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
16 private final String topoName =
"simulatorTopology";
17 private final int parallelism = 1;
31 private static final Logger logger = LoggerFactory.getLogger(
SimulatorTopology.class);
41 final String simulatorTopic =
topologyConfig.getKafkaSimulatorTopic();
43 final TopologyBuilder builder =
new TopologyBuilder();
45 logger.debug(
"Building SimulatorTopology - {}",
topologyName);
48 logger.debug(
"connecting to {} topic", simulatorTopic);
52 logger.debug(
"connecting to {} topic", inputTopic);
78 return builder.createTopology();
85 }
catch (Exception e) {
static final String COMMAND_BOLT_STREAM
static final String SIMULATOR_SPOUT
static final String KAFKA_BOLT_STREAM
static final String SIMULATOR_COMMAND_STREAM
StormTopology createTopology()
KafkaBolt createKafkaBolt(final String topic)
SimulatorTopology(LaunchEnvironment env)
static final String SWITCH_BOLT
static final String COMMAND_SPOUT
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
static final String COMMAND_BOLT
static final String SWITCH_BOLT_STREAM
static void main(String[] args)
static final String DEPLOY_TOPOLOGY_BOLT_STREAM
static final String SIMULATOR_COMMAND_BOLT
static final String KAFKA_BOLT
final String topologyName
void checkAndCreateTopic(final String topic)