1 package org.openkilda.wfm.ctrl;
3 import org.apache.storm.task.OutputCollector;
4 import org.apache.storm.task.TopologyContext;
5 import org.apache.storm.topology.OutputFieldsDeclarer;
6 import org.apache.storm.topology.base.BaseRichBolt;
7 import org.apache.storm.tuple.Tuple;
12 import java.util.HashMap;
16 final private String PREFIX_STREAM_ID =
"ctrl.";
20 TopologyContext context;
21 OutputCollector output;
23 HashMap<String, String> endpointMapping;
28 endpointMapping =
new HashMap<>();
32 if (endpointMapping.containsKey(boltId)) {
36 String stream = PREFIX_STREAM_ID + boltId;
37 endpointMapping.put(boltId, stream);
43 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
44 this.context = context;
45 this.output = collector;
51 this, input, topologyName, endpointMapping);
58 for (String streamId : endpointMapping.values()) {
String registerEndpoint(String boltId)
void declareOutputFields(OutputFieldsDeclarer declarer)
RouteBolt(String topology)
static final Fields FORMAT
void execute(Tuple input)
final String STREAM_ID_ERROR
OutputCollector getOutput()
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static final Fields FORMAT
TopologyContext getContext()