16 package org.openkilda.simulator.bolts;
45 import org.apache.storm.task.OutputCollector;
46 import org.apache.storm.task.TopologyContext;
47 import org.apache.storm.topology.OutputFieldsDeclarer;
48 import org.apache.storm.topology.base.BaseRichBolt;
49 import org.apache.storm.tuple.Fields;
50 import org.apache.storm.tuple.Tuple;
51 import org.apache.storm.tuple.Values;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import java.io.IOException;
56 import java.time.Instant;
57 import java.util.ArrayList;
58 import java.util.HashMap;
59 import java.util.List;
61 import java.util.UUID;
64 private static final Logger logger = LoggerFactory.getLogger(
SpeakerBolt.class);
65 private OutputCollector collector;
75 new SwitchId(sw.getDpid().toString()),
84 Instant.now().toEpochMilli(),
85 UUID.randomUUID().toString(),
92 new SwitchId(sw.getDpid().toString()),
98 Instant.now().toEpochMilli(),
99 UUID.randomUUID().toString(),
105 List<Values> values =
new ArrayList<>();
118 List<Values> values =
new ArrayList<>();
121 logger.info(
"switch does not exist, adding it");
126 List<LinkMessage> links = switchMessage.getLinks();
144 values.add(
new Values(
"INFO",
makePortMessage(sw,
p.getNumber(), changeType)));
169 List<PathNode>
path =
new ArrayList<>();
203 if (
port.isActiveIsl()) {
204 long now = Instant.now().toEpochMilli();
206 logger.debug(
"checking isl on: {}",
data.toString());
208 new Values(
"INFO",
Utils.
MAPPER.writeValueAsString(infoMessage)));
217 port = sw.
getPort(message.getLink().getLocalPort());
218 port.setLatency(message.getLink().getLatency());
219 port.setPeerSwitch(message.getLink().getPeerSwitch());
220 port.setPeerPortNum(message.getLink().getPeerPort());
223 List<Values> values =
new ArrayList<>();
224 values.add(
new Values(
"INFO",
Utils.
MAPPER.writeValueAsString(
port.makePorChangetMessage())));
230 List<Values> values =
new ArrayList<>();
233 port.modPort(message);
234 values.add(
new Values(
"INFO",
Utils.
MAPPER.writeValueAsString(
port.makePorChangetMessage())));
260 List<Values> values =
new ArrayList<>();
261 if (tuple.getFields().contains(
"command")) {
262 String
command = tuple.getStringByField(
"command");
279 logger.error(String.format(
"Uknown SimulatorCommand %s",
command));
288 logger.error(
"Unkown SimulatorMessage {}", message.getClass().getSimpleName());
292 if (values.size() > 0) {
293 for (Values
value : values) {
294 logger.debug(
"emitting: {}",
value);
308 List<Values> values =
new ArrayList<>();
312 if (values.size() > 0) {
313 for (Values
value : values) {
314 logger.debug(
"emitting: {}",
value);
332 if (values.size() > 0) {
333 for (Values
value : values) {
334 logger.debug(
"emitting: {}",
value);
341 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
342 this.collector = outputCollector;
349 logger.debug(
"got tuple: {}", tuple.toString());
351 String tupleSource = tuple.getSourceComponent();
353 switch (tupleSource) {
362 logger.error(
"tuple from UNKNOWN source: {}", tupleSource);
364 }
catch (Exception e) {
365 logger.error(e.toString());
368 collector.ack(tuple);
String makeSwitchMessage(ISwitchImpl sw, SwitchState state)
void execute(Tuple tuple)
static final String KAFKA_BOLT_STREAM
void setLatency(int latency)
static final String DO_ADD_LINK
static final ObjectMapper MAPPER
List< Values > addSwitch(AddSwitchCommand data)
List< IPortImpl > getPorts()
Map< SwitchId, ISwitchImpl > switches
static final String DO_PORT_MOD
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final String SWITCH_BOLT
void setPeerPortNum(int peerPortNum)
List< Values > addLink(AddLinkCommandMessage message)
void discoverIsl(Tuple tuple, DiscoverIslCommandData data)
IPortImpl getPort(int portNum)
ISwitchImpl getSwitch(SwitchId name)
void setPeerSwitch(String peerSwitch)
def command(payload, fields)
static final String COMMAND_BOLT
void setSegLatency(final long latency)
static final String SWITCH_BOLT_STREAM
List< Values > modPort(PortModMessage message)
static final String SIMULATOR_COMMAND_BOLT
void discoverIslPartTwo(Tuple tuple, IslInfoData data)
String makePortMessage(ISwitchImpl sw, int portNum, PortChangeType type)
static final String DO_ADD_SWITCH
DO_DISCOVER_ISL_P2_COMMAND
void doCommand(Tuple tuple)
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
void doSimulatorCommand(Tuple tuple)
List< Values > addSwitch(SwitchMessage switchMessage)