1 package org.openkilda.wfm.topology.flow.bolts;
10 import com.fasterxml.jackson.core.JsonProcessingException;
11 import org.apache.storm.task.OutputCollector;
12 import org.apache.storm.task.TopologyContext;
13 import org.apache.storm.topology.OutputFieldsDeclarer;
14 import org.apache.storm.topology.base.BaseRichBolt;
15 import org.apache.storm.tuple.Fields;
16 import org.apache.storm.tuple.Tuple;
17 import org.apache.storm.tuple.Values;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import java.io.IOException;
22 import java.util.Arrays;
23 import java.util.HashMap;
24 import java.util.List;
26 import java.util.UUID;
38 private TopologyContext context;
39 private OutputCollector output;
41 private final Map<String, LcmSyncTrace> pendingSyncRequests =
new HashMap<>();
42 private final List<String> managedSpouts;
45 this.managedSpouts = Arrays.asList(managedSpouts);
49 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
50 this.context = context;
51 this.output = collector;
56 logger.debug(
"{} - execute", context.getThisComponentId());
57 String componentId = input.getSourceComponent();
60 if (managedSpouts.contains(componentId)) {
61 handleManagedSpout(input);
63 handleInputData(input);
65 }
catch (Exception e) {
66 logger.error(
"{} - handler exception", context.getThisComponentId(), e);
67 output.reportError(e);
78 private void handleManagedSpout(Tuple input) {
79 logger.info(
"{} - handle managed spout", context.getThisComponentId());
85 json =
Utils.
MAPPER.writeValueAsString(syncRequest);
86 }
catch (JsonProcessingException e) {
87 logger.error(
"Can serialise sync request `{}` into JSON: {}", syncRequest, e);
93 logger.info(
"{} - emit sync request: {}", context.getThisComponentId(), json);
95 pendingSyncRequests.put(input.getSourceComponent(), syncTrace);
98 private void handleInputData(Tuple input) {
99 logger.info(
"{} - handle data stream", context.getThisComponentId());
104 json = input.getString(0);
105 raw = Utils.MAPPER.readValue(json,
Message.class);
106 }
catch (IndexOutOfBoundsException|IOException e) {
107 logger.debug(
"Skip non deserializable record {}: {}", input, e);
114 }
catch (IllegalArgumentException e) {
115 logger.debug(
"Skip record due to malformed correlation id: {}", raw.
getCorrelationId());
119 LcmSyncTrace pendingSync = popPendingSyncRequest(correlationId);
120 if (pendingSync != null) {
121 Tuple syncRequest = pendingSync.getSyncRequest();
122 logger.info(
"Got sync response for spout {}", syncRequest.getSourceComponent());
124 InfoMessage envelope = (InfoMessage) raw;
126 output.ack(syncRequest);
128 logger.debug(
"{} - there is no pending sync request with correlation-id: {}",
129 context.getThisComponentId(), correlationId);
136 private LcmSyncTrace popPendingSyncRequest(UUID correlationId) {
137 LcmSyncTrace match = null;
139 for (String componentID : pendingSyncRequests.keySet()) {
140 LcmSyncTrace trace = pendingSyncRequests.get(componentID);
141 if (!correlationId.equals(trace.getCorrelationId())) {
146 pendingSyncRequests.remove(componentID);
153 private CommandMessage makeSyncRequest(LcmSyncTrace syncTrace) {
154 String correlationId = syncTrace.getCorrelationId().toString();
155 return new CommandMessage(
new FlowsSyncRequest(),
156 System.currentTimeMillis(), correlationId, Destination.TOPOLOGY_ENGINE);
void declareOutputFields(OutputFieldsDeclarer declarer)
static final ObjectMapper MAPPER
static final String FIELD_ID_JSON
static final String FIELD_ID_MESSAGE
void execute(Tuple input)
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static final String FIELD_ID_NETWORK_DUMP
static final String STREAM_ID_SYNC_FLOW_CACHE
LcmFlowCacheSyncBolt(String ... managedSpouts)
static final String STREAM_ID_TPE
String getCorrelationId()