16 package org.openkilda.simulator.bolts;
34 import org.apache.storm.task.OutputCollector;
35 import org.apache.storm.task.TopologyContext;
36 import org.apache.storm.topology.OutputFieldsDeclarer;
37 import org.apache.storm.topology.base.BaseRichBolt;
38 import org.apache.storm.tuple.Fields;
39 import org.apache.storm.tuple.Tuple;
40 import org.apache.storm.tuple.Values;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import java.util.List;
48 private static final Logger logger = LoggerFactory.getLogger(
CommandBolt.class);
49 private OutputCollector collector;
52 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
53 this.collector = outputCollector;
56 protected String
getType(String json)
throws Exception {
59 return ((String)
root.get(
"type")).toLowerCase();
60 }
catch (Exception e) {
61 logger.error(
"error getting type in: {}", json);
67 return tuple.getString(0);
98 logger.error(
"UNKNOWN data type: {}",
data.toString());
99 throw new Exception(
"Unknown command {}".
format(
data.getClass().getSimpleName()));
102 new Values(sw, switchCommand.name(),
command.getData()));
118 }
catch (Exception e) {
119 logger.error(
"Could not parse tuple: {}".
format(tuple.toString()), e);
121 collector.ack(tuple);
static final String COMMAND_BOLT_STREAM
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
String getJson(Tuple tuple)
void processCommand(Tuple tuple)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
def command(payload, fields)
String getType(String json)
DO_INSTALL_ONESWITCH_FLOW
static Map< String, ?> fromJson(String json)
void execute(Tuple tuple)