Open Kilda Java Documentation
SpeakerBolt.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.simulator.bolts;
17 
44 
45 import org.apache.storm.task.OutputCollector;
46 import org.apache.storm.task.TopologyContext;
47 import org.apache.storm.topology.OutputFieldsDeclarer;
48 import org.apache.storm.topology.base.BaseRichBolt;
49 import org.apache.storm.tuple.Fields;
50 import org.apache.storm.tuple.Tuple;
51 import org.apache.storm.tuple.Values;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 
55 import java.io.IOException;
56 import java.time.Instant;
57 import java.util.ArrayList;
58 import java.util.HashMap;
59 import java.util.List;
60 import java.util.Map;
61 import java.util.UUID;
62 
63 public class SpeakerBolt extends BaseRichBolt {
64  private static final Logger logger = LoggerFactory.getLogger(SpeakerBolt.class);
65  private OutputCollector collector;
66  protected Map<SwitchId, ISwitchImpl> switches;
67 
68  public enum TupleFields {
71  }
72 
73  protected String makeSwitchMessage(ISwitchImpl sw, SwitchState state) throws IOException {
75  new SwitchId(sw.getDpid().toString()),
76  state,
77  "192.168.0.1", // TODO: need to create these on the fly
78  "sw" + sw.getDpid().toString(),
79  "Simulated Switch",
80  "SimulatorTopology"
81  );
82  InfoMessage message = new InfoMessage(
83  data,
84  Instant.now().toEpochMilli(),
85  UUID.randomUUID().toString(),
86  null);
87  return Utils.MAPPER.writeValueAsString(message);
88  }
89 
90  protected String makePortMessage(ISwitchImpl sw, int portNum, PortChangeType type) throws IOException {
92  new SwitchId(sw.getDpid().toString()),
93  portNum,
94  type
95  );
96  InfoMessage message = new InfoMessage(
97  data,
98  Instant.now().toEpochMilli(),
99  UUID.randomUUID().toString(),
100  null);
101  return Utils.MAPPER.writeValueAsString(message);
102  }
103 
104  protected List<Values> addSwitch(AddSwitchCommand data) throws Exception {
105  List<Values> values = new ArrayList<>();
106  SwitchId dpid = data.getDpid();
107  if (switches.get(dpid) == null) {
108  ISwitchImpl sw = new ISwitchImpl(dpid, data.getNumOfPorts(), PortStateType.DOWN);
109  switches.put(new SwitchId(sw.getDpid().toString()), sw);
110  values.add(new Values("INFO", makeSwitchMessage(sw, SwitchState.ADDED)));
111  values.add(new Values("INFO", makeSwitchMessage(sw, SwitchState.ACTIVATED)));
112  }
113  return values;
114  }
115 
116  protected List<Values> addSwitch(SwitchMessage switchMessage) throws Exception {
117  ISwitchImpl sw = switches.get(switchMessage.getDpid());
118  List<Values> values = new ArrayList<>();
119 
120  if (sw == null) {
121  logger.info("switch does not exist, adding it");
122  sw = new ISwitchImpl(switchMessage.getDpid(),
123  switchMessage.getNumOfPorts(), PortStateType.DOWN);
124  sw.activate();
125 
126  List<LinkMessage> links = switchMessage.getLinks();
127  for (LinkMessage l : links) {
128  IPortImpl localPort = sw.getPort(l.getLocalPort());
129  localPort.setLatency(l.getLatency());
130  localPort.setPeerPortNum(l.getPeerPort());
131  localPort.setPeerSwitch(l.getPeerSwitch());
132  localPort.enable();
133  }
134 
135  switches.put(new SwitchId(sw.getDpid().toString()), sw);
136 
137  values.add(new Values("INFO", makeSwitchMessage(sw, SwitchState.ADDED)));
138  values.add(new Values("INFO", makeSwitchMessage(sw, SwitchState.ACTIVATED)));
139 
140  for (IPortImpl p : sw.getPorts()) {
141  PortChangeType changeType =
142  p.isActive() ? PortChangeType.UP : PortChangeType.DOWN; //TODO: see if OF sends DOWN
143  if (changeType == PortChangeType.UP) {
144  values.add(new Values("INFO", makePortMessage(sw, p.getNumber(), changeType)));
145  }
146  }
147  }
148  return values;
149  }
150 
151  protected void discoverIsl(Tuple tuple, DiscoverIslCommandData data) throws Exception {
152  /*
153  * This process is a bit screwy and does put a loop in the topology:
154  *
155  * 1. Determine if the source switch is up and the source port is an Active ISL port
156  * 2. Create the IslInfoData package as if it is a working ISL (both ports are active)
157  * 3. Emit tha IslInfoData back to SpeakerBolt with fields grouping but keyed on the second switch to
158  * ensure that the tuple goes to the instance which has that switch in it's switches Map and set command
159  * to DiscoverIslP2
160  */
161 
162  ISwitchImpl sw = getSwitch(data.getSwitchId());
163  if (!sw.isActive()) {
164  return;
165  }
166  IPortImpl localPort = sw.getPort(data.getPortNumber());
167 
168  if (localPort.isActiveIsl()) {
169  List<PathNode> path = new ArrayList<>();
170  PathNode path1 = new PathNode(new SwitchId(sw.getDpid().toString()), localPort.getNumber(), 0);
171  path1.setSegLatency(localPort.getLatency());
172  PathNode path2 = new PathNode(new SwitchId(localPort.getPeerSwitch()), localPort.getPeerPortNum(), 1);
173  path.add(path1);
174  path.add(path2);
175  IslInfoData islInfoData = new IslInfoData(
176  localPort.getLatency(),
177  path,
178  100000,
180  100000);
181  collector.emit(SimulatorTopology.SWITCH_BOLT_STREAM, tuple,
182  new Values(
183  localPort.getPeerSwitch().toLowerCase(),
185  islInfoData));
186  }
187  }
188 
189  protected void discoverIslPartTwo(Tuple tuple, IslInfoData data) throws Exception {
190  /*
191  * Second part of the discover process.
192  *
193  * 1. Grabs a message that has been sent from the first part and thus we know that the source port is
194  * and active ISL.
195  * 2. Check the status of the destination port, in Path[1], and if activeISL then emit to Kafka
196  */
197  ISwitchImpl sw = getSwitch(data.getPath().get(1).getSwitchId());
198  if (!sw.isActive()) {
199  return;
200  }
201  IPortImpl port = sw.getPort(data.getPath().get(1).getPortNo());
202 
203  if (port.isActiveIsl()) {
204  long now = Instant.now().toEpochMilli();
205  InfoMessage infoMessage = new InfoMessage(data, now, "system", null);
206  logger.debug("checking isl on: {}", data.toString());
207  collector.emit(SimulatorTopology.KAFKA_BOLT_STREAM, tuple,
208  new Values("INFO", Utils.MAPPER.writeValueAsString(infoMessage)));
209  }
210  }
211 
212  protected List<Values> addLink(AddLinkCommandMessage message) throws Exception {
213  ISwitchImpl sw;
214  IPortImpl port;
215 
216  sw = getSwitch(message.getDpid());
217  port = sw.getPort(message.getLink().getLocalPort());
218  port.setLatency(message.getLink().getLatency());
219  port.setPeerSwitch(message.getLink().getPeerSwitch());
220  port.setPeerPortNum(message.getLink().getPeerPort());
221  port.enable();
222 
223  List<Values> values = new ArrayList<>();
224  values.add(new Values("INFO", Utils.MAPPER.writeValueAsString(port.makePorChangetMessage())));
225 
226  return values;
227  }
228 
229  protected List<Values> modPort(PortModMessage message) throws Exception {
230  List<Values> values = new ArrayList<>();
231  ISwitchImpl sw = getSwitch(message.getDpid());
232  IPortImpl port = sw.getPort(message.getPortNum());
233  port.modPort(message);
234  values.add(new Values("INFO", Utils.MAPPER.writeValueAsString(port.makePorChangetMessage())));
235  return values;
236  }
237 
245  public ISwitchImpl getSwitch(SwitchId name) throws Exception {
246  ISwitchImpl sw = switches.get(name);
247  if (sw == null) {
248  throw new SimulatorException(String.format("Switch %s not found", name));
249  }
250  return sw;
251  }
252 
259  public void doSimulatorCommand(Tuple tuple) throws Exception {
260  List<Values> values = new ArrayList<>();
261  if (tuple.getFields().contains("command")) {
262  String command = tuple.getStringByField("command");
263  switch (command) {
265  //TODO: this is an ugly hack...
266  if (tuple.getValueByField("data") instanceof AddSwitchCommand) {
267  values = addSwitch((AddSwitchCommand) tuple.getValueByField("data"));
268  } else {
269  values = addSwitch((SwitchMessage) tuple.getValueByField("data"));
270  }
271  break;
273  values = addLink((AddLinkCommandMessage) tuple.getValueByField("data"));
274  break;
276  values = modPort((PortModMessage) tuple.getValueByField("data"));
277  break;
278  default:
279  logger.error(String.format("Uknown SimulatorCommand %s", command));
280  }
281  } else {
282  SimulatorMessage message = (SimulatorMessage) tuple.getValueByField("data");
283  if (message instanceof SwitchModMessage) {
284  SwitchModMessage switchModMessage = (SwitchModMessage) message;
285  ISwitchImpl sw = getSwitch(switchModMessage.getDpid());
286  // sw.mod(switchModMessage.getState());
287  } else {
288  logger.error("Unkown SimulatorMessage {}", message.getClass().getSimpleName());
289  }
290  }
291 
292  if (values.size() > 0) {
293  for (Values value : values) {
294  logger.debug("emitting: {}", value);
295  collector.emit(SimulatorTopology.KAFKA_BOLT_STREAM, tuple, value);
296  }
297  }
298  }
299 
306  public void doCommand(Tuple tuple) throws Exception {
307  String command = tuple.getStringByField(TupleFields.COMMAND.name());
308  List<Values> values = new ArrayList<>();
309 
311  values = addSwitch((SwitchMessage) tuple.getValueByField(TupleFields.DATA.name()));
312  if (values.size() > 0) {
313  for (Values value : values) {
314  logger.debug("emitting: {}", value);
315  collector.emit(SimulatorTopology.KAFKA_BOLT_STREAM, tuple, value);
316  }
317  }
318  return;
319  } else if (command.equals(Commands.DO_DISCOVER_ISL_P2_COMMAND.name())) {
320  discoverIslPartTwo(tuple, (IslInfoData) tuple.getValueByField(TupleFields.DATA.name()));
321  return;
322  }
323 
324  CommandData data = (CommandData) tuple.getValueByField(TupleFields.DATA.name());
325  if (command.equals(Commands.DO_DISCOVER_ISL_COMMAND.name())) {
327  } else {
328  logger.error("Unknown switch command: {}".format(command));
329  return;
330  }
331 
332  if (values.size() > 0) {
333  for (Values value : values) {
334  logger.debug("emitting: {}", value);
335  collector.emit(SimulatorTopology.KAFKA_BOLT_STREAM, tuple, value);
336  }
337  }
338  }
339 
340  @Override
341  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
342  this.collector = outputCollector;
343 
344  switches = new HashMap<>();
345  }
346 
347  @Override
348  public void execute(Tuple tuple) {
349  logger.debug("got tuple: {}", tuple.toString());
350  try {
351  String tupleSource = tuple.getSourceComponent();
352 
353  switch (tupleSource) {
356  doCommand(tuple);
357  break;
359  doSimulatorCommand(tuple);
360  break;
361  default:
362  logger.error("tuple from UNKNOWN source: {}", tupleSource);
363  }
364  } catch (Exception e) {
365  logger.error(e.toString());
366  e.printStackTrace();
367  } finally {
368  collector.ack(tuple);
369  }
370  }
371 
372  @Override
373  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
374  outputFieldsDeclarer.declareStream(SimulatorTopology.KAFKA_BOLT_STREAM, new Fields("key", "message"));
375  outputFieldsDeclarer.declareStream(SimulatorTopology.SWITCH_BOLT_STREAM,
376  new Fields("dpid", TupleFields.COMMAND.name(), TupleFields.DATA.name()));
377  }
378 }
String makeSwitchMessage(ISwitchImpl sw, SwitchState state)
static final ObjectMapper MAPPER
Definition: Utils.java:31
List< Values > addSwitch(AddSwitchCommand data)
value
Definition: nodes.py:62
Map< SwitchId, ISwitchImpl > switches
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
name
Definition: setup.py:24
List< Values > addLink(AddLinkCommandMessage message)
void discoverIsl(Tuple tuple, DiscoverIslCommandData data)
ISwitchImpl getSwitch(SwitchId name)
void setPeerSwitch(String peerSwitch)
Definition: IPortImpl.java:144
def command(payload, fields)
Definition: share.py:102
void setSegLatency(final long latency)
Definition: PathNode.java:207
List< Values > modPort(PortModMessage message)
void discoverIslPartTwo(Tuple tuple, IslInfoData data)
String makePortMessage(ISwitchImpl sw, int portNum, PortChangeType type)
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
List< Values > addSwitch(SwitchMessage switchMessage)