Open Kilda Java Documentation
SwitchPortsSpout.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.portstate.spout;
2 
3 import static java.lang.String.format;
4 import static org.openkilda.messaging.Utils.PAYLOAD;
5 
12 
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import org.apache.storm.spout.SpoutOutputCollector;
15 import org.apache.storm.task.TopologyContext;
16 import org.apache.storm.topology.OutputFieldsDeclarer;
17 import org.apache.storm.topology.base.BaseRichSpout;
18 import org.apache.storm.tuple.Fields;
19 import org.apache.storm.tuple.Values;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 
23 import java.util.Map;
24 import java.util.UUID;
25 
26 public class SwitchPortsSpout extends BaseRichSpout {
27 
// Class-wide SLF4J logger.
28  private static final Logger logger = LoggerFactory.getLogger(SwitchPortsSpout.class);
// NOTE(review): not referenced anywhere in the visible part of this class - confirm it is still needed.
29  private static final String CRON_TUPLE = "cron.tuple";
// Frequency value handed to the two-argument constructor via this(config, DEFAULT_FREQUENCY);
// nextTuple() multiplies the frequency by 1000 before Thread.sleep, so the unit is seconds.
30  private static final int DEFAULT_FREQUENCY = 600;
// Simple name of the runtime class; nextTuple() passes it as the requester of the ports command.
31  private final String REQUESTER = this.getClass().getSimpleName();
// Number of seconds nextTuple() sleeps after each emission.
32  private final int frequency;
// Collector received in open() and used by nextTuple() to emit.
33  private SpoutOutputCollector collector;
// Assigned from config.getKafkaSpeakerTopic() in the constructor; package-private visibility.
34  final String speakerTopic;
35 
37  this(config, DEFAULT_FREQUENCY);
38  }
39 
41  this.frequency = frequency;
42  this.speakerTopic = config.getKafkaSpeakerTopic();
43  }
44 
45  private static long now() {
46  return System.currentTimeMillis();
47  }
48 
49  @Override
50  public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
51  this.collector = collector;
52  }
53 
54  @Override
55  public void nextTuple() {
56  Message message = buildPortsCommand(REQUESTER);
57  logger.debug("emitting portsCommand: {}", message.toString());
58 
59  try {
60  Values values = new Values(PAYLOAD, Utils.MAPPER.writeValueAsString(message));
61  collector.emit(values);
62  } catch (JsonProcessingException e) {
63  logger.error("Error sleeping");
64  }
65 
66  // Note that no tupleId which means this is an untracked tuple which is
67  // required for the sleep
68  try {
69  Thread.sleep(frequency * 1000);
70  } catch (InterruptedException e) {
71  logger.error("Error sleeping");
72  }
73  }
74 
// Builds a CommandMessage wrapping a PortsCommandData for the given requester.
// The correlation id is "SwitchPortsSpout-" followed by a random UUID, and the timestamp
// argument comes from now().
// NOTE(review): this listing jumps from original line 77 to 79, so the tail of the return
// statement (the remaining CommandMessage argument(s) and the closing of the call) is not
// visible here - restore it from the real source before compiling.
75  private Message buildPortsCommand(String requester) {
76  String correlationId = format("SwitchPortsSpout-%s", UUID.randomUUID().toString());
77  return new CommandMessage(new PortsCommandData(requester), now(), correlationId,
79  }
80 
81  @Override
82  public void declareOutputFields(OutputFieldsDeclarer declarer) {
83  declarer.declare(new Fields("key", "message"));
84  }
85 }
static final String PAYLOAD
Definition: Utils.java:57
static final ObjectMapper MAPPER
Definition: Utils.java:31
SwitchPortsSpout(PortStateTopologyConfig config, int frequency)
void open(Map map, TopologyContext context, SpoutOutputCollector collector)