16 package org.openkilda.wfm.topology.flow.bolts;
24 import org.apache.storm.topology.OutputFieldsDeclarer;
25 import org.apache.storm.tuple.Fields;
26 import org.apache.storm.tuple.Tuple;
27 import org.apache.storm.tuple.Values;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
39 private static final Logger logger = LoggerFactory.getLogger(
VerificationBolt.class);
48 String
source = input.getSourceComponent();
53 consumePingReply(input);
55 logger.warn(
"Unexpected input from {} - is topology changes without code change?",
source);
59 private void proxyRequest(Tuple input) {
60 Values proxyData =
new Values(
67 private void consumePingReply(Tuple input) {
68 UniFlowVerificationResponse response = fetchUniFlowResponse(input);
69 Values payload =
new Values(response.getFlowId(), response, null);
73 private UniFlowVerificationResponse fetchUniFlowResponse(Tuple input) {
74 UniFlowVerificationResponse
value;
76 value = (UniFlowVerificationResponse) input.getValueByField(SpeakerBolt.FIELD_ID_PAYLOAD);
77 }
catch (ClassCastException e) {
78 throw new IllegalArgumentException(
79 String.format(
"Can't deserialize into %s", UniFlowVerificationResponse.class.getName()), e);
void declareOutputFields(OutputFieldsDeclarer outputManager)
OutputCollector getOutput()
static final String FIELD_ID_BIFLOW
static final String FIELD_ID_FLOW_ID
static final String FIELD_ID_FLOW_ID
static final String MESSAGE_FIELD
static final String FIELD_ID_OUTPUT
void handleInput(Tuple input)
static final String FIELD_ID_MESSAGE
static final String STREAM_ID_PROXY
static final String FIELD_ID_INPUT
static final Fields STREAM_FIELDS_PROXY
static final String FLOW_ID