16 package org.openkilda.wfm.topology.nbworker.bolts;
29 import org.apache.storm.topology.OutputFieldsDeclarer;
30 import org.apache.storm.tuple.Fields;
31 import org.apache.storm.tuple.Tuple;
32 import org.apache.storm.tuple.Values;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import java.io.IOException;
41 private static final Logger LOGGER = LoggerFactory.getLogger(
RouterBolt.class);
45 String request = input.getString(0);
50 }
catch (IOException e) {
51 LOGGER.error(
"Error during parsing request for NBWorker topology", e);
56 LOGGER.debug(
"Received command message {}", message);
69 private void processRequest(Tuple input,
BaseRequest request, String correlationId) {
72 }
else if (request instanceof LinksBaseRequest) {
74 }
else if (request instanceof FlowsBaseRequest) {
75 getOutput().emit(StreamType.FLOW.toString(), input,
new Values(request, correlationId));
83 declarer.declareStream(
StreamType.
SWITCH.toString(),
new Fields(
"request",
"correlationId"));
84 declarer.declareStream(
StreamType.
ISL.toString(),
new Fields(
"request",
"correlationId"));
85 declarer.declareStream(
StreamType.
FLOW.toString(),
new Fields(
"request",
"correlationId"));
OutputCollector getOutput()
static final ObjectMapper MAPPER
void declareOutputFields(OutputFieldsDeclarer declarer)
def command(payload, fields)
void unhandledInput(Tuple input)
String getCorrelationId()
void handleInput(Tuple input)