16 package org.openkilda.wfm.topology.utils;
18 import org.slf4j.LoggerFactory;
19 import org.slf4j.Logger;
20 import org.apache.storm.Config;
21 import org.apache.storm.Constants;
22 import org.apache.storm.state.State;
23 import org.apache.storm.task.OutputCollector;
24 import org.apache.storm.task.TopologyContext;
25 import org.apache.storm.topology.base.BaseStatefulBolt;
26 import org.apache.storm.tuple.Tuple;
38 private Integer emitFrequency;
40 private static final int DEFAULT_FREQUENCY = 1;
43 emitFrequency = DEFAULT_FREQUENCY;
48 emitFrequency = frequency;
52 this.emitFrequency = frequency;
62 Config conf =
new Config();
63 conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
68 return (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
69 && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID));
73 public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
87 protected abstract void doTick(Tuple tuple);
89 protected abstract void doWork(Tuple tuple);
abstract void doTick(Tuple tuple)
AbstractTickStatefulBolt withFrequency(Integer frequency)
AbstractTickStatefulBolt()
Map< String, Object > getComponentConfiguration()
void prepare(Map conf, TopologyContext context, OutputCollector collector)
abstract void doWork(Tuple tuple)
void execute(Tuple tuple)
OutputCollector _collector
boolean isTickTuple(Tuple tuple)
AbstractTickStatefulBolt(Integer frequency)