16 package org.openkilda.wfm.topology.flow.bolts;
30 import org.apache.storm.shade.org.eclipse.jetty.util.ConcurrentHashSet;
31 import org.apache.storm.state.InMemoryKeyValueState;
32 import org.apache.storm.task.OutputCollector;
33 import org.apache.storm.task.TopologyContext;
34 import org.apache.storm.topology.OutputFieldsDeclarer;
35 import org.apache.storm.topology.base.BaseStatefulBolt;
36 import org.apache.storm.tuple.Tuple;
37 import org.apache.storm.tuple.Values;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import java.util.HashMap;
44 import java.util.concurrent.ConcurrentHashMap;
55 extends BaseStatefulBolt<InMemoryKeyValueState<SwitchId, Map<String, Set<Long>>>>
60 private static final Logger logger = LoggerFactory.getLogger(
TransactionBolt.class);
69 private InMemoryKeyValueState<SwitchId, Map<String, Set<Long>>> transactions;
71 private TopologyContext context;
72 private OutputCollector outputCollector;
81 logger.trace(
"States before: {}", transactions);
87 String flowId = (String) tuple.getValueByField(
Utils.
FLOW_ID);
89 Map<String, Set<Long>> flowTransactions;
90 Set<Long> flowTransactionIds;
94 logger.debug(
"Request tuple={}", tuple);
96 switch (componentId) {
98 case TOPOLOGY_ENGINE_BOLT:
99 logger.info(
"Transaction from TopologyEngine: switch-id={}, {}={}, {}={}",
102 flowTransactions = transactions.get(switchId);
103 if (flowTransactions == null) {
104 flowTransactions =
new ConcurrentHashMap<>();
105 transactions.put(switchId, flowTransactions);
108 flowTransactionIds = flowTransactions.get(flowId);
109 if (flowTransactionIds == null) {
110 flowTransactionIds =
new ConcurrentHashSet<>();
111 flowTransactions.put(flowId, flowTransactionIds);
114 if (!flowTransactionIds.add(transactionId)) {
115 throw new RuntimeException(
116 String.format(
"Transaction adding failure: id %d already exists", transactionId));
125 values =
new Values(message);
126 outputCollector.emit(streamId.toString(), tuple, values);
131 logger.info(
"Transaction from Speaker: switch-id={}, {}={}, {}={}",
134 flowTransactions = transactions.get(switchId);
135 if (flowTransactions != null) {
137 flowTransactionIds = flowTransactions.get(flowId);
138 if (flowTransactionIds != null) {
140 if (flowTransactionIds.remove(transactionId)) {
142 if (flowTransactionIds.isEmpty()) {
148 "Flow transaction completed for one switch " 149 +
"(switch: {}, flow: {}, stream: {})", switchId, flowId, streamId);
154 flowTransactions.remove(flowId);
156 logger.debug(
"Transaction {} not empty yet, count = {}",
157 transactionId, flowTransactionIds.size()
161 logger.warn(
"Transaction removing: transaction id not found");
164 logger.warn(
"Transaction removing failure: flow id not found");
166 if (flowTransactions.isEmpty()) {
167 transactions.delete(switchId);
170 logger.warn(
"Transaction removing failure: switch id not found");
175 logger.debug(
"Skip undefined message: message={}", tuple);
178 }
catch (RuntimeException exception) {
179 logger.error(
"Set status {}: switch-id={}, {}={}, {}={}",
184 }
catch (Exception e) {
185 logger.error(String.format(
"Unhandled exception in %s", getClass().getName()), e);
187 outputCollector.ack(tuple);
189 logger.debug(
"Transaction message ack: component={}, stream={}, tuple={}, values={}",
190 tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
193 logger.trace(
"States after: {}", transactions);
201 transactions = state;
220 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
221 this.context = topologyContext;
222 this.outputCollector = outputCollector;
227 Map<SwitchId, Map<String, Set<Long>>> dump =
new HashMap<>();
228 for (Map.Entry<
SwitchId, Map<String, Set<Long>>> item : transactions) {
229 dump.put(item.getKey(), item.getValue());
252 return outputCollector;
static final String STREAM_ID_CTRL
OutputCollector getOutput()
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
static final String SWITCH_ID_FIELD
static final String MESSAGE_FIELD
void execute(Tuple tuple)
AbstractDumpState dumpStateBySwitchId(SwitchId switchId)
TopologyContext getContext()
static final Fields fieldMessage
static final String TRANSACTION_ID
void initState(InMemoryKeyValueState< SwitchId, Map< String, Set< Long >>> state)
static boolean boltHandlerEntrance(ICtrlBolt bolt, Tuple tuple)
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)
AbstractDumpState dumpState()
static final String FLOW_ID
static final Fields fieldsFlowIdStatus