16 package org.openkilda.wfm.topology.nbworker.bolts;
25 import com.fasterxml.jackson.core.JsonProcessingException;
26 import org.apache.commons.collections4.CollectionUtils;
27 import org.apache.storm.topology.OutputFieldsDeclarer;
28 import org.apache.storm.tuple.Tuple;
29 import org.apache.storm.tuple.Values;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import java.util.ArrayList;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.UUID;
44 List<InfoData> responses = (List<InfoData>) input.getValueByField(
"response");
45 String correlationId = input.getStringByField(
"correlationId");
46 LOGGER.debug(
"Received response correlationId {}", correlationId);
48 sendChunkedResponse(responses, input, correlationId);
/**
 * Converts the given responses into a list of messages and emits every message as a JSON string.
 *
 * <p>For a non-empty list each element becomes one {@code ChunkedInfoMessage}. The first chunk is
 * created with the caller's {@code requestId}; whenever another element follows, a random UUID is
 * generated and becomes the request id of the following chunk.
 * NOTE(review): presumably the generated id is also passed to the current chunk as its
 * "next request id" - the tail of the constructor call is not visible in this excerpt, confirm.
 *
 * @param responses payload items to send; a {@code null} or empty list takes the
 *                  "no records" branch, which still adds one message to the output list.
 * @param input     tuple passed to {@code emit} together with each serialized message.
 * @param requestId request id used for the first chunk.
 */
51 private void sendChunkedResponse(List<InfoData> responses, Tuple input, String requestId) {
52 List<Message> messages =
new ArrayList<>();
// CollectionUtils.isEmpty is null-safe, so a null list is treated the same as an empty one.
53 if (CollectionUtils.isEmpty(responses)) {
54 LOGGER.debug(
"No records found in the database");
// NOTE(review): the construction of this message is on a line not shown in this excerpt.
56 messages.add(message);
// The chain starts with the request id supplied by the caller.
58 String currentRequestId = requestId;
60 Iterator<InfoData> iterator = responses.iterator();
61 while (iterator.hasNext()) {
// A new id is generated only when at least one more element remains after the current one.
64 if (iterator.hasNext()) {
65 nextRequestId = UUID.randomUUID().
toString();
70 Message message =
new ChunkedInfoMessage(infoData, System.currentTimeMillis(), currentRequestId,
72 messages.add(message);
// The id generated for this iteration identifies the chunk built in the next iteration.
73 currentRequestId = nextRequestId;
76 LOGGER.debug(
"Response is divided into {} messages", messages.size());
// Serialize each collected message to JSON and emit it with the input tuple.
80 for (Message message : messages) {
82 getOutput().emit(input,
new Values(Utils.MAPPER.writeValueAsString(message)));
83 }
// A serialization failure is logged here; no rethrow is visible in this excerpt.
catch (JsonProcessingException e) {
84 LOGGER.error(
"Error during writing response as json", e);
OutputCollector getOutput()
static final Fields fieldMessage
void declareOutputFields(OutputFieldsDeclarer declarer)
void handleInput(Tuple input)