Open Kilda Java Documentation
LcmFlowCacheSyncBolt.java
package org.openkilda.wfm.topology.flow.bolts;

// OpenKilda messaging imports; the exact package paths are assumed from the surrounding
// codebase. LcmSyncTrace is assumed to live in the same package as this bolt.
import org.openkilda.messaging.Destination;
import org.openkilda.messaging.Message;
import org.openkilda.messaging.Utils;
import org.openkilda.messaging.command.CommandMessage;
import org.openkilda.messaging.command.flow.FlowsSyncRequest;
import org.openkilda.messaging.info.InfoMessage;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

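/**
 * Synchronizes the flow cache as part of topology life-cycle management (LCM).
 * A tuple from one of the managed spouts triggers a FlowsSyncRequest toward the topology
 * engine; the spout tuple stays un-acked until the matching response (matched by correlation
 * id) arrives, at which point the network dump is emitted on the sync stream and the original
 * spout tuple is acked.
 */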
public class LcmFlowCacheSyncBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(LcmFlowCacheSyncBolt.class);

    public static final String FIELD_ID_MESSAGE = "message";
    public static final String FIELD_ID_JSON = "json";
    public static final String FIELD_ID_NETWORK_DUMP = "network-dump";

    public static final String STREAM_ID_TPE = "kafka_into_TE";
    public static final String STREAM_ID_SYNC_FLOW_CACHE = "sync.flow_cache";

    private TopologyContext context;
    private OutputCollector output;

    private final Map<String, LcmSyncTrace> pendingSyncRequests = new HashMap<>();
    private final List<String> managedSpouts;

    public LcmFlowCacheSyncBolt(String... managedSpouts) {
        this.managedSpouts = Arrays.asList(managedSpouts);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        this.output = collector;
    }

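    /**
     * Routes incoming tuples: a tuple coming from one of the managed spouts starts a new sync
     * round, everything else is treated as a potential sync response from the data stream.
     */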
    @Override
    public void execute(Tuple input) {
        logger.debug("{} - execute", context.getThisComponentId());
        String componentId = input.getSourceComponent();

        try {
            if (managedSpouts.contains(componentId)) {
                handleManagedSpout(input);
            } else {
                handleInputData(input);
            }
        } catch (Exception e) {
            logger.error("{} - handler exception", context.getThisComponentId(), e);
            output.reportError(e);
            output.fail(input);
        }
    }

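    /**
     * Declares two output streams: {@link #STREAM_ID_TPE} carries the serialized sync request
     * toward the topology engine, {@link #STREAM_ID_SYNC_FLOW_CACHE} carries the received
     * network dump to the flow-cache consumers.
     */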
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(STREAM_ID_TPE, new Fields(FIELD_ID_MESSAGE));
        declarer.declareStream(STREAM_ID_SYNC_FLOW_CACHE, new Fields(FIELD_ID_JSON, FIELD_ID_NETWORK_DUMP));
    }

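    /**
     * Builds a sync request for a tuple received from a managed spout, emits it toward the
     * topology engine and records the request so the response can be matched by correlation id.
     * The spout tuple itself stays un-acked until the response arrives.
     */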
    private void handleManagedSpout(Tuple input) {
        logger.info("{} - handle managed spout", context.getThisComponentId());
        LcmSyncTrace syncTrace = new LcmSyncTrace(input);

        CommandMessage syncRequest = makeSyncRequest(syncTrace);
        String json;
        try {
            json = Utils.MAPPER.writeValueAsString(syncRequest);
        } catch (JsonProcessingException e) {
            logger.error("Cannot serialise sync request `{}` into JSON", syncRequest, e);

            output.fail(input);
            return;
        }

        logger.info("{} - emit sync request: {}", context.getThisComponentId(), json);
        output.emit(STREAM_ID_TPE, input, new Values(json));
        pendingSyncRequests.put(input.getSourceComponent(), syncTrace);
    }

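    /**
     * Inspects records from the data stream: non-deserializable records and records with a
     * malformed correlation id are skipped; a record that matches a pending sync request is
     * forwarded on the sync stream together with its network dump, and the originating spout
     * tuple is acked. The incoming data tuple is always acked.
     */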
    private void handleInputData(Tuple input) {
        logger.info("{} - handle data stream", context.getThisComponentId());
        try {
            Message raw;
            String json;
            try {
                json = input.getString(0);
                raw = Utils.MAPPER.readValue(json, Message.class);
            } catch (IndexOutOfBoundsException | IOException e) {
                logger.debug("Skip non-deserializable record {}: {}", input, e);
                return;
            }

            UUID correlationId;
            try {
                correlationId = UUID.fromString(raw.getCorrelationId());
            } catch (IllegalArgumentException e) {
                logger.debug("Skip record due to malformed correlation id: {}", raw.getCorrelationId());
                return;
            }

            LcmSyncTrace pendingSync = popPendingSyncRequest(correlationId);
            if (pendingSync != null) {
                Tuple syncRequest = pendingSync.getSyncRequest();
                logger.info("Got sync response for spout {}", syncRequest.getSourceComponent());

                InfoMessage envelope = (InfoMessage) raw;
                output.emit(STREAM_ID_SYNC_FLOW_CACHE, input, new Values(json, envelope.getData()));
                output.ack(syncRequest);
            } else {
                logger.debug("{} - there is no pending sync request with correlation-id: {}",
                        context.getThisComponentId(), correlationId);
            }
        } finally {
            output.ack(input);
        }
    }

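    /**
     * Looks up and removes the pending sync request whose correlation id matches, or returns
     * null when there is no such request.
     */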
    private LcmSyncTrace popPendingSyncRequest(UUID correlationId) {
        LcmSyncTrace match = null;

        for (String componentId : pendingSyncRequests.keySet()) {
            LcmSyncTrace trace = pendingSyncRequests.get(componentId);
            if (!correlationId.equals(trace.getCorrelationId())) {
                continue;
            }

            match = trace;
            pendingSyncRequests.remove(componentId);
            break;
        }

        return match;
    }

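    /**
     * Wraps a FlowsSyncRequest into a CommandMessage addressed to the topology engine, reusing
     * the correlation id captured by the sync trace.
     */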
    private CommandMessage makeSyncRequest(LcmSyncTrace syncTrace) {
        String correlationId = syncTrace.getCorrelationId().toString();
        return new CommandMessage(new FlowsSyncRequest(),
                System.currentTimeMillis(), correlationId, Destination.TOPOLOGY_ENGINE);
    }
}
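
A minimal wiring sketch, not part of this file: the spout ids ("lcm-spout", "data-spout"), the bolt id, and the parallelism hint are placeholders chosen for illustration, and org.apache.storm.topology.TopologyBuilder is assumed on the classpath.

    TopologyBuilder builder = new TopologyBuilder();

    // Spouts whose tuples trigger a flow cache sync round (ids are placeholders).
    LcmFlowCacheSyncBolt lcmBolt = new LcmFlowCacheSyncBolt("lcm-spout");

    builder.setBolt("lcm-flow-cache-sync", lcmBolt, 1)
            .shuffleGrouping("lcm-spout")    // managed spout - starts a sync round
            .shuffleGrouping("data-spout");  // data stream carrying sync responses

    // Downstream consumers subscribe to the declared streams by name, e.g.:
    // .shuffleGrouping("lcm-flow-cache-sync", LcmFlowCacheSyncBolt.STREAM_ID_SYNC_FLOW_CACHE);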