16 package org.openkilda.wfm.topology.flow.bolts;
26 import org.slf4j.LoggerFactory;
27 import org.slf4j.Logger;
28 import org.apache.storm.task.OutputCollector;
29 import org.apache.storm.task.TopologyContext;
30 import org.apache.storm.topology.OutputFieldsDeclarer;
31 import org.apache.storm.topology.base.BaseRichBolt;
32 import org.apache.storm.tuple.Tuple;
33 import org.apache.storm.tuple.Values;
/** SLF4J logger for this bolt, obtained from {@link LoggerFactory} for {@code ErrorBolt.class}. */
44 private static final Logger logger = LoggerFactory.getLogger(
ErrorBolt.class);
/**
 * Storm output collector for this bolt. It is assigned from the argument passed to
 * {@code prepare(...)} and is later used to acknowledge processed tuples.
 */
49 private OutputCollector outputCollector;
55 public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
56 this.outputCollector = outputCollector;
69 Values values =
new Values(error);
72 logger.debug(
"Request tuple={}", tuple);
74 switch (componentId) {
77 logger.debug(
"Error message: data={}", error.
getData());
81 logger.debug(
"Skip message from UNKNOWN component: component={}, stream={}, error-type={}",
82 componentId, streamId, errorType);
85 }
catch (Exception exception) {
86 logger.error(
"Could not process message: {}", tuple, exception);
88 outputCollector.ack(tuple);
90 logger.debug(
"Error message ack: component={}, stream={}, tuple={}, values={}",
91 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
static final String ERROR_TYPE_FIELD
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
static final String MESSAGE_FIELD
static final Fields fieldMessage
void execute(Tuple tuple)
void setDestination(final Destination destination)
void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector)