16 package org.openkilda.wfm.topology.flow.bolts;
27 import com.fasterxml.jackson.core.JsonProcessingException;
28 import org.slf4j.LoggerFactory;
29 import org.slf4j.Logger;
30 import org.apache.storm.task.OutputCollector;
31 import org.apache.storm.task.TopologyContext;
32 import org.apache.storm.topology.OutputFieldsDeclarer;
33 import org.apache.storm.topology.base.BaseRichBolt;
34 import org.apache.storm.tuple.Tuple;
35 import org.apache.storm.tuple.Values;
51 private OutputCollector outputCollector;
59 String streamId = tuple.getSourceStreamId();
64 logger.debug(
"Request tuple={}", tuple);
66 switch (componentId) {
68 case VERIFICATION_JOINT_BOLT:
71 logger.debug(
"Flow response: {}={}, component={}, stream={}, message={}",
75 values =
new Values(MAPPER.writeValueAsString(message));
81 logger.debug(
"Flow UNKNOWN response: {}={}, component={}, stream={}, message={}",
85 }
catch (JsonProcessingException exception) {
86 logger.error(
"Could not serialize message: component={}, stream={}, message={}",
87 componentId, streamId, message);
88 }
catch (Exception e) {
89 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
91 outputCollector.ack(tuple);
93 logger.debug(
"Northbound-Reply message ack: component={}, stream={}, tuple={}, values={}",
94 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
110 public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
111 this.outputCollector = outputCollector;
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
static final ObjectMapper MAPPER
static final String MESSAGE_FIELD
static final Fields fieldMessage
void setDestination(final Destination destination)
void execute(Tuple tuple)
static final String CORRELATION_ID
void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector)
String getCorrelationId()