Open Kilda Java Documentation
LcmKafkaSpout.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.utils;
2 
3 import org.apache.storm.kafka.spout.KafkaSpout;
4 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
5 import org.apache.storm.spout.SpoutOutputCollector;
6 import org.apache.storm.task.TopologyContext;
7 import org.apache.storm.topology.OutputFieldsDeclarer;
8 import org.apache.storm.tuple.Fields;
9 import org.apache.storm.tuple.Values;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 
13 import java.util.Map;
14 import java.util.UUID;
15 
16 public class LcmKafkaSpout<K, V> extends KafkaSpout<K, V> {
17  private static final Logger logger = LoggerFactory.getLogger(LcmKafkaSpout.class);
18 
19  public static final String STREAM_ID_LCM = "LCM";
20  public static final Fields STREAM_SCHEMA_LCM = new Fields("kind", "payload");
21 
22  private UUID syncEventId = null;
23  private boolean isSyncDone = false;
24  private String spoutId = null;
25 
26  public LcmKafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
27  super(kafkaSpoutConfig);
28  }
29 
30  @Override
31  public void nextTuple() {
32  if (isSyncDone) {
33  logger.trace("Proxy .nextTuple() call to super object");
34  super.nextTuple();
35  } else if (syncEventId == null) {
36  logger.info("Spout {} - sending LCM sync event", spoutId);
37 
38  syncEventId = UUID.randomUUID();
39  Values event = new Values(LcmEvent.SYNC_REQUEST, null);
40  collector.emit(STREAM_ID_LCM, event, syncEventId);
41  } else {
42  logger.debug("Spout {} - is in passive mode, waiting LCM sync to complete", spoutId);
43  }
44  }
45 
46  @Override
47  public void ack(Object messageId) {
48  if (matchSyncId(messageId)) {
49  logger.info("Spout {} - LCM sync is over, switch into active mode", spoutId);
50  syncEventId = null;
51  isSyncDone = true;
52  return;
53  }
54 
55  super.ack(messageId);
56  }
57 
58  @Override
59  public void fail(Object messageId) {
60  if (matchSyncId(messageId)) {
61  logger.error("Spout {} - LCM sync event is not handled (reschedule sync event)", spoutId);
62  syncEventId = null;
63  return;
64  }
65 
66  super.fail(messageId);
67  }
68 
69  @Override
70  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
71  spoutId = context.getThisComponentId();
72 
73  super.open(conf, context, collector);
74  }
75 
76  @Override
77  public void declareOutputFields(OutputFieldsDeclarer declarer) {
78  super.declareOutputFields(declarer);
79  declarer.declareStream(STREAM_ID_LCM, STREAM_SCHEMA_LCM);
80  }
81 
82  private boolean matchSyncId(Object messageId) {
83  boolean isMatch = false;
84  if (syncEventId != null) {
85  isMatch = syncEventId.equals(messageId);
86  }
87  return isMatch;
88  }
89 }
void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
LcmKafkaSpout(KafkaSpoutConfig< K, V > kafkaSpoutConfig)
void declareOutputFields(OutputFieldsDeclarer declarer)