Open Kilda Java Documentation
SplitterBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
19 
39 
40 import org.apache.storm.task.OutputCollector;
41 import org.apache.storm.task.TopologyContext;
42 import org.apache.storm.topology.OutputFieldsDeclarer;
43 import org.apache.storm.topology.base.BaseRichBolt;
44 import org.apache.storm.tuple.Tuple;
45 import org.apache.storm.tuple.Values;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 
49 import java.util.Map;
50 
54 public class SplitterBolt extends BaseRichBolt {
58  private static final Logger logger = LoggerFactory.getLogger(SplitterBolt.class);
59 
63  private OutputCollector outputCollector;
64 
68  private Message tryMessage(String json) {
69  Message result = null;
70  try {
71  result = MAPPER.readValue(json, Message.class);
72  } catch (Exception e) {
73  /* do nothing */
74  }
75  return result;
76  }
77 
81  @Override
82  public void execute(Tuple tuple) {
83  String request = tuple.getString(0);
84  Values values = new Values(request);
85 
86  try {
87  Message message = tryMessage(request);
88  if (message == null
89  || !Destination.WFM.equals(message.getDestination())
90  || !(message instanceof CommandMessage || message instanceof InfoMessage)) {
91  /*
92  * Due to refactoring the kafka topics, it appears more messages are coming to the splitter than
93  * originally desinged for.
94  *
95  * TODO(crimi): Fix the cause of excess messages coming to the splitter.
96  */
97  // String message = String.format("Could not deserialize message: %s", request);
98  // logger.error("{}", message, exception);
99  //
100  // ErrorMessage errorMessage = new ErrorMessage(
101  // new ErrorData(ErrorType.REQUEST_INVALID, message, exception.getMessage()),
102  // System.currentTimeMillis(), Utils.SYSTEM_CORRELATION_ID, Destination.NORTHBOUND);
103  //
104  // values = new Values(errorMessage, ErrorType.INTERNAL_ERROR);
105  // outputCollector.emit(StreamType.ERROR.toString(), tuple, values);
106  return;
107  }
108 
109  logger.debug("Request tuple={}", tuple);
110 
111  /*
112  * First, try to see if this is a PUSH / UNPUSH (smaller code base vs other).
113  * NB: InfoMessage was used since it has the relevant attributes/properties for
114  * pushing the flow.
115  */
116  if (message instanceof InfoMessage) {
117  InfoData data = ((InfoMessage) message).getData();
118  if (data instanceof FlowInfoData) {
120  String flowId = fid.getFlowId();
121 
122  values = new Values(message, flowId);
123  logger.info("Flow {} message: operation={} values={}", flowId, fid.getOperation(), values);
124  if (fid.getOperation() == FlowOperation.PUSH
126  outputCollector.emit(StreamType.PUSH.toString(), tuple, values);
127  } else if (fid.getOperation() == FlowOperation.UNPUSH
129  outputCollector.emit(StreamType.UNPUSH.toString(), tuple, values);
130  } else {
131  logger.warn("Skip undefined FlowInfoData Operation {}: {}={}",
133  }
134  } else {
135  logger.warn("Skip undefined InfoMessage: {}={}", Utils.CORRELATION_ID, message.getCorrelationId());
136  }
137  return;
138  }
139 
140  /*
141  * Second, it isn't an InfoMessage, so it must be a CommandMessage.
142  */
143  CommandData data = ((CommandMessage) message).getData();
144 
145  if (data instanceof FlowCreateRequest) {
146  String flowId = ((FlowCreateRequest) data).getPayload().getFlowId();
147 
148  logger.info("Flow {} create message: values={}", flowId, values);
149 
150  values = new Values(message, flowId);
151  outputCollector.emit(StreamType.CREATE.toString(), tuple, values);
152 
153  } else if (data instanceof FlowDeleteRequest) {
154  String flowId = ((FlowDeleteRequest) data).getPayload().getFlowId();
155 
156  logger.info("Flow {} delete message: values={}", flowId, values);
157 
158  values = new Values(message, flowId);
159  outputCollector.emit(StreamType.DELETE.toString(), tuple, values);
160 
161  } else if (data instanceof FlowUpdateRequest) {
162  String flowId = ((FlowUpdateRequest) data).getPayload().getFlowId();
163 
164  logger.info("Flow {} update message: values={}", flowId, values);
165 
166  values = new Values(message, flowId);
167  outputCollector.emit(StreamType.UPDATE.toString(), tuple, values);
168 
169  } else if (data instanceof FlowRerouteRequest) {
170  String flowId = ((FlowRerouteRequest) data).getFlowId();
171 
172  logger.info("Flow {} reroute message: values={}", flowId, values);
173 
174  values = new Values(message, flowId);
175  outputCollector.emit(StreamType.REROUTE.toString(), tuple, values);
176 
177  } else if (data instanceof FlowReadRequest) {
178  String flowId = ((FlowReadRequest) data).getFlowId();
179 
180  logger.info("Flow {} read message: values={}", flowId, values);
181 
182  values = new Values(message, flowId);
183  outputCollector.emit(StreamType.READ.toString(), tuple, values);
184 
185  } else if (data instanceof FlowsDumpRequest) {
186  logger.info("Flows dump message: values={}", values);
187 
188  values = new Values(message, null);
189  outputCollector.emit(StreamType.DUMP.toString(), tuple, values);
190  } else if (data instanceof FlowCacheSyncRequest) {
191  logger.info("FlowCacheSyncRequest: values={}", values);
192 
193  values = new Values(message, null);
194  outputCollector.emit(StreamType.CACHE_SYNC.toString(), tuple, values);
195  } else if (data instanceof FlowVerificationRequest) {
196  String flowId = ((FlowVerificationRequest) data).getFlowId();
197  logger.info("Flow {} verification request", flowId);
198 
199  outputCollector.emit(StreamType.VERIFICATION.toString(), tuple, new Values(message, flowId));
200 
201  } else {
202  logger.debug("Skip undefined CommandMessage: {}={}", Utils.CORRELATION_ID, message.getCorrelationId());
203  }
204  } catch (Exception e) {
205  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
206 
207  } finally {
208  outputCollector.ack(tuple);
209 
210  logger.debug("Splitter message ack: component={}, stream={}, tuple={}, values={}",
211  tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
212  }
213  }
214 
218  @Override
219  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
220  outputFieldsDeclarer.declareStream(StreamType.CREATE.toString(), FlowTopology.fieldsMessageFlowId);
221  outputFieldsDeclarer.declareStream(StreamType.READ.toString(), FlowTopology.fieldsMessageFlowId);
222  outputFieldsDeclarer.declareStream(StreamType.DUMP.toString(), FlowTopology.fieldsMessageFlowId);
223  outputFieldsDeclarer.declareStream(StreamType.UPDATE.toString(), FlowTopology.fieldsMessageFlowId);
224  outputFieldsDeclarer.declareStream(StreamType.DELETE.toString(), FlowTopology.fieldsMessageFlowId);
225  outputFieldsDeclarer.declareStream(StreamType.PUSH.toString(), FlowTopology.fieldsMessageFlowId);
226  outputFieldsDeclarer.declareStream(StreamType.UNPUSH.toString(), FlowTopology.fieldsMessageFlowId);
227  outputFieldsDeclarer.declareStream(StreamType.CACHE_SYNC.toString(), FlowTopology.fieldsMessageFlowId);
228  outputFieldsDeclarer.declareStream(StreamType.VERIFICATION.toString(), FlowTopology.fieldsMessageFlowId);
229  outputFieldsDeclarer.declareStream(StreamType.REROUTE.toString(), FlowTopology.fieldsMessageFlowId);
230  outputFieldsDeclarer.declareStream(StreamType.ERROR.toString(), FlowTopology.fieldsMessageErrorType);
231  }
232 
236  @Override
237  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
238  this.outputCollector = outputCollector;
239  }
240 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
list result
Definition: plan-d.py:72
static final String CORRELATION_ID
Definition: Utils.java:43
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)