16 package org.openkilda.wfm.topology.utils;
28 import org.apache.storm.task.OutputCollector;
29 import org.apache.storm.task.TopologyContext;
30 import org.apache.storm.topology.OutputFieldsDeclarer;
31 import org.apache.storm.topology.base.BaseRichBolt;
32 import org.apache.storm.tuple.Tuple;
33 import org.apache.storm.tuple.Values;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import java.io.IOException;
41 private static final Logger logger = LoggerFactory.getLogger(
HealthCheckBolt.class);
44 private final String healthCheckTopic;
46 private OutputCollector collector;
51 this.healthCheckTopic = healthCheckTopic;
55 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
56 this.collector = collector;
61 String request = input.getString(0);
67 collector.emit(healthCheckTopic, input, values);
69 }
catch (IOException exception) {
// The "{}" placeholder is required for SLF4J to include the raw request text in the
// message; the trailing throwable is still logged together with its stack trace.
logger.error(
"Could not deserialize message: {}", request, exception);
78 declarer.declareStream(healthCheckTopic, fieldMessage);
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer declarer)
static final Fields fieldMessage
HealthCheckBolt(String service, String healthCheckTopic)
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static final String HEALTH_CHECK_OPERATIONAL_STATUS
void execute(Tuple input)
String getCorrelationId()