Open Kilda Java Documentation
VerificationJointBolt.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
19 
32 
33 import com.fasterxml.jackson.core.JsonProcessingException;
34 import org.apache.storm.topology.OutputFieldsDeclarer;
35 import org.apache.storm.tuple.Fields;
36 import org.apache.storm.tuple.Tuple;
37 import org.apache.storm.tuple.Values;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 
41 import java.util.ArrayList;
42 import java.util.LinkedList;
43 import java.util.List;
44 import java.util.ListIterator;
45 
46 public class VerificationJointBolt extends AbstractBolt {
// Name of the single tuple field declared for the response stream (see STREAM_RESPONSE_FIELDS).
47  public static final String FIELD_ID_RESPONSE = AbstractTopology.MESSAGE_FIELD;
48 
// Stream on which handleRequest emits the JSON-encoded per-direction verification requests.
49  public static final String STREAM_SPEAKER_ID = "request";
50  public static final Fields STREAM_SPEAKER_FIELDS = AbstractTopology.fieldMessage;
// Stream on which produceResponse emits the aggregated InfoMessage.
51  public static final String STREAM_RESPONSE_ID = "response";
52  public static final Fields STREAM_RESPONSE_FIELDS = new Fields(FIELD_ID_RESPONSE);
53 
54  private static final Logger logger = LoggerFactory.getLogger(VerificationJointBolt.class);
55 
// Wait records for verifications in flight: appended by handleRequest, removed by handleResponse
// once a record is filled or found outdated.
56  private final LinkedList<VerificationWaitRecord> ongoingVerifications = new LinkedList<>();
57 
58  @Override
59  protected void handleInput(Tuple input) {
60  logger.debug("Verification joint - dispatching");
61  Object unclassified = input.getValueByField(VerificationBolt.FIELD_ID_OUTPUT);
62 
63  if (unclassified instanceof BidirectionalFlow) {
64  handleRequest(input, (BidirectionalFlow) unclassified);
65  } else if (unclassified instanceof UniFlowVerificationResponse) {
66  handleResponse(input, (UniFlowVerificationResponse) unclassified);
67  } else {
68  logger.warn(
69  "Unexpected input {} - is topology changes without code change?",
70  unclassified.getClass().getName());
71  }
72  }
73 
/**
 * Starts a new verification for the given flow.
 *
 * Builds a wait record from the request carried by the input tuple, JSON-encodes one
 * CommandMessage per pending request of that record, emits every encoded message on
 * STREAM_SPEAKER_ID anchored to the input tuple, and finally appends the record to
 * ongoingVerifications.
 *
 * @param input tuple that carries the original command message
 * @param biFlow flow passed to the wait record
 */
74  private void handleRequest(Tuple input, BidirectionalFlow biFlow) {
75  logger.debug("Handling VERIFICATION request");
76 
77  CommandMessage message = fetchInputMessage(input);
78  FlowVerificationRequest request = fetchVerificationRequest(message);
79  VerificationWaitRecord waitRecord = new VerificationWaitRecord(request, biFlow, message.getCorrelationId());
80 
81  List<UniFlowVerificationRequest> pendingRequests = waitRecord.getPendingRequests();
// All messages are encoded before any is emitted, so an encoding failure emits nothing.
82  List<String> jsonMessages = new ArrayList<>(pendingRequests.size());
83  try {
84  for (UniFlowVerificationRequest uniFlowVerificationRequest : pendingRequests) {
// NOTE(review): the constructor call below is cut off in this listing (the embedded numbering
// jumps from 86 to 88); the remaining argument(s) and the closing parenthesis must be restored
// from the real source file.
85  CommandMessage floodlightMessage = new CommandMessage(
86  uniFlowVerificationRequest, System.currentTimeMillis(), message.getCorrelationId(),
88  String s = MAPPER.writeValueAsString(floodlightMessage);
89  jsonMessages.add(s);
90  }
91  } catch (JsonProcessingException e) {
// NOTE(review): two placeholders, but the last argument is a Throwable - confirm that the SLF4J
// version in use renders this message as intended.
92  logger.error("Can't encode {}: {}", UniFlowVerificationRequest.class, e);
// Early exit: nothing has been emitted and the wait record is not registered.
93  return;
94  }
95 
96  for (String json : jsonMessages) {
97  getOutput().emit(STREAM_SPEAKER_ID, input, new Values(json));
98  }
99 
100  ongoingVerifications.addLast(waitRecord);
101  }
102 
103  private void handleResponse(Tuple input, UniFlowVerificationResponse response) {
104  logger.debug("Handling VERIFICATION response");
105 
106  ListIterator<VerificationWaitRecord> iter = ongoingVerifications.listIterator();
107 
108  long currentTime = System.currentTimeMillis();
109  while (iter.hasNext()) {
110  VerificationWaitRecord waitRecord = iter.next();
111 
112  if (waitRecord.isOutdated(currentTime)) {
113  iter.remove();
114  produceErrorResponse(input, waitRecord);
115  continue;
116  }
117 
118  if (! waitRecord.consumeResponse(response)) {
119  continue;
120  }
121  if (! waitRecord.isFilled()) {
122  continue;
123  }
124 
125  iter.remove();
126  produceResponse(input, waitRecord);
127 
128  break;
129  }
130  }
131 
132  private CommandMessage fetchInputMessage(Tuple input) {
133  Object raw = input.getValueByField(VerificationBolt.FIELD_ID_INPUT);
134  if (raw == null) {
135  throw new IllegalArgumentException("The message field is empty in input tuple");
136  }
137 
138  CommandMessage value;
139  try {
140  value = (CommandMessage) raw;
141  } catch (ClassCastException e) {
142  throw new IllegalArgumentException(String.format("Can't convert value into Message: %s", e));
143  }
144  return value;
145  }
146 
147  private FlowVerificationRequest fetchVerificationRequest(CommandMessage message) {
148  FlowVerificationRequest value;
149  try {
150  value = (FlowVerificationRequest) message.getData();
151  } catch (ClassCastException e) {
152  throw new IllegalArgumentException(String.format(
153  "Can't fetch flow VERIFICATION request from CommandMessage: %s", e));
154  }
155  return value;
156  }
157 
158  private void produceResponse(Tuple input, VerificationWaitRecord waitRecord) {
159  FlowVerificationResponse response = waitRecord.produce();
160  InfoMessage northboundMessage = new InfoMessage(
161  response, System.currentTimeMillis(), waitRecord.getCorrelationId());
162  getOutput().emit(STREAM_RESPONSE_ID, input, new Values(northboundMessage));
163  }
164 
165  private void produceErrorResponse(Tuple input, VerificationWaitRecord waitRecord) {
166  waitRecord.fillPendingWithError(FlowVerificationErrorCode.NO_SPEAKER_RESPONSE);
167  produceResponse(input, waitRecord);
168  }
169 
170  @Override
171  public void declareOutputFields(OutputFieldsDeclarer outputManager) {
172  outputManager.declareStream(STREAM_SPEAKER_ID, STREAM_SPEAKER_FIELDS);
173  outputManager.declareStream(STREAM_RESPONSE_ID, STREAM_RESPONSE_FIELDS);
174  }
175 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
value
Definition: nodes.py:62