Open Kilda Java Documentation
AbstractTickRichBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.utils;
17 
18 import org.slf4j.LoggerFactory;
19 import org.slf4j.Logger;
20 import org.apache.storm.Config;
21 import org.apache.storm.Constants;
22 import org.apache.storm.task.OutputCollector;
23 import org.apache.storm.task.TopologyContext;
24 import org.apache.storm.topology.base.BaseRichBolt;
25 import org.apache.storm.tuple.Tuple;
26 
27 import java.util.Map;
28 
32 public abstract class AbstractTickRichBolt extends BaseRichBolt {
33 
34  private static final Logger logger = LoggerFactory.getLogger(AbstractTickRichBolt.class);
35  protected OutputCollector _collector;
36  private Integer emitFrequency;
37 
39  emitFrequency = 1; // every second
40  }
41 
42  public AbstractTickRichBolt(Integer frequency) {
43  emitFrequency = frequency;
44  }
45 
46  /*
47  * Configure frequency of tick tuples for this bolt. This delivers a 'tick' tuple on a specific
48  * interval, which is used to trigger certain actions
49  */
50  @Override
51  public Map<String, Object> getComponentConfiguration() {
52  Config conf = new Config();
53  conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
54  return conf;
55  }
56 
57  protected boolean isTickTuple(Tuple tuple) {
58  return (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
59  && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID));
60  }
61 
62  @Override
63  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
64  _collector = collector;
65  }
66 
67  //execute is called to process tuples
68  @Override
69  public void execute(Tuple tuple) {
70  //If it's a tick tuple, emit all words and counts
71  if (isTickTuple(tuple)) {
72  doTick(tuple);
73  } else {
74  doWork(tuple);
75  }
76  }
77 
78  protected abstract void doTick(Tuple tuple);
79 
80  protected abstract void doWork(Tuple tuple);
81 
82 }
void prepare(Map conf, TopologyContext context, OutputCollector collector)