Open Kilda Java Documentation
RouteBolt.java
Go to the documentation of this file.
1 package org.openkilda.wfm.ctrl;
2 
3 import org.apache.storm.task.OutputCollector;
4 import org.apache.storm.task.TopologyContext;
5 import org.apache.storm.topology.OutputFieldsDeclarer;
6 import org.apache.storm.topology.base.BaseRichBolt;
7 import org.apache.storm.tuple.Tuple;
11 
12 import java.util.HashMap;
13 import java.util.Map;
14 
15 public class RouteBolt extends BaseRichBolt implements IKildaBolt {
16  final private String PREFIX_STREAM_ID = "ctrl.";
17  final public String STREAM_ID_ERROR = PREFIX_STREAM_ID + "_error";
18 
19  String topologyName;
20  TopologyContext context;
21  OutputCollector output;
22 
23  HashMap<String, String> endpointMapping;
24 
25  public RouteBolt(String topology) {
26  topologyName = topology;
27 
28  endpointMapping = new HashMap<>();
29  }
30 
31  public String registerEndpoint(String boltId) throws StreamNameCollisionException {
32  if (endpointMapping.containsKey(boltId)) {
33  throw new StreamNameCollisionException();
34  }
35 
36  String stream = PREFIX_STREAM_ID + boltId;
37  endpointMapping.put(boltId, stream);
38 
39  return stream;
40  }
41 
42  @Override
43  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
44  this.context = context;
45  this.output = collector;
46  }
47 
48  @Override
49  public void execute(Tuple input) {
50  RouteAction action = new RouteAction(
51  this, input, topologyName, endpointMapping);
52  action.run();
53  }
54 
55  @Override
56  public void declareOutputFields(OutputFieldsDeclarer declarer) {
57  declarer.declareStream(STREAM_ID_ERROR, KafkaMessage.FORMAT);
58  for (String streamId : endpointMapping.values()) {
59  declarer.declareStream(streamId, RouteMessage.FORMAT);
60  }
61  }
62 
63  @Override
64  public TopologyContext getContext() {
65  return context;
66  }
67 
68  @Override
69  public OutputCollector getOutput() {
70  return output;
71  }
72 }
String registerEndpoint(String boltId)
Definition: RouteBolt.java:31
void declareOutputFields(OutputFieldsDeclarer declarer)
Definition: RouteBolt.java:56
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
Definition: RouteBolt.java:43
TopologyContext getContext()
Definition: RouteBolt.java:64