16 package org.openkilda.wfm.topology.stats.bolts;
21 import org.apache.storm.task.OutputCollector;
22 import org.apache.storm.task.TopologyContext;
23 import org.apache.storm.topology.OutputFieldsDeclarer;
24 import org.apache.storm.topology.base.BaseRichBolt;
25 import org.apache.storm.tuple.Fields;
26 import org.apache.storm.tuple.Tuple;
27 import org.apache.storm.tuple.Values;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import java.io.IOException;
// SLF4J logger for this bolt; all logging below goes through it.
68 private static final Logger logger = LoggerFactory.getLogger(
CacheFilterBolt.class);
// Topology context captured in prepare(). NOTE(review): not used in the
// visible portion of this chunk — confirm it is needed against the full file.
70 private TopologyContext context;
// Storm output collector captured in prepare(); used to emit and ack tuples.
71 private OutputCollector outputCollector;
/**
 * Storm lifecycle hook. Stores the topology context and the output collector
 * for later use by execute()/emit. (Method body continues beyond this chunk.)
 */
78 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
79 this.context = topologyContext;
80 this.outputCollector = outputCollector;
// Raw JSON payload taken from field 0 of the incoming tuple.
// NOTE(review): the surrounding try block and message handling are not
// visible in this chunk — the comments below describe only what is shown.
89 String json = tuple.getString(0);
115 }
// Malformed/undeserializable messages are logged with the full tuple and
// the exception as the last argument (SLF4J prints its stack trace).
catch (IOException exception) {
116 logger.error(
"Could not deserialize message {}", tuple, exception);
117 }
// Broad catch at the bolt boundary so one bad tuple cannot crash the worker.
catch (Exception e) {
118 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
// Tuple is acked regardless of outcome — presumably inside a finally block,
// meaning failed messages are dropped rather than replayed; TODO confirm.
120 outputCollector.ack(tuple);
// NOTE(review): both method bodies here are truncated in this chunk.
// Delegates to the 4-arg emit overload with a null final argument.
125 emit(tuple, action,
command, null);
// Builds the Values payload (constructor arguments not visible here) and
// emits it on the CACHE_UPDATE stream, anchored to the input tuple so
// downstream failures trigger replay of the original message.
129 Values values =
new Values(
135 outputCollector.emit(CACHE_UPDATE.name(), tuple, values);
/**
 * Debug-logs a flow command that matched this cache filter, including its
 * concrete class name. (Remaining format arguments — flow id, switch,
 * cookie — are beyond the visible portion of this chunk.)
 */
146 private void logMatchedRecord(
BaseFlow flowCommand) {
147 logger.debug(
"Catch {} command flow_id={} sw={} cookie={}",
148 flowCommand.getClass().getCanonicalName(),
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
void execute(Tuple tuple)
def command(payload, fields)
static final Fields fieldsMessageUpdateCache
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)