16 package org.openkilda.wfm.topology.flow.bolts;
36 import org.apache.storm.task.OutputCollector;
37 import org.apache.storm.task.TopologyContext;
38 import org.apache.storm.topology.OutputFieldsDeclarer;
39 import org.apache.storm.topology.base.BaseRichBolt;
40 import org.apache.storm.tuple.Fields;
41 import org.apache.storm.tuple.Tuple;
42 import org.apache.storm.tuple.Values;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import java.io.IOException;
61 private static final Logger logger = LoggerFactory.getLogger(
SpeakerBolt.class);
66 private OutputCollector outputCollector;
73 String request = tuple.getString(0);
79 logger.debug(
"Request tuple={}", tuple);
99 logger.debug(
"Flow install message: {}={}, switch-id={}, {}={}, {}={}, message={}",
104 values =
new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
114 logger.debug(
"Flow remove message: {}={}, switch-id={}, {}={}, {}={}, message={}",
119 values =
new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
123 logger.debug(
"Skip undefined command message: {}={}, message={}",
127 String flowId = ((
ErrorMessage) message).getData().getErrorDescription();
131 if (flowId != null) {
132 logger.error(
"Flow error message: {}={}, {}={}, message={}",
135 values =
new Values(flowId,
status);
138 logger.debug(
"Skip error message without flow-id: {}={}, message={}",
145 logger.debug(
"Skip undefined message: {}={}, message={}",
148 }
catch (IOException exception) {
149 logger.error(
"\n\nCould not deserialize message={}", request, exception);
150 }
catch (Exception e) {
151 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
153 outputCollector.ack(tuple);
155 logger.debug(
"Speaker message ack: component={}, stream={}, tuple={}, values={}",
156 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
160 private void handleInfoMessage(Tuple input,
InfoMessage message) {
164 Values proxyData =
new Values(rawPayload, message);
167 logger.debug(
"Unhandled InfoMessage with payload type: {}", rawPayload.getClass().getName());
176 outputFieldsDeclarer.declareStream(
178 outputFieldsDeclarer.declareStream(
188 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
189 this.outputCollector = outputCollector;
static final ObjectMapper MAPPER
static final Fields fieldsMessageSwitchIdFlowIdTransactionId
static final String FIELD_ID_INPUT
void setDestination(final Destination destination)
static final String TRANSACTION_ID
static final String CORRELATION_ID
Destination getDestination()
static final String STREAM_VERIFICATION_ID
static final String FIELD_ID_PAYLOAD
void execute(Tuple tuple)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final Fields STREAM_VERIFICATION_FIELDS
String getCorrelationId()
static final String FLOW_ID
static final Fields fieldsFlowIdStatus
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)