package org.openkilda.wfm.topology.utils;

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.UUID;
/**
 * A {@link KafkaSpout} wrapper that stays passive until the LCM (life cycle management)
 * sync event it emits on a dedicated stream has been acked by the topology.
 */
public class LcmKafkaSpout<K, V> extends KafkaSpout<K, V> {
    // The concrete stream id and schema values were not preserved in the extracted
    // source; the initializers below are placeholders.
    public static final String STREAM_ID_LCM = "lcm";
    public static final Fields STREAM_SCHEMA_LCM = new Fields("message");

    private static final Logger logger = LoggerFactory.getLogger(LcmKafkaSpout.class);

    private UUID syncEventId = null;
    private boolean isSyncDone = false;
    private String spoutId = null;

    // Kept locally so nextTuple() can emit the LCM sync event; KafkaSpout does not
    // expose its own collector to subclasses.
    private SpoutOutputCollector collector = null;
    public LcmKafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        super(kafkaSpoutConfig);
    }
    @Override
    public void nextTuple() {
        if (isSyncDone) {
            logger.trace("Proxy .nextTuple() call to super object");
            super.nextTuple();
        } else if (syncEventId == null) {
            logger.info("Spout {} - sending LCM sync event", spoutId);
            syncEventId = UUID.randomUUID();
            // Reconstructed emit: the exact payload was not preserved in the extracted
            // source, only that the sync event is anchored to syncEventId.
            collector.emit(STREAM_ID_LCM, new Values(syncEventId), syncEventId);
        } else {
            logger.debug("Spout {} - is in passive mode, waiting LCM sync to complete", spoutId);
        }
    }
    @Override
    public void ack(Object messageId) {
        if (matchSyncId(messageId)) {
            logger.info("Spout {} - LCM sync is over, switch into active mode", spoutId);
            isSyncDone = true;
            return;
        }
        super.ack(messageId);
    }
    @Override
    public void fail(Object messageId) {
        if (matchSyncId(messageId)) {
            logger.error("Spout {} - LCM sync event is not handled (reschedule sync event)", spoutId);
            // Drop the current sync id so the next nextTuple() call re-emits the sync event.
            syncEventId = null;
            return;
        }
        super.fail(messageId);
    }
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        spoutId = context.getThisComponentId();
        this.collector = collector;
        super.open(conf, context, collector);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(STREAM_ID_LCM, STREAM_SCHEMA_LCM);
        super.declareOutputFields(declarer);
    }
    private boolean matchSyncId(Object messageId) {
        boolean isMatch = false;
        if (syncEventId != null) {
            isMatch = syncEventId.equals(messageId);
        }
        return isMatch;
    }
}
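/*
 * Usage sketch (not part of the original file; the component ids, Kafka endpoint, topic
 * and the two bolt classes are hypothetical): the spout is wired like a regular
 * KafkaSpout, with one extra bolt subscribed to STREAM_ID_LCM. When that bolt acks the
 * sync tuple, Storm calls ack(syncEventId) on the spout and it switches into active mode.
 *
 *     KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
 *             .builder("kafka.example:9092", "kilda.topo.input")
 *             .build();
 *
 *     TopologyBuilder builder = new TopologyBuilder();
 *     builder.setSpout("input-spout", new LcmKafkaSpout<>(kafkaConfig), 1);
 *     // Hypothetical bolt that performs the LCM housekeeping and acks the sync event.
 *     builder.setBolt("lcm-sync", new LcmSyncBolt(), 1)
 *             .shuffleGrouping("input-spout", LcmKafkaSpout.STREAM_ID_LCM);
 *     // Regular consumers keep reading the default stream as usual.
 *     builder.setBolt("worker", new InputWorkerBolt(), 1)
 *             .shuffleGrouping("input-spout");
 */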