1 package org.openkilda.wfm.topology.portstate.spout;
3 import static java.lang.String.format;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import org.apache.storm.spout.SpoutOutputCollector;
15 import org.apache.storm.task.TopologyContext;
16 import org.apache.storm.topology.OutputFieldsDeclarer;
17 import org.apache.storm.topology.base.BaseRichSpout;
18 import org.apache.storm.tuple.Fields;
19 import org.apache.storm.tuple.Values;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 import java.util.UUID;
// SLF4J logger for this spout class.
28 private static final Logger logger = LoggerFactory.getLogger(
SwitchPortsSpout.class);
// NOTE(review): CRON_TUPLE is not referenced in the visible part of this file — confirm it is still needed.
29 private static final String CRON_TUPLE =
"cron.tuple";
// Frequency used by the single-argument constructor. The value is multiplied by 1000 before being
// handed to Thread.sleep, so it is expressed in seconds (600 s = 10 minutes).
30 private static final int DEFAULT_FREQUENCY = 600;
// Simple name of the runtime class; passed to buildPortsCommand as the requester.
// NOTE(review): named like a constant but declared as an instance field (not static).
31 private final String REQUESTER = this.getClass().getSimpleName();
// Pause between emissions, in seconds (converted to milliseconds at the Thread.sleep call).
32 private final int frequency;
// Collector stored by open() and used to emit tuples.
33 private SpoutOutputCollector collector;
// Value of config.getKafkaSpeakerTopic(), captured in the constructor. Package-private visibility.
34 final String speakerTopic;
// NOTE(review): the constructor headers are not visible at this point in the excerpt; the two
// statements groups below are the bodies of the one-argument and two-argument constructors.
// One-argument form: delegates to the two-argument constructor with DEFAULT_FREQUENCY.
37 this(
config, DEFAULT_FREQUENCY);
// Two-argument form: stores the requested frequency and reads the speaker topic from the config.
41 this.frequency = frequency;
42 this.speakerTopic =
config.getKafkaSpeakerTopic();
/**
 * Returns the value of {@link System#currentTimeMillis()}.
 *
 * <p>NOTE(review): no caller is visible in this excerpt — confirm the helper is still used.
 *
 * @return the current time in milliseconds
 */
45 private static long now() {
46 return System.currentTimeMillis();
/**
 * Keeps a reference to the supplied collector so that tuples can be emitted through it later.
 *
 * <p>Only the collector assignment is visible in this excerpt; {@code map} and {@code context}
 * are not read in the visible lines.
 *
 * @param map configuration map (raw type, unused in the visible lines)
 * @param context topology context (unused in the visible lines)
 * @param collector collector stored in the {@code collector} field
 */
50 public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
51 this.collector = collector;
// NOTE(review): the enclosing method header and the opening `try {` lines are not visible in this
// excerpt; presumably this is the body of the spout's nextTuple() — confirm.
// Build the ports command, identifying this spout (REQUESTER) as the requester.
56 Message message = buildPortsCommand(REQUESTER);
57 logger.debug(
"emitting portsCommand: {}", message.
toString());
// Serialize the message with the shared ObjectMapper and emit a two-value tuple: (PAYLOAD, serialized message).
60 Values values =
new Values(PAYLOAD,
Utils.
MAPPER.writeValueAsString(message));
61 collector.emit(values);
62 }
catch (JsonProcessingException e) {
// NOTE(review): this branch handles a serialization failure, yet it logs "Error sleeping" (the same
// text as the InterruptedException branch below) and does not pass the exception to the logger.
// Looks like a copy-paste slip — confirm and correct the message.
63 logger.error(
"Error sleeping");
// Pause before the next emission; frequency (seconds) is converted to milliseconds here.
// NOTE(review): frequency * 1000 is int arithmetic and overflows for frequency > 2147483; consider 1000L.
69 Thread.sleep(frequency * 1000);
70 }
catch (InterruptedException e) {
// NOTE(review): the interruption is only logged; the exception is not passed to the logger and the
// thread's interrupt flag is not restored in the visible lines (no Thread.currentThread().interrupt()).
71 logger.error(
"Error sleeping");
/**
 * Builds the ports command message that this spout emits.
 *
 * <p>NOTE(review): only the correlation-id line is visible in this excerpt; how {@code requester}
 * and the correlation id are placed into the returned message cannot be seen here.
 *
 * @param requester name identifying who issues the command (callers in this file pass REQUESTER)
 * @return the command message
 */
75 private Message buildPortsCommand(String requester) {
// Correlation id: the literal prefix "SwitchPortsSpout-" followed by a freshly generated random UUID.
76 String correlationId =
format(
"SwitchPortsSpout-%s", UUID.randomUUID().toString());
// Body of declareOutputFields(OutputFieldsDeclarer): declares two output fields, "key" and "message",
// in that order. The emit call in this file sends two values (PAYLOAD, serialized message) per tuple.
83 declarer.declare(
new Fields(
"key",
"message"));
static final String PAYLOAD
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer declarer)
SwitchPortsSpout(PortStateTopologyConfig config, int frequency)
void open(Map map, TopologyContext context, SpoutOutputCollector collector)
SwitchPortsSpout(PortStateTopologyConfig config)