Open Kilda Java Documentation
SpeakerBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
19 
35 
36 import org.apache.storm.task.OutputCollector;
37 import org.apache.storm.task.TopologyContext;
38 import org.apache.storm.topology.OutputFieldsDeclarer;
39 import org.apache.storm.topology.base.BaseRichBolt;
40 import org.apache.storm.tuple.Fields;
41 import org.apache.storm.tuple.Tuple;
42 import org.apache.storm.tuple.Values;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 
46 import java.io.IOException;
47 import java.util.Map;
48 
52 public class SpeakerBolt extends BaseRichBolt {
53  public static final String FIELD_ID_INPUT = "input";
54  public static final String FIELD_ID_PAYLOAD = "payload";
55 
56  public static final String STREAM_VERIFICATION_ID = "verification";
57  public static final Fields STREAM_VERIFICATION_FIELDS = new Fields(FIELD_ID_PAYLOAD, FIELD_ID_INPUT);
61  private static final Logger logger = LoggerFactory.getLogger(SpeakerBolt.class);
62 
66  private OutputCollector outputCollector;
67 
/**
 * Handles one input tuple.
 *
 * <p>Field 0 of the tuple is read as a string and deserialized into a {@code Message} with
 * {@code MAPPER}. An {@code InfoMessage} is handed to {@code handleInfoMessage}. Any other message
 * is dropped unless its destination equals {@code Destination.WFM_TRANSACTION}. A
 * {@code CommandMessage} carrying {@code BaseInstallFlow} or {@code RemoveFlow} data is
 * re-serialized and emitted on the CREATE or DELETE stream; an {@code ErrorMessage} whose error
 * description is non-null is emitted on the STATUS stream. Every path that enters the
 * {@code try} acks the tuple in the {@code finally} block.
 *
 * <p>NOTE(review): this listing skips original lines 103, 118 and 127-128 inside the body, so
 * part of the method is not visible here. Confirm against the complete file before changing it.
 *
 * @param tuple input tuple; its field 0 is read with {@code getString(0)}
 */
71  @Override
72  public void execute(Tuple tuple) {
    // Read before the try: if getString(0) throws, the finally block (and its ack) is never reached.
73  String request = tuple.getString(0);
    // Holds the Values emitted directly by this method; only read by the debug log in finally.
74  Values values = null;
75 
76  try {
77 
78  Message message = MAPPER.readValue(request, Message.class);
79  logger.debug("Request tuple={}", tuple);
80 
    // Info messages are dispatched before the destination check below, so they bypass that filter.
    // The early return still passes through finally, so the tuple is acked.
81  if (message instanceof InfoMessage) {
82  handleInfoMessage(tuple, (InfoMessage) message);
83  return;
84  }
85 
    // Everything else is processed only when addressed to WFM_TRANSACTION; other messages are
    // acked (via finally) without any emit.
86  if (!Destination.WFM_TRANSACTION.equals(message.getDestination())) {
87  return;
88  }
89 
90  if (message instanceof CommandMessage) {
91 
92  CommandData data = ((CommandMessage) message).getData();
93 
    // Install command: forwarded on the CREATE stream.
94  if (data instanceof BaseInstallFlow) {
95  Long transactionId = ((BaseInstallFlow) data).getTransactionId();
96  SwitchId switchId = ((BaseInstallFlow) data).getSwitchId();
97  String flowId = ((BaseInstallFlow) data).getId();
98 
99  logger.debug("Flow install message: {}={}, switch-id={}, {}={}, {}={}, message={}",
100  Utils.CORRELATION_ID, message.getCorrelationId(), switchId,
101  Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId, request);
102 
    // NOTE(review): original line 103 is missing from this listing; its content cannot be
    // determined from here.
    // The message is serialized again and emitted, anchored to the input tuple, as
    // (serialized message, switch id, flow id, transaction id).
104  values = new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
105  // FIXME(surabujin): looks like TE ignore this messages
106  outputCollector.emit(StreamType.CREATE.toString(), tuple, values);
107 
    // Remove command: same value layout, forwarded on the DELETE stream.
108  } else if (data instanceof RemoveFlow) {
109 
110  Long transactionId = ((RemoveFlow) data).getTransactionId();
111  SwitchId switchId = ((RemoveFlow) data).getSwitchId();
112  String flowId = ((RemoveFlow) data).getId();
113 
114  logger.debug("Flow remove message: {}={}, switch-id={}, {}={}, {}={}, message={}",
115  Utils.CORRELATION_ID, message.getCorrelationId(), switchId,
116  Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId, request);
117 
    // NOTE(review): original line 118 is missing from this listing; its content cannot be
    // determined from here.
119  values = new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
120  outputCollector.emit(StreamType.DELETE.toString(), tuple, values);
121 
122  } else {
    // Any other command payload is only logged.
123  logger.debug("Skip undefined command message: {}={}, message={}",
124  Utils.CORRELATION_ID, message.getCorrelationId(), request);
125  }
126  } else if (message instanceof ErrorMessage) {
    // The error description of the payload is used as the flow id.
127  String flowId = ((ErrorMessage) message).getData().getErrorDescription();
    // NOTE(review): original lines 127-128 are missing from this listing. 'status', used below,
    // is not declared in any visible line; presumably it is declared there. Verify.
129 
130  // TODO: Should add debug message if receiving ErrorMessage.
131  if (flowId != null) {
132  logger.error("Flow error message: {}={}, {}={}, message={}",
133  Utils.CORRELATION_ID, message.getCorrelationId(), Utils.FLOW_ID, flowId, request);
134 
    // Emitted on the STATUS stream as (flow id, status), anchored to the input tuple.
135  values = new Values(flowId, status);
136  outputCollector.emit(StreamType.STATUS.toString(), tuple, values);
137  } else {
138  logger.debug("Skip error message without flow-id: {}={}, message={}",
139  Utils.CORRELATION_ID, message.getCorrelationId(), request);
140  }
141 
142  } else {
143  // TODO: should this be a warn or error? Probably, after refactored / specific
144  // topics
145  logger.debug("Skip undefined message: {}={}, message={}",
146  Utils.CORRELATION_ID, message.getCorrelationId(), request);
147  }
    // Any IOException raised inside the try lands here, whichever MAPPER call it came from, and
    // is logged as a deserialization failure.
148  } catch (IOException exception) {
149  logger.error("\n\nCould not deserialize message={}", request, exception);
    // Catch-all for every other Exception from the try body: it is logged, not rethrown.
150  } catch (Exception e) {
151  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
152  } finally {
    // Ack regardless of outcome once the try was entered.
153  outputCollector.ack(tuple);
154 
    // 'values' is null here when this method emitted nothing itself; that includes the
    // InfoMessage path, whose emit happens inside handleInfoMessage.
155  logger.debug("Speaker message ack: component={}, stream={}, tuple={}, values={}",
156  tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
157  }
158  }
159 
160  private void handleInfoMessage(Tuple input, InfoMessage message) {
161  InfoData rawPayload = message.getData();
162 
163  if (rawPayload instanceof UniFlowVerificationResponse) {
164  Values proxyData = new Values(rawPayload, message);
165  outputCollector.emit(STREAM_VERIFICATION_ID, input, proxyData);
166  } else {
167  logger.debug("Unhandled InfoMessage with payload type: {}", rawPayload.getClass().getName());
168  }
169  }
170 
174  @Override
175  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
176  outputFieldsDeclarer.declareStream(
178  outputFieldsDeclarer.declareStream(
180  outputFieldsDeclarer.declareStream(StreamType.STATUS.toString(), FlowTopology.fieldsFlowIdStatus);
181  outputFieldsDeclarer.declareStream(STREAM_VERIFICATION_ID, STREAM_VERIFICATION_FIELDS);
182  }
183 
187  @Override
188  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
189  this.outputCollector = outputCollector;
190  }
191 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
static final Fields fieldsMessageSwitchIdFlowIdTransactionId
def status()
Definition: rest.py:593
void setDestination(final Destination destination)
Definition: Message.java:111
static final String TRANSACTION_ID
Definition: Utils.java:39
static final String CORRELATION_ID
Definition: Utils.java:43
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final String FLOW_ID
Definition: Utils.java:61
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)