16 package org.openkilda.wfm.topology.flow.bolts;
33 import com.fasterxml.jackson.core.JsonProcessingException;
34 import org.apache.storm.topology.OutputFieldsDeclarer;
35 import org.apache.storm.tuple.Fields;
36 import org.apache.storm.tuple.Tuple;
37 import org.apache.storm.tuple.Values;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import java.util.ArrayList;
42 import java.util.LinkedList;
43 import java.util.List;
44 import java.util.ListIterator;
56 private final LinkedList<VerificationWaitRecord> ongoingVerifications =
new LinkedList<>();
60 logger.debug(
"Verification joint - dispatching");
69 "Unexpected input {} - is topology changes without code change?",
70 unclassified.getClass().getName());
75 logger.debug(
"Handling VERIFICATION request");
82 List<String> jsonMessages =
new ArrayList<>(pendingRequests.size());
86 uniFlowVerificationRequest, System.currentTimeMillis(), message.
getCorrelationId(),
88 String s = MAPPER.writeValueAsString(floodlightMessage);
91 }
catch (JsonProcessingException e) {
92 logger.error(
"Can't encode {}: {}", UniFlowVerificationRequest.class, e);
96 for (String json : jsonMessages) {
100 ongoingVerifications.addLast(waitRecord);
103 private void handleResponse(Tuple input, UniFlowVerificationResponse response) {
104 logger.debug(
"Handling VERIFICATION response");
106 ListIterator<VerificationWaitRecord> iter = ongoingVerifications.listIterator();
108 long currentTime = System.currentTimeMillis();
109 while (iter.hasNext()) {
110 VerificationWaitRecord waitRecord = iter.next();
112 if (waitRecord.isOutdated(currentTime)) {
114 produceErrorResponse(input, waitRecord);
118 if (! waitRecord.consumeResponse(response)) {
121 if (! waitRecord.isFilled()) {
126 produceResponse(input, waitRecord);
132 private CommandMessage fetchInputMessage(Tuple input) {
133 Object raw = input.getValueByField(VerificationBolt.FIELD_ID_INPUT);
135 throw new IllegalArgumentException(
"The message field is empty in input tuple");
138 CommandMessage
value;
140 value = (CommandMessage) raw;
141 }
catch (ClassCastException e) {
142 throw new IllegalArgumentException(String.format(
"Can't convert value into Message: %s", e));
147 private FlowVerificationRequest fetchVerificationRequest(CommandMessage message) {
148 FlowVerificationRequest
value;
150 value = (FlowVerificationRequest) message.getData();
151 }
catch (ClassCastException e) {
152 throw new IllegalArgumentException(String.format(
153 "Can't fetch flow VERIFICATION request from CommandMessage: %s", e));
158 private void produceResponse(Tuple input, VerificationWaitRecord waitRecord) {
159 FlowVerificationResponse response = waitRecord.produce();
160 InfoMessage northboundMessage =
new InfoMessage(
161 response, System.currentTimeMillis(), waitRecord.getCorrelationId());
165 private void produceErrorResponse(Tuple input, VerificationWaitRecord waitRecord) {
166 waitRecord.fillPendingWithError(FlowVerificationErrorCode.NO_SPEAKER_RESPONSE);
167 produceResponse(input, waitRecord);
OutputCollector getOutput()
static final ObjectMapper MAPPER
static final String STREAM_RESPONSE_ID
List< UniFlowVerificationRequest > getPendingRequests()
static final String MESSAGE_FIELD
static final Fields fieldMessage
static final String FIELD_ID_OUTPUT
static final Fields STREAM_RESPONSE_FIELDS
void declareOutputFields(OutputFieldsDeclarer outputManager)
void handleInput(Tuple input)
static final Fields STREAM_SPEAKER_FIELDS
String getCorrelationId()
static final String STREAM_SPEAKER_ID
static final String FIELD_ID_RESPONSE