16 package org.openkilda.wfm.topology.flow.bolts;
33 import org.apache.storm.task.OutputCollector;
34 import org.apache.storm.task.TopologyContext;
35 import org.apache.storm.topology.OutputFieldsDeclarer;
36 import org.apache.storm.topology.base.BaseRichBolt;
37 import org.apache.storm.tuple.Tuple;
38 import org.apache.storm.tuple.Values;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import java.io.IOException;
44 import java.util.UUID;
// SLF4J logger shared by all instances of this bolt; obtained from LoggerFactory for TopologyEngineBolt.
53 private static final Logger logger = LoggerFactory.getLogger(
TopologyEngineBolt.class);
// Storm output collector: assigned from the argument passed to prepare(), and used during tuple
// processing to ack the incoming tuple.
58 private OutputCollector outputCollector;
65 String request = tuple.getString(0);
73 logger.debug(
"Request tuple={}", tuple);
80 Long transactionId = UUID.randomUUID().getLeastSignificantBits();
83 String flowId = installData.
getId();
85 logger.debug(
"Flow install message: {}={}, switch-id={}, {}={}, {}={}, message={}",
91 values =
new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
96 Long transactionId = UUID.randomUUID().getLeastSignificantBits();
99 String flowId = removeData.
getId();
101 logger.debug(
"Flow remove message: {}={}, switch-id={}, {}={}, {}={}, message={}",
106 values =
new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
110 logger.debug(
"Skip undefined command message: {}={}, message={}",
114 values =
new Values(message);
116 logger.debug(
"Flow response message: {}={}, message={}",
122 String flowId = ((
ErrorMessage) message).getData().getErrorDescription();
124 logger.error(
"Flow error message: {}={}, {}={}, message={}",
127 values =
new Values(message, flowId);
131 logger.debug(
"Skip undefined message: {}={}, message={}",
134 }
catch (IOException exception) {
135 logger.error(
"Could not deserialize message={}", request, exception);
136 }
catch (Exception e) {
137 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
139 outputCollector.ack(tuple);
141 logger.debug(
"Topology-Engine message ack: component={}, stream={}, tuple={}, values={}",
142 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
151 outputFieldsDeclarer.declareStream(
155 outputFieldsDeclarer.declareStream(
159 outputFieldsDeclarer.declareStream(
163 outputFieldsDeclarer.declareStream(
173 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
174 this.outputCollector = outputCollector;
static final ObjectMapper MAPPER
void execute(Tuple tuple)
static final Fields fieldsMessageSwitchIdFlowIdTransactionId
static final Fields fieldsMessageFlowId
static final Fields fieldMessage
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
void setTransactionId(final Long transactionId)
void setDestination(final Destination destination)
static final String TRANSACTION_ID
static final String CORRELATION_ID
Destination getDestination()
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
String getCorrelationId()
static final String FLOW_ID