16 package org.openkilda.wfm.topology.flow.bolts;
40 import org.apache.storm.task.OutputCollector;
41 import org.apache.storm.task.TopologyContext;
42 import org.apache.storm.topology.OutputFieldsDeclarer;
43 import org.apache.storm.topology.base.BaseRichBolt;
44 import org.apache.storm.tuple.Tuple;
45 import org.apache.storm.tuple.Values;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
58 private static final Logger logger = LoggerFactory.getLogger(
SplitterBolt.class);
63 private OutputCollector outputCollector;
68 private Message tryMessage(String json) {
72 }
catch (Exception e) {
83 String request = tuple.getString(0);
84 Values values =
new Values(request);
87 Message message = tryMessage(request);
109 logger.debug(
"Request tuple={}", tuple);
122 values =
new Values(message, flowId);
123 logger.info(
"Flow {} message: operation={} values={}", flowId, fid.
getOperation(), values);
131 logger.warn(
"Skip undefined FlowInfoData Operation {}: {}={}",
148 logger.info(
"Flow {} create message: values={}", flowId, values);
150 values =
new Values(message, flowId);
156 logger.info(
"Flow {} delete message: values={}", flowId, values);
158 values =
new Values(message, flowId);
164 logger.info(
"Flow {} update message: values={}", flowId, values);
166 values =
new Values(message, flowId);
172 logger.info(
"Flow {} reroute message: values={}", flowId, values);
174 values =
new Values(message, flowId);
180 logger.info(
"Flow {} read message: values={}", flowId, values);
182 values =
new Values(message, flowId);
186 logger.info(
"Flows dump message: values={}", values);
188 values =
new Values(message, null);
191 logger.info(
"FlowCacheSyncRequest: values={}", values);
193 values =
new Values(message, null);
197 logger.info(
"Flow {} verification request", flowId);
204 }
catch (Exception e) {
205 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
208 outputCollector.ack(tuple);
210 logger.debug(
"Splitter message ack: component={}, stream={}, tuple={}, values={}",
211 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
237 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
238 this.outputCollector = outputCollector;
static final Fields fieldsMessageErrorType
void execute(Tuple tuple)
static final ObjectMapper MAPPER
static final Fields fieldsMessageFlowId
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
FlowOperation getOperation()
static final String CORRELATION_ID
Destination getDestination()
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
String getCorrelationId()