Open Kilda Java Documentation
TransactionBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
29 
30 import org.apache.storm.shade.org.eclipse.jetty.util.ConcurrentHashSet;
31 import org.apache.storm.state.InMemoryKeyValueState;
32 import org.apache.storm.task.OutputCollector;
33 import org.apache.storm.task.TopologyContext;
34 import org.apache.storm.topology.OutputFieldsDeclarer;
35 import org.apache.storm.topology.base.BaseStatefulBolt;
36 import org.apache.storm.tuple.Tuple;
37 import org.apache.storm.tuple.Values;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 
41 import java.util.HashMap;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.concurrent.ConcurrentHashMap;
45 
54 public class TransactionBolt
55  extends BaseStatefulBolt<InMemoryKeyValueState<SwitchId, Map<String, Set<Long>>>>
56  implements ICtrlBolt {
60  private static final Logger logger = LoggerFactory.getLogger(TransactionBolt.class);
61 
62  public static final String STREAM_ID_CTRL = "ctrl";
63 
69  private InMemoryKeyValueState<SwitchId, Map<String, Set<Long>>> transactions;
70 
71  private TopologyContext context;
72  private OutputCollector outputCollector;
73 
74  @Override
75  public void execute(Tuple tuple) {
76 
77  if (CtrlAction.boltHandlerEntrance(this, tuple)) {
78  return;
79  }
80 
81  logger.trace("States before: {}", transactions);
82 
83  ComponentType componentId = ComponentType.valueOf(tuple.getSourceComponent());
84  StreamType streamId = StreamType.valueOf(tuple.getSourceStreamId());
85  Long transactionId = (Long) tuple.getValueByField(Utils.TRANSACTION_ID);
86  SwitchId switchId = (SwitchId) tuple.getValueByField(FlowTopology.SWITCH_ID_FIELD);
87  String flowId = (String) tuple.getValueByField(Utils.FLOW_ID);
88  Object message = tuple.getValueByField(FlowTopology.MESSAGE_FIELD);
89  Map<String, Set<Long>> flowTransactions;
90  Set<Long> flowTransactionIds;
91  Values values = null;
92 
93  try {
94  logger.debug("Request tuple={}", tuple);
95 
96  switch (componentId) {
97 
98  case TOPOLOGY_ENGINE_BOLT:
99  logger.info("Transaction from TopologyEngine: switch-id={}, {}={}, {}={}",
100  switchId, Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId);
101 
102  flowTransactions = transactions.get(switchId);
103  if (flowTransactions == null) {
104  flowTransactions = new ConcurrentHashMap<>();
105  transactions.put(switchId, flowTransactions);
106  }
107 
108  flowTransactionIds = flowTransactions.get(flowId);
109  if (flowTransactionIds == null) {
110  flowTransactionIds = new ConcurrentHashSet<>();
111  flowTransactions.put(flowId, flowTransactionIds);
112  }
113 
114  if (!flowTransactionIds.add(transactionId)) {
115  throw new RuntimeException(
116  String.format("Transaction adding failure: id %d already exists", transactionId));
117  }
118 
119  logger.info("Set status {}: switch-id={}, {}={}, {}={}", FlowState.IN_PROGRESS,
120  switchId, Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId);
121 
122  values = new Values(flowId, FlowState.IN_PROGRESS);
123  outputCollector.emit(StreamType.STATUS.toString(), tuple, values);
124 
125  values = new Values(message);
126  outputCollector.emit(streamId.toString(), tuple, values);
127  break;
128 
129  case SPEAKER_BOLT:
130 
131  logger.info("Transaction from Speaker: switch-id={}, {}={}, {}={}",
132  switchId, Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId);
133 
134  flowTransactions = transactions.get(switchId);
135  if (flowTransactions != null) {
136 
137  flowTransactionIds = flowTransactions.get(flowId);
138  if (flowTransactionIds != null) {
139 
140  if (flowTransactionIds.remove(transactionId)) {
141 
142  if (flowTransactionIds.isEmpty()) {
143  //
144  // All transactions have been removed .. the Flow
145  // can now be considered "UP"
146  //
147  logger.info(
148  "Flow transaction completed for one switch "
149  + "(switch: {}, flow: {}, stream: {})", switchId, flowId, streamId);
150 
151  values = new Values(flowId, FlowState.UP);
152  outputCollector.emit(StreamType.STATUS.toString(), tuple, values);
153 
154  flowTransactions.remove(flowId);
155  } else {
156  logger.debug("Transaction {} not empty yet, count = {}",
157  transactionId, flowTransactionIds.size()
158  );
159  }
160  } else {
161  logger.warn("Transaction removing: transaction id not found");
162  }
163  } else {
164  logger.warn("Transaction removing failure: flow id not found");
165  }
166  if (flowTransactions.isEmpty()) {
167  transactions.delete(switchId);
168  }
169  } else {
170  logger.warn("Transaction removing failure: switch id not found");
171  }
172  break;
173 
174  default:
175  logger.debug("Skip undefined message: message={}", tuple);
176  break;
177  }
178  } catch (RuntimeException exception) {
179  logger.error("Set status {}: switch-id={}, {}={}, {}={}",
180  FlowState.DOWN, switchId, Utils.FLOW_ID, flowId, Utils.TRANSACTION_ID, transactionId, exception);
181 
182  values = new Values(flowId, FlowState.DOWN);
183  outputCollector.emit(StreamType.STATUS.toString(), tuple, values);
184  } catch (Exception e) {
185  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
186  } finally {
187  outputCollector.ack(tuple);
188 
189  logger.debug("Transaction message ack: component={}, stream={}, tuple={}, values={}",
190  tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
191  }
192 
193  logger.trace("States after: {}", transactions);
194  }
195 
199  @Override
200  public void initState(InMemoryKeyValueState<SwitchId, Map<String, Set<Long>>> state) {
201  transactions = state;
202  }
203 
207  @Override
208  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
209  outputFieldsDeclarer.declareStream(StreamType.CREATE.toString(), FlowTopology.fieldMessage);
210  outputFieldsDeclarer.declareStream(StreamType.DELETE.toString(), FlowTopology.fieldMessage);
211  outputFieldsDeclarer.declareStream(StreamType.STATUS.toString(), FlowTopology.fieldsFlowIdStatus);
212  // FIXME(dbogun): use proper tuple format
213  outputFieldsDeclarer.declareStream(STREAM_ID_CTRL, AbstractTopology.fieldMessage);
214  }
215 
219  @Override
220  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
221  this.context = topologyContext;
222  this.outputCollector = outputCollector;
223  }
224 
225  @Override
227  Map<SwitchId, Map<String, Set<Long>>> dump = new HashMap<>();
228  for (Map.Entry<SwitchId, Map<String, Set<Long>>> item : transactions) {
229  dump.put(item.getKey(), item.getValue());
230  }
231  return new TransactionBoltState(dump);
232  }
233 
234  @Override
235  public String getCtrlStreamId() {
236  return STREAM_ID_CTRL;
237  }
238 
239  @Override
241  // Not implemented
242  return new TransactionBoltState(new HashMap<>());
243  }
244 
245  @Override
246  public TopologyContext getContext() {
247  return context;
248  }
249 
250  @Override
251  public OutputCollector getOutput() {
252  return outputCollector;
253  }
254 }
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
static final String TRANSACTION_ID
Definition: Utils.java:39
void initState(InMemoryKeyValueState< SwitchId, Map< String, Set< Long >>> state)
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
Definition: CtrlAction.java:78
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
static final String FLOW_ID
Definition: Utils.java:61