1 package org.openkilda.simulator.bolts;
3 import org.apache.storm.task.OutputCollector;
4 import org.apache.storm.task.TopologyContext;
5 import org.apache.storm.topology.OutputFieldsDeclarer;
6 import org.apache.storm.topology.base.BaseRichBolt;
7 import org.apache.storm.tuple.Fields;
8 import org.apache.storm.tuple.Tuple;
9 import org.apache.storm.tuple.Values;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
26 private OutputCollector collector;
29 return tuple.getString(0);
/**
 * Processes one incoming tuple and builds a map describing what should be emitted for it.
 *
 * NOTE(review): only part of this method is visible in this excerpt; the decoding of the
 * tuple and the per-command handling between the visible statements are not shown.
 *
 * @param tuple the tuple being processed; it is included in the error log for unrecognised commands
 * @return presumably the map built below (the return statement is not visible here); the
 *         "stream" and "values" keys are only set when at least one Values entry was collected
 * @throws Exception declared by the signature; the throwing statements are not visible here
 */
32 protected Map<String, Object>
doCommand(Tuple tuple)
throws Exception {
// Accumulates the Values entries to be emitted for this command.
34 List<Values> values =
new ArrayList<>();
// Logs the class name of the unrecognised message together with the full tuple.
// NOTE(review): `message` is defined in lines not visible here — confirm its origin.
54 logger.error(
"Unknown simulator command received: {}\n{}",
55 message.getClass().getName(), tuple.toString());
// Result container; stays empty when no values were collected.
58 Map<String, Object> map =
new HashMap<>();
59 if (values.size() > 0) {
// NOTE(review): `stream` is assigned in lines not visible here.
60 map.put(
"stream", stream);
61 map.put(
"values", values);
/**
 * Emits every Values entry found in the given map through the collector, on the stream
 * named in that same map.
 *
 * @param map read for two keys: "values" (cast to List&lt;Values&gt;) and "stream" (cast to String)
 * @return presumably {@code workers} (the return statement is not visible here), which is
 *         null if nothing was emitted and otherwise the result of the last emit call
 */
68 private List<Integer> emit(Map<String, Object> map) {
// Overwritten on each emit call below, so only the last call's result is kept.
69 List<Integer> workers = null;
// Unchecked cast from Object; Map.get yields null when the key is absent.
// NOTE(review): doCommand in this file only sets "values" when it has entries, so a null
// here would make values.size() throw NullPointerException — confirm callers guard this.
70 List<Values> values = (List<Values>) map.get(
"values");
71 String stream = (String) map.get(
"stream");
72 if (values.size() > 0) {
73 for (Values
value : values) {
// One emit per Values entry, all on the same stream.
74 workers = collector.emit(stream,
value);
81 List<SwitchMessage> switches =
topology.getSwitches();
82 List<Values> values =
new ArrayList<>();
85 values.add(
new Values(String.format(
"00:00:%s",sw.getDpid()).toLowerCase(), sw,
86 SimulatorCommands.DO_ADD_SWITCH));
87 logger.info(
"Add Switch: {}", sw.getDpid());
/**
 * Stores the supplied OutputCollector in the {@code collector} field.
 *
 * @param map not used in the visible part of this method
 * @param topologyContext not used in the visible part of this method
 * @param outputCollector the collector kept for later use
 */
93 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
// The collector field is what other code in this file uses for collector.emit(...) and collector.ack(...).
94 this.collector = outputCollector;
100 String tupleSource = tuple.getSourceComponent();
101 switch (tupleSource) {
106 logger.error(
"received command from UNKNOWN source: {}", tupleSource);
108 }
catch (Exception e) {
109 logger.error(e.toString());
111 collector.ack(tuple);
120 new Fields(
"dpid",
"data",
"command"));
static final String SIMULATOR_SPOUT
static final String DO_ADD_LINK
static final ObjectMapper MAPPER
static final String SIMULATOR_COMMAND_STREAM
static final String DO_PORT_MOD
Map< String, Object > doCommand(Tuple tuple)
String getJson(Tuple tuple)
static final String DO_SWITCH_MOD
static final String DO_ADD_SWITCH
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
void execute(Tuple tuple)
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)