Open Kilda Java Documentation
CommandBolt.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.simulator.bolts;
17 
33 
34 import org.apache.storm.task.OutputCollector;
35 import org.apache.storm.task.TopologyContext;
36 import org.apache.storm.topology.OutputFieldsDeclarer;
37 import org.apache.storm.topology.base.BaseRichBolt;
38 import org.apache.storm.tuple.Fields;
39 import org.apache.storm.tuple.Tuple;
40 import org.apache.storm.tuple.Values;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 
44 import java.util.List;
45 import java.util.Map;
46 
47 public class CommandBolt extends BaseRichBolt {
48  private static final Logger logger = LoggerFactory.getLogger(CommandBolt.class);
49  private OutputCollector collector;
50 
51  @Override
52  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
53  this.collector = outputCollector;
54  }
55 
56  protected String getType(String json) throws Exception {
57  try {
58  Map<String, ?> root = OfeMessageUtils.fromJson(json);
59  return ((String) root.get("type")).toLowerCase();
60  } catch (Exception e) {
61  logger.error("error getting type in: {}", json);
62  throw e;
63  }
64  }
65 
66  protected String getJson(Tuple tuple) {
67  return tuple.getString(0);
68  }
69 
70  protected void processCommand(Tuple tuple) throws Exception {
71  CommandMessage command = Utils.MAPPER.readValue(getJson(tuple), CommandMessage.class);
72  if (command.getDestination() == Destination.CONTROLLER) {
73  CommandData data = command.getData();
74  Commands switchCommand;
75  SwitchId sw;
76  if (data instanceof DiscoverIslCommandData) {
77  switchCommand = Commands.DO_DISCOVER_ISL_COMMAND;
78  sw = ((DiscoverIslCommandData) data).getSwitchId();
79  } else if (data instanceof DiscoverPathCommandData) {
80  switchCommand = Commands.DO_DISCOVER_PATH_COMMAND;
81  sw = ((DiscoverPathCommandData) data).getSrcSwitchId();
82  } else if (data instanceof InstallIngressFlow) {
83  switchCommand = Commands.DO_INSTALL_INGRESS_FLOW;
84  sw = ((InstallIngressFlow) data).getSwitchId();
85  } else if (data instanceof InstallEgressFlow) {
86  switchCommand = Commands.DO_INSTALL_EGRESS_FLOW;
87  sw = ((InstallEgressFlow) data).getSwitchId();
88  } else if (data instanceof InstallTransitFlow) {
89  switchCommand = Commands.DO_INSTALL_TRANSIT_FLOW;
90  sw = ((InstallTransitFlow) data).getSwitchId();
91  } else if (data instanceof InstallOneSwitchFlow) {
92  switchCommand = Commands.DO_INSTALL_ONESWITCH_FLOW;
93  sw = ((InstallOneSwitchFlow) data).getSwitchId();
94  } else if (data instanceof RemoveFlow) {
95  switchCommand = Commands.DO_DELETE_FLOW;
96  sw = ((RemoveFlow) data).getSwitchId();
97  } else {
98  logger.error("UNKNOWN data type: {}", data.toString());
99  throw new Exception("Unknown command {}".format(data.getClass().getSimpleName()));
100  }
101  List<Integer> taskIDs = collector.emit(SimulatorTopology.COMMAND_BOLT_STREAM, tuple,
102  new Values(sw, switchCommand.name(), command.getData()));
103  // logger.info("{}: {} - {}", switchCommand.name(), sw, command.getData().toString());
104  }
105  }
106 
107  @Override
108  public void execute(Tuple tuple) {
109  try {
110  String json = getJson(tuple);
111  switch (getType(json)) {
112  case "command":
113  processCommand(tuple);
114  break;
115  default:
116  break;
117  }
118  } catch (Exception e) {
119  logger.error("Could not parse tuple: {}".format(tuple.toString()), e);
120  } finally {
121  collector.ack(tuple);
122  }
123  }
124 
125  @Override
126  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
127  outputFieldsDeclarer.declareStream(SimulatorTopology.COMMAND_BOLT_STREAM,
128  new Fields("dpid", SpeakerBolt.TupleFields.COMMAND.name(), SpeakerBolt.TupleFields.DATA.name()));
129  }
130 }
root
Definition: setup.py:12
static final ObjectMapper MAPPER
Definition: Utils.java:31
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
def command(payload, fields)
Definition: share.py:102
static Map< String, ?> fromJson(String json)