Open Kilda Java Documentation
RouteAction.java
Go to the documentation of this file.
1 package org.openkilda.wfm.ctrl;
2 
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.google.common.base.Strings;
5 import org.apache.storm.tuple.Tuple;
12 
13 import java.nio.file.FileSystem;
14 import java.nio.file.FileSystems;
15 import java.nio.file.Path;
16 import java.nio.file.PathMatcher;
17 import java.util.List;
18 import java.util.Map;
19 
20 public class RouteAction extends AbstractAction {
21  private String topologyName;
22  private Map<String, String> endpoints;
23 
24  public RouteAction(
25  IKildaBolt bolt, Tuple tuple,
26  String topologyName, Map<String, String> endpoint) {
27  super(bolt, tuple);
28 
29  this.topologyName = topologyName;
30  this.endpoints = endpoint;
31  }
32 
33  @Override
34  protected void handle() throws MessageFormatException, JsonProcessingException {
35  KafkaMessage input = new KafkaMessage(getTuple());
36  Message payload = input.getPayload();
37 
38  if (! (payload instanceof CtrlRequest)) {
39  getLogger().debug(String.format(
40  "Skip foreign message (correlation-id: %s timestamp: %s)",
41  payload.getCorrelationId(), payload.getTimestamp()));
42  return;
43  }
44 
45  handleMessage((CtrlRequest)payload);
46  }
47 
48  private void handleMessage(CtrlRequest payload) throws JsonProcessingException {
49  RouteMessage message = new RouteMessage(
50  payload.getData(), payload.getCorrelationId(), topologyName);
51  List<Object> packedMessage = message.pack();
52 
53  String glob = payload.getRoute();
54 
55  if (Strings.isNullOrEmpty(glob)) {
56  glob = "**";
57  } else if (glob.equals("*")) {
58  glob = "**";
59  }
60 
61  FileSystem fs = FileSystems.getDefault();
62  PathMatcher matcher = fs.getPathMatcher("glob:" + glob);
63 
64  for (String bolt : endpoints.keySet()) {
65  Path route = fs.getPath(topologyName, bolt);
66 
67  if (! matcher.matches(route)) {
68  continue;
69  }
70 
71  getOutputCollector().emit(
72  endpoints.get(bolt), getTuple(), packedMessage);
73  }
74  }
75 }
RouteAction(IKildaBolt bolt, Tuple tuple, String topologyName, Map< String, String > endpoint)