Open Kilda Java Documentation
SimulatorCommandBolt.java
Go to the documentation of this file.
1 package org.openkilda.simulator.bolts;
2 
3 import org.apache.storm.task.OutputCollector;
4 import org.apache.storm.task.TopologyContext;
5 import org.apache.storm.topology.OutputFieldsDeclarer;
6 import org.apache.storm.topology.base.BaseRichBolt;
7 import org.apache.storm.tuple.Fields;
8 import org.apache.storm.tuple.Tuple;
9 import org.apache.storm.tuple.Values;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18 
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 
24 public class SimulatorCommandBolt extends BaseRichBolt {
25  private static final Logger logger =LoggerFactory.getLogger(SimulatorCommandBolt.class);
26  private OutputCollector collector;
27 
28  protected String getJson(Tuple tuple) {
29  return tuple.getString(0);
30  }
31 
/**
 * Deserializes the tuple's JSON (see getJson) into a SimulatorMessage via Utils.MAPPER and
 * builds the Values to emit for it, chosen by the concrete message type.
 *
 * <p>Each Values holds (dpid, message, command). A TopologyMessage is expanded through
 * createTopology; an unrecognized message type is only logged.
 *
 * <p>NOTE(review): the embedded listing numbers skip 39, 43, 45, 48 and 51, so statements may
 * be missing from this view. In the visible code {@code stream} is never reassigned from
 * the empty string; confirm against the real source before relying on the "stream" entry.
 *
 * @param tuple incoming tuple carrying the JSON command
 * @return a map with the keys "stream" and "values" when at least one Values was built,
 *         otherwise an empty map
 * @throws Exception declared by the signature; presumably raised when the JSON cannot be
 *         mapped to a SimulatorMessage
 */
32  protected Map<String, Object> doCommand(Tuple tuple) throws Exception {
33  SimulatorMessage message = Utils.MAPPER.readValue(getJson(tuple), SimulatorMessage.class);
34  List<Values> values = new ArrayList<>();
35  String stream = "";
36  if (message instanceof SwitchModMessage) {
37  Values value = new Values(((SwitchModMessage) message).getDpid(), message, SimulatorCommands.DO_SWITCH_MOD);
38  values.add(value);
40  } else if (message instanceof PortModMessage) {
41  Values value = new Values(((PortModMessage) message).getDpid(), message, SimulatorCommands.DO_PORT_MOD);
42  values.add(value);
44  } else if (message instanceof TopologyMessage) {
    // One add-switch Values per switch in the topology message.
46  values = createTopology((TopologyMessage) message);
47  } else if (message instanceof AddLinkCommandMessage) {
49  values.add(new Values(((AddLinkCommandMessage) message).getDpid(), message, SimulatorCommands.DO_ADD_LINK));
50  } else if (message instanceof AddSwitchCommand) {
52  values.add(new Values(((AddSwitchCommand) message).getDpid(), message, SimulatorCommands.DO_ADD_SWITCH));
53  }else {
    // Unknown type: nothing is added to values, so the returned map stays empty.
54  logger.error("Unknown simulator command received: {}\n{}",
55  message.getClass().getName(), tuple.toString());
56  }
57 
    // The map is populated only when there is something to emit.
58  Map<String, Object> map = new HashMap<>();
59  if (values.size() > 0) {
60  map.put("stream", stream);
61  map.put("values", values);
62  }
63 
64  return map;
65  }
66 
67  //private List<Integer> emit(String stream, Tuple tuple, List<Values> values) throws Exception {
68  private List<Integer> emit(Map<String, Object> map) {
69  List<Integer> workers = null;
70  List<Values> values = (List<Values>) map.get("values");
71  String stream = (String) map.get("stream");
72  if (values.size() > 0) {
73  for (Values value : values) {
74  workers = collector.emit(stream, value);
75  }
76  }
77  return workers;
78  }
79 
80  private List<Values> createTopology(TopologyMessage topology) {
81  List<SwitchMessage> switches = topology.getSwitches();
82  List<Values> values = new ArrayList<>();
83  for (SwitchMessage sw : switches) {
84  // Need to pre-pend 00:00 as DPID as returned by OpenflowJ has 8 octets
85  values.add(new Values(String.format("00:00:%s",sw.getDpid()).toLowerCase(), sw,
86  SimulatorCommands.DO_ADD_SWITCH));
87  logger.info("Add Switch: {}", sw.getDpid());
88  }
89  return values;
90  }
91 
92  @Override
93  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
94  this.collector = outputCollector;
95  }
96 
/**
 * Handles one incoming tuple: dispatches on the tuple's source component, runs the command
 * for a recognized source, and always acks the tuple.
 *
 * <p>Any exception raised while handling is caught and logged; the ack happens in the
 * finally block, so the tuple is acked whether or not handling succeeded.
 *
 * <p>NOTE(review): the embedded listing numbers jump from 101 to 103, so the {@code case}
 * label that guards the emit/doCommand call is not visible in this view; confirm which
 * source component it matches against the real source.
 *
 * @param tuple tuple to process and ack
 */
97  @Override
98  public void execute(Tuple tuple) {
99  try {
100  String tupleSource = tuple.getSourceComponent();
101  switch (tupleSource) {
    // (case label missing from this listing; see the note above)
103  emit(doCommand(tuple));
104  break;
105  default:
106  logger.error("received command from UNKNOWN source: {}", tupleSource);
107  }
108  } catch (Exception e) {
    // NOTE(review): only e.toString() is logged, so the stack trace is lost; consider
    // passing the exception itself as the last logger argument.
109  logger.error(e.toString());
110  } finally {
    // Ack unconditionally, including after a failure.
111  collector.ack(tuple);
112  }
113  }
114 
115  @Override
116  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
117 // outputFieldsDeclarer.declareStream(SimulatorTopology.SIMULATOR_COMMAND_STREAM,
118 // new Fields("dpid", "data"));
119  outputFieldsDeclarer.declareStream(SimulatorTopology.SIMULATOR_COMMAND_STREAM,
120  new Fields("dpid", "data", "command"));
121  }
122 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
value
Definition: nodes.py:62
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)