Open Kilda Java Documentation
AbstractTickStatefulBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.utils;
17 
18 import org.slf4j.LoggerFactory;
19 import org.slf4j.Logger;
20 import org.apache.storm.Config;
21 import org.apache.storm.Constants;
22 import org.apache.storm.state.State;
23 import org.apache.storm.task.OutputCollector;
24 import org.apache.storm.task.TopologyContext;
25 import org.apache.storm.topology.base.BaseStatefulBolt;
26 import org.apache.storm.tuple.Tuple;
27 
28 import java.util.Map;
29 
33 public abstract class AbstractTickStatefulBolt<T extends State> extends BaseStatefulBolt<T> {
34 
35  private static final Logger logger = LoggerFactory.getLogger(AbstractTickStatefulBolt.class);
36  protected OutputCollector _collector;
38  private Integer emitFrequency;
40  private static final int DEFAULT_FREQUENCY = 1;
41 
43  emitFrequency = DEFAULT_FREQUENCY;
44  }
45 
47  public AbstractTickStatefulBolt(Integer frequency) {
48  emitFrequency = frequency;
49  }
50 
51  public AbstractTickStatefulBolt withFrequency(Integer frequency){
52  this.emitFrequency = frequency;
53  return this;
54  }
55 
56  /*
57  * Configure frequency of tick tuples for this bolt. This delivers a 'tick' tuple on a specific
58  * interval, which is used to trigger certain actions
59  */
60  @Override
61  public Map<String, Object> getComponentConfiguration() {
62  Config conf = new Config();
63  conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
64  return conf;
65  }
66 
67  protected boolean isTickTuple(Tuple tuple) {
68  return (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
69  && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID));
70  }
71 
72  @Override
73  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
74  _collector = collector;
75  }
76 
77  //execute is called to process tuples
78  @Override
79  public void execute(Tuple tuple) {
80  if (isTickTuple(tuple)) {
81  doTick(tuple);
82  } else {
83  doWork(tuple);
84  }
85  }
86 
87  protected abstract void doTick(Tuple tuple);
88 
89  protected abstract void doWork(Tuple tuple);
90 
91 }
AbstractTickStatefulBolt withFrequency(Integer frequency)
void prepare(Map conf, TopologyContext context, OutputCollector collector)