Open Kilda Java Documentation
TopologyEngineBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
19 
32 
33 import org.apache.storm.task.OutputCollector;
34 import org.apache.storm.task.TopologyContext;
35 import org.apache.storm.topology.OutputFieldsDeclarer;
36 import org.apache.storm.topology.base.BaseRichBolt;
37 import org.apache.storm.tuple.Tuple;
38 import org.apache.storm.tuple.Values;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 
42 import java.io.IOException;
43 import java.util.Map;
44 import java.util.UUID;
45 
/**
 * Storm bolt (extends {@code BaseRichBolt}) that parses the first field of every incoming
 * tuple as a JSON {@code Message} and re-emits it on one of four streams:
 * CREATE, DELETE, RESPONSE or STATUS.
 *
 * <p>NOTE(review): this listing comes from generated documentation and its embedded line
 * numbering has gaps (for example 90, 105, 153, 157, 161 and 165), so some statements and
 * arguments are not visible here - confirm against the real source file before editing.
 */
49 public class TopologyEngineBolt extends BaseRichBolt {
    // Class-wide SLF4J logger.
53  private static final Logger logger = LoggerFactory.getLogger(TopologyEngineBolt.class);
54 
    // Collector handed over by Storm in prepare(); used for every emit and ack below.
58  private OutputCollector outputCollector;
59 
    /**
     * Deserializes the tuple's first string field into a {@code Message} and routes it.
     *
     * <ul>
     *   <li>Messages whose destination is not {@code Destination.WFM} are dropped.</li>
     *   <li>{@code CommandMessage} carrying {@code BaseInstallFlow}: a new transaction id is
     *       assigned and the message goes to the CREATE stream.</li>
     *   <li>{@code CommandMessage} carrying {@code RemoveFlow}: a new transaction id is
     *       assigned and the message goes to the DELETE stream.</li>
     *   <li>{@code InfoMessage}: forwarded on the RESPONSE stream.</li>
     *   <li>{@code ErrorMessage}: forwarded on the STATUS stream.</li>
     *   <li>Anything else is only logged at debug level.</li>
     * </ul>
     *
     * <p>The tuple is acked in the {@code finally} block on every path, including the early
     * return and both exception handlers.
     *
     * @param tuple incoming tuple; field 0 is read as the JSON request string
     */
63  @Override
64  public void execute(Tuple tuple) {
65  String request = tuple.getString(0);
    // Stays null when nothing is emitted; it is only used by the final debug log.
66  Values values = null;
67 
68  try {
69  Message message = MAPPER.readValue(request, Message.class);
    // Not addressed to WFM: skip processing (the finally block still acks the tuple).
70  if (!Destination.WFM.equals(message.getDestination())) {
71  return;
72  }
73  logger.debug("Request tuple={}", tuple);
74 
75  if (message instanceof CommandMessage) {
76  CommandData data = ((CommandMessage) message).getData();
77 
78  if (data instanceof BaseInstallFlow) {
79  BaseInstallFlow installData = (BaseInstallFlow) data;
    // Transaction id is the low 64 bits of a freshly generated random UUID.
80  Long transactionId = UUID.randomUUID().getLeastSignificantBits();
81  installData.setTransactionId(transactionId);
82  SwitchId switchId = installData.getSwitchId();
83  String flowId = installData.getId();
84 
85  logger.debug("Flow install message: {}={}, switch-id={}, {}={}, {}={}, message={}",
86  Utils.CORRELATION_ID, message.getCorrelationId(), switchId,
87  Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId, request);
88 
89  // FIXME(surabujin): send here and in TE
    // NOTE(review): original line 90 is absent from this listing - confirm what runs here.
    // The message is re-serialized so the emitted JSON carries the transaction id set above.
91  values = new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
92  outputCollector.emit(StreamType.CREATE.toString(), tuple, values);
93 
94  } else if (data instanceof RemoveFlow) {
95  RemoveFlow removeData = (RemoveFlow) data;
    // Same transaction id scheme as the install branch.
96  Long transactionId = UUID.randomUUID().getLeastSignificantBits();
97  removeData.setTransactionId(transactionId);
98  SwitchId switchId = removeData.getSwitchId();
99  String flowId = removeData.getId();
100 
101  logger.debug("Flow remove message: {}={}, switch-id={}, {}={}, {}={}, message={}",
102  Utils.CORRELATION_ID, message.getCorrelationId(), switchId,
103  Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId, request);
104 
    // NOTE(review): original line 105 is absent from this listing - confirm what runs here.
106  values = new Values(MAPPER.writeValueAsString(message), switchId, flowId, transactionId);
107  outputCollector.emit(StreamType.DELETE.toString(), tuple, values);
108 
109  } else {
110  logger.debug("Skip undefined command message: {}={}, message={}",
111  Utils.CORRELATION_ID, message.getCorrelationId(), request);
112  }
113  } else if (message instanceof InfoMessage) {
    // Unlike the command branches, the Message object itself is emitted, not its JSON string.
114  values = new Values(message);
115 
116  logger.debug("Flow response message: {}={}, message={}",
117  Utils.CORRELATION_ID, message.getCorrelationId(), request);
118 
119  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, values);
120 
121  } else if (message instanceof ErrorMessage) {
    // The flow id is read from the error description field of the error payload.
    // NOTE(review): presumably the sender stores the flow id there - verify against the producer.
122  String flowId = ((ErrorMessage) message).getData().getErrorDescription();
123 
124  logger.error("Flow error message: {}={}, {}={}, message={}",
125  Utils.CORRELATION_ID, message.getCorrelationId(), Utils.FLOW_ID, flowId, request);
126 
127  values = new Values(message, flowId);
128  outputCollector.emit(StreamType.STATUS.toString(), tuple, values);
129 
130  } else {
131  logger.debug("Skip undefined message: {}={}, message={}",
132  Utils.CORRELATION_ID, message.getCorrelationId(), request);
133  }
134  } catch (IOException exception) {
    // NOTE(review): the writeValueAsString calls sit in the same try block, so a serialization
    // failure that surfaces as an IOException would presumably be logged with this
    // "deserialize" wording as well - confirm.
135  logger.error("Could not deserialize message={}", request, exception);
136  } catch (Exception e) {
137  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
138  } finally {
    // Ack unconditionally: the tuple is acknowledged even when processing failed above.
139  outputCollector.ack(tuple);
140 
141  logger.debug("Topology-Engine message ack: component={}, stream={}, tuple={}, values={}",
142  tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
143  }
144  }
145 
    /**
     * Declares the four output streams used by {@code execute}: CREATE, DELETE, RESPONSE
     * and STATUS.
     *
     * <p>NOTE(review): the second argument of every {@code declareStream} call (original
     * lines 153, 157, 161 and 165) is absent from this listing, so the declared fields are
     * not visible here - take them from the real source file.
     *
     * @param outputFieldsDeclarer declarer on which the streams are registered
     */
149  @Override
150  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
151  outputFieldsDeclarer.declareStream(
152  StreamType.CREATE.toString(),
154  );
155  outputFieldsDeclarer.declareStream(
156  StreamType.DELETE.toString(),
158  );
159  outputFieldsDeclarer.declareStream(
160  StreamType.RESPONSE.toString(),
162  );
163  outputFieldsDeclarer.declareStream(
164  StreamType.STATUS.toString(),
166  );
167  }
168 
    /**
     * Stores the collector supplied by Storm; the other two arguments are not used.
     *
     * @param map unused here
     * @param topologyContext unused here
     * @param outputCollector collector kept for later emit and ack calls
     */
172  @Override
173  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
174  this.outputCollector = outputCollector;
175  }
176 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
static final Fields fieldsMessageSwitchIdFlowIdTransactionId
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
void setTransactionId(final Long transactionId)
Definition: BaseFlow.java:98
void setDestination(final Destination destination)
Definition: Message.java:111
static final String TRANSACTION_ID
Definition: Utils.java:39
static final String CORRELATION_ID
Definition: Utils.java:43
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final String FLOW_ID
Definition: Utils.java:61