Open Kilda Java Documentation
SimulatorTopology.java
Go to the documentation of this file.
1 package org.openkilda.simulator;
2 
8 
9 import org.apache.storm.generated.StormTopology;
10 import org.apache.storm.topology.TopologyBuilder;
11 import org.apache.storm.tuple.Fields;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 
15 public class SimulatorTopology extends AbstractTopology<SimulatorTopologyConfig> {
// Instance-level defaults. Neither field is referenced in the visible part of this class:
// createTopology() declares its own local `parallelism` (read from topologyConfig) and logs
// `topologyName`, not `topoName`.
16  private final String topoName = "simulatorTopology";
17  private final int parallelism = 1;
18 
// Component and stream identifiers used when wiring the topology in createTopology().
// Ids of the two Kafka spouts registered with builder.setSpout(...).
19  public static final String SIMULATOR_SPOUT = "simulator-spout";
20  public static final String COMMAND_SPOUT = "command-spout";
// Not referenced in the visible code of this class.
21  public static final String DEPLOY_TOPOLOGY_BOLT_STREAM = "deploy_topology_stream";
// Stream of COMMAND_BOLT that SWITCH_BOLT subscribes to (grouped by "dpid").
22  public static final String COMMAND_BOLT_STREAM = "command_bolt_stream";
23  public static final String COMMAND_BOLT = "command_bolt";
24  public static final String SWITCH_BOLT = "switch_bolt";
// Stream of SWITCH_BOLT that SWITCH_BOLT itself subscribes to (grouped by "dpid").
25  public static final String SWITCH_BOLT_STREAM = "switch_bolt_stream";
26  public static final String KAFKA_BOLT = "kafka_bolt";
// Stream of SWITCH_BOLT that KAFKA_BOLT subscribes to (shuffle grouping).
27  public static final String KAFKA_BOLT_STREAM = "kafka_bolt_stream";
28  public static final String SIMULATOR_COMMAND_BOLT = "simulator_command_bolt";
// Stream of SIMULATOR_COMMAND_BOLT that SWITCH_BOLT subscribes to (grouped by "dpid").
29  public static final String SIMULATOR_COMMAND_STREAM = "simulator_command_stream";
30 
// Class-wide SLF4J logger.
31  private static final Logger logger = LoggerFactory.getLogger(SimulatorTopology.class);
32 
34  super(env, SimulatorTopologyConfig.class);
35  }
36 
37  @Override
38  public StormTopology createTopology() {
39  final int parallelism = topologyConfig.getParallelism();
40  final String inputTopic = topologyConfig.getKafkaSpeakerTopic();
41  final String simulatorTopic = topologyConfig.getKafkaSimulatorTopic();
42 
43  final TopologyBuilder builder = new TopologyBuilder();
44 
45  logger.debug("Building SimulatorTopology - {}", topologyName);
46 
47  checkAndCreateTopic(simulatorTopic);
48  logger.debug("connecting to {} topic", simulatorTopic);
49  builder.setSpout(SIMULATOR_SPOUT, createKafkaSpout(simulatorTopic, SIMULATOR_SPOUT));
50 
51  checkAndCreateTopic(inputTopic);
52  logger.debug("connecting to {} topic", inputTopic);
53  builder.setSpout(COMMAND_SPOUT, createKafkaSpout(inputTopic, COMMAND_SPOUT));
54 
55  CommandBolt commandBolt = new CommandBolt();
56  logger.debug("starting " + COMMAND_BOLT + " bolt");
57  builder.setBolt(COMMAND_BOLT, commandBolt, parallelism)
58  .shuffleGrouping(SIMULATOR_SPOUT)
59  .shuffleGrouping(COMMAND_SPOUT);
60 
61  SimulatorCommandBolt simulatorCommandBolt = new SimulatorCommandBolt();
62  logger.debug("starting " + SIMULATOR_COMMAND_BOLT + " bolt");
63  builder.setBolt(SIMULATOR_COMMAND_BOLT, simulatorCommandBolt, parallelism)
64  .shuffleGrouping(SIMULATOR_SPOUT);
65 
66  SpeakerBolt speakerBolt = new SpeakerBolt();
67  logger.debug("starting " + SWITCH_BOLT + " bolt");
68  builder.setBolt(SWITCH_BOLT, speakerBolt, 1)
69  .fieldsGrouping(COMMAND_BOLT, COMMAND_BOLT_STREAM, new Fields("dpid"))
70  .fieldsGrouping(SWITCH_BOLT, SWITCH_BOLT_STREAM, new Fields("dpid"))
71  .fieldsGrouping(SIMULATOR_COMMAND_BOLT, SIMULATOR_COMMAND_STREAM, new Fields("dpid"));
72 
73  // TODO(dbogun): check is it must be output topic
74  checkAndCreateTopic(inputTopic);
75  builder.setBolt(KAFKA_BOLT, createKafkaBolt(inputTopic), parallelism)
76  .shuffleGrouping(SWITCH_BOLT, KAFKA_BOLT_STREAM);
77 
78  return builder.createTopology();
79  }
80 
/**
 * Command-line entry point: constructs a {@code SimulatorTopology} from {@code env} and
 * calls {@code setup()} on it.
 *
 * <p>Any {@link Exception} raised inside the try block is handed to
 * {@code handleLaunchException}, and the int it returns is used as the process exit code.
 *
 * <p>NOTE(review): the statement that declares {@code env} is not present in this listing
 * (the numbering jumps from 82 to 84); presumably it is built from {@code args} - confirm
 * against the real source file.
 *
 * @param args command-line arguments (not referenced in the visible lines)
 */
81  public static void main(String[] args) {
82  try {
84  (new SimulatorTopology(env)).setup();
85  } catch (Exception e) {
// Convert the launch failure into an exit status instead of letting it propagate.
86  System.exit(handleLaunchException(e));
87  }
88  }
89 }
KafkaBolt createKafkaBolt(final String topic)
static int handleLaunchException(Exception error)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)