Open Kilda Java Documentation
StatsTopologyTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats;
17 
18 import static java.util.stream.Collectors.toList;
19 import static org.hamcrest.CoreMatchers.is;
20 import static org.hamcrest.CoreMatchers.startsWith;
21 import static org.hamcrest.MatcherAssert.assertThat;
23 import static org.openkilda.messaging.Utils.MAPPER;
24 
46 
47 import org.apache.storm.Testing;
48 import org.apache.storm.generated.StormTopology;
49 import org.apache.storm.kafka.bolt.KafkaBolt;
50 import org.apache.storm.testing.FixedTuple;
51 import org.apache.storm.testing.MockedSources;
52 import org.apache.storm.topology.TopologyBuilder;
53 import org.apache.storm.tuple.Values;
54 import org.junit.AfterClass;
55 import org.junit.Assert;
56 import org.junit.BeforeClass;
57 import org.junit.Ignore;
58 import org.junit.Test;
59 import org.neo4j.graphdb.GraphDatabaseService;
60 import org.neo4j.graphdb.Label;
61 import org.neo4j.graphdb.Node;
62 import org.neo4j.graphdb.Relationship;
63 import org.neo4j.graphdb.RelationshipType;
64 import org.neo4j.graphdb.Transaction;
65 
66 import java.io.IOException;
67 import java.util.ArrayList;
68 import java.util.Arrays;
69 import java.util.Collections;
70 import java.util.HashSet;
71 import java.util.List;
72 import java.util.Map;
73 import java.util.Properties;
74 import java.util.stream.IntStream;
75 
77 
// Captured once when the class is initialised; the tests place it in the messages they send
// and expect the same value back as the time of every emitted datapoint.
78  private static final long timestamp = System.currentTimeMillis();
79 
// Instance-level identifiers read by flowStatsTest(): they describe the flow relationship it
// writes to Neo4j and the stats entry it sends. Other tests declare their own local switchId
// (and, in the cache-sync test, flowId), which shadow these fields.
80  private final SwitchId switchId = new SwitchId(1L);
81  private final long cookie = 0x4000000000000001L;
82  private final String flowId = "f253423454343";
83 
// Neo4j fixture shared by all tests; started in setupOnce() and stopped in teardownOnce().
84  private static Neo4jFixture fakeNeo4jDb;
85 
// Launch environment built in setupOnce() and handed to every TestingTargetTopology instance.
86  private static LaunchEnvironment launchEnvironment;
87 
/**
 * One-time fixture setup: starts the Neo4j fixture and prepares the launch environment.
 *
 * <p>The fixture is created under the root of {@code fsData} and bound to
 * {@code NEO4J_LISTEN_ADDRESS}; the launch environment then receives a configuration overlay
 * whose {@code neo4j.hosts} entry is the fixture's listen address.
 *
 * <p>NOTE(review): this listing jumps from source line 89 to 91, so one statement at the top of
 * the method body is not visible here - check the original file before editing this method.
 */
88  @BeforeClass
89  public static void setupOnce() throws Exception {
91  fakeNeo4jDb = new Neo4jFixture(fsData.getRoot().toPath(), NEO4J_LISTEN_ADDRESS);
92  fakeNeo4jDb.start();
93  launchEnvironment = makeLaunchEnvironment();
// Overlay that redirects the "neo4j.hosts" setting to the fixture started above.
94  Properties configOverlay = new Properties();
95  configOverlay.setProperty("neo4j.hosts", fakeNeo4jDb.getListenAddress());
96 
97  launchEnvironment.setupOverlay(configOverlay);
98  }
99 
100 
101  @AfterClass
102  public static void teardownOnce() throws Exception {
103  fakeNeo4jDb.stop();
104  }
105 
/**
 * Port statistics scenario; currently disabled through {@code @Ignore}.
 *
 * <p>Builds a single PortStatsReply holding 52 PortStatsEntry objects (ports 1..52), feeds the
 * serialised InfoMessage to the mocked STATS_OFS_KAFKA_SPOUT, completes the topology and checks
 * the tuples captured for PORT_STATS_METRIC_GEN: 728 tuples are expected, each tagged with the
 * switch id (colons removed), stamped with the class-level timestamp and carrying a metric name
 * that starts with "pen.switch".
 *
 * <p>NOTE(review): the tag key asserted here is "switchId", while meterConfigStatsTest() reads
 * "switchid" - confirm which spelling the metric generators emit before re-enabling this test.
 */
106  @Ignore
107  @Test
108  public void portStatsTest() throws Exception {
109  final SwitchId switchId = new SwitchId(1L);
// One entry per port; every entry gets twelve consecutive values starting at port * 20.
110  final List<PortStatsEntry> entries = IntStream.range(1, 53).boxed().map(port -> {
111  int baseCount = port * 20;
112  return new PortStatsEntry(port, baseCount, baseCount + 1, baseCount + 2, baseCount + 3,
113  baseCount + 4, baseCount + 5, baseCount + 6, baseCount + 7,
114  baseCount + 8, baseCount + 9, baseCount + 10, baseCount + 11);
115  }).collect(toList());
116  final List<PortStatsReply> replies = Collections.singletonList(new PortStatsReply(1, entries));
// NOTE(review): this listing skips source line 118, so the constructor call below is cut off
// after CORRELATION_ID - restore its last argument from the original file.
117  InfoMessage message = new InfoMessage(new PortStatsData(switchId, replies), timestamp, CORRELATION_ID,
119 
120  //mock kafka spout
121  MockedSources sources = new MockedSources();
122  sources.addMockData(StatsComponentType.STATS_OFS_KAFKA_SPOUT.toString(),
123  new Values(MAPPER.writeValueAsString(message)));
124  completeTopologyParam.setMockedSources(sources);
125 
126  //execute topology
127  Testing.withTrackedCluster(clusterParam, (cluster) -> {
128  StatsTopology topology = new TestingTargetTopology(launchEnvironment, new TestingKafkaBolt());
129  StormTopology stormTopology = topology.createTopology();
130 
131  //verify results
132  Map result = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
133  ArrayList<FixedTuple> tuples =
134  (ArrayList<FixedTuple>) result.get(StatsComponentType.PORT_STATS_METRIC_GEN.name());
// 728 = 52 ports x 14 - presumably 14 datapoints per port; confirm against the metric generator.
135  assertThat(tuples.size(), is(728));
136  tuples.stream()
137  .map(this::readFromJson)
138  .forEach(datapoint -> {
139  assertThat(datapoint.getTags().get("switchId"), is(switchId.toString().replaceAll(":", "")));
140  assertThat(datapoint.getTime(), is(timestamp));
141  assertThat(datapoint.getMetric(), startsWith("pen.switch"));
142  });
143  });
144  }
145 
/**
 * Meter configuration statistics scenario.
 *
 * <p>Sends one MeterConfigStatsData message whose single reply lists the values 1, 2 and 3
 * (presumably meter ids - confirm against MeterConfigReply), completes the topology and expects
 * three tuples captured for METER_CFG_STATS_METRIC_GEN. Each datapoint must carry the switch id
 * in OpenTSDB format under the "switchid" tag, the class-level timestamp and the metric name
 * "pen.switch.meters".
 */
146  @Test
147  public void meterConfigStatsTest() throws Exception {
148  final SwitchId switchId = new SwitchId(1L);
149  final List<MeterConfigReply> stats =
150  Collections.singletonList(new MeterConfigReply(2, Arrays.asList(1L, 2L, 3L)));
// NOTE(review): this listing skips source line 152, so the constructor call below is cut off
// after CORRELATION_ID - restore its last argument from the original file.
151  InfoMessage message = new InfoMessage(new MeterConfigStatsData(switchId, stats), timestamp, CORRELATION_ID,
153 
154  //mock kafka spout
155  MockedSources sources = new MockedSources();
156  sources.addMockData(StatsComponentType.STATS_OFS_KAFKA_SPOUT.toString(),
157  new Values(MAPPER.writeValueAsString(message)));
// The same serialised message is also queued on the speaker spout.
158  sources.addMockData(StatsComponentType.STATS_KILDA_SPEAKER_SPOUT.name(),
159  new Values(MAPPER.writeValueAsString(message))
160  );
161 
162  completeTopologyParam.setMockedSources(sources);
163 
164  //execute topology
165  Testing.withTrackedCluster(clusterParam, (cluster) -> {
166  StatsTopology topology = new TestingTargetTopology(launchEnvironment, new TestingKafkaBolt());
167  StormTopology stormTopology = topology.createTopology();
168 
169  //verify results
170  Map result = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
171  ArrayList<FixedTuple> tuples =
172  (ArrayList<FixedTuple>) result.get(StatsComponentType.METER_CFG_STATS_METRIC_GEN.name());
// Three tuples expected - presumably one per value listed in the reply above.
173  assertThat(tuples.size(), is(3));
174  tuples.stream()
175  .map(this::readFromJson)
176  .forEach(datapoint -> {
177  assertThat(datapoint.getTags().get("switchid"),
178  is(switchId.toOtsdFormat()));
179  assertThat(datapoint.getTime(), is(timestamp));
180  assertThat(datapoint.getMetric(), is("pen.switch.meters"));
181  });
182  });
183  }
184 
185  @Test
186  public void flowStatsTest() throws Exception {
187  //mock kafka spout
188  MockedSources sources = new MockedSources();
189 
190  GraphDatabaseService graphDatabaseService = fakeNeo4jDb.getGraphDatabaseService();
191 
192  try (Transaction tx = graphDatabaseService.beginTx()) {
193  Node node1 = graphDatabaseService.createNode(Label.label("switch"));
194  node1.setProperty("name", switchId.toString());
195  Relationship rel1 = node1.createRelationshipTo(node1, RelationshipType.withName("flow"));
196  rel1.setProperty("flowid", flowId);
197  rel1.setProperty("cookie", cookie);
198  rel1.setProperty("meter_id", 2);
199  rel1.setProperty("transit_vlan", 1);
200  rel1.setProperty("src_switch", switchId.toString());
201  rel1.setProperty("dst_switch", switchId.toString());
202  rel1.setProperty("src_port", 1);
203  rel1.setProperty("dst_port", 2);
204  rel1.setProperty("src_vlan", 5);
205  rel1.setProperty("dst_vlan", 5);
206  rel1.setProperty("path", "\"{\"path\": [], \"latency_ns\": 0, \"timestamp\": 1522528031909}\"");
207  rel1.setProperty("bandwidth", 200);
208  rel1.setProperty("ignore_bandwidth", true);
209  rel1.setProperty("description", "description");
210  rel1.setProperty("last_updated", "last_updated");
211  tx.success();
212  }
213 
214  List<FlowStatsEntry> entries = Collections.singletonList(
215  new FlowStatsEntry((short) 1, cookie, 1500L, 3000L));
216  final List<FlowStatsReply> stats = Collections.singletonList(new FlowStatsReply(3, entries));
217  InfoMessage message = new InfoMessage(new FlowStatsData(switchId, stats),
218  timestamp, CORRELATION_ID, Destination.WFM_STATS);
219 
220  sources.addMockData(StatsComponentType.STATS_OFS_KAFKA_SPOUT.toString(),
221  new Values(MAPPER.writeValueAsString(message)));
222 
223  sources.addMockData(StatsComponentType.STATS_KILDA_SPEAKER_SPOUT.name(),
224  new Values("")
225  );
226 
227  completeTopologyParam.setMockedSources(sources);
228 
229  //execute topology
230  Testing.withTrackedCluster(clusterParam, (cluster) -> {
231  StatsTopology topology = new TestingTargetTopology(launchEnvironment, new TestingKafkaBolt());
232  StormTopology stormTopology = topology.createTopology();
233 
234  Map result = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
235 
236  //verify results which were sent to Kafka bolt
237  ArrayList<FixedTuple> tuples =
238  (ArrayList<FixedTuple>) result.get(StatsComponentType.FLOW_STATS_METRIC_GEN.name());
239  assertThat(tuples.size(), is(9));
240  tuples.stream()
241  .map(this::readFromJson)
242  .forEach(datapoint -> {
243  if (datapoint.getMetric().equals("pen.flow.packets")) {
244  assertThat(datapoint.getTags().get("direction"), is("forward"));
245  }
246  assertThat(datapoint.getTags().get("flowid"), is(flowId));
247  assertThat(datapoint.getTime(), is(timestamp));
248  });
249  });
250  }
251 
/**
 * Cache synchronisation scenario for installing a single-switch flow.
 *
 * <p>Sends an InstallOneSwitchFlow command through the mocked speaker spout (the OFS spout is
 * mocked with no data), completes the topology and inspects the tuples emitted on the
 * CACHE_UPDATE stream. Each such tuple must be an UPDATE command for the test flow id and switch
 * id, and together the tuples must cover exactly the INGRESS and EGRESS measure points.
 *
 * <p>NOTE(review): this listing skips source lines 257 and 274, so the payload construction and
 * the lookup of the captured component's tuples are both cut off - restore them from the
 * original file before editing this method.
 */
252  @Test
253  public void cacheSyncSingleSwitchFlowAdd() throws Exception {
254  final SwitchId switchId = new SwitchId(1L);
255  final String flowId = "sync-test-add-ssf";
256  final InstallOneSwitchFlow payload =
258  0L, flowId, 0xFFFF000000000001L, switchId, 8, 9, 127, 127,
259  OutputVlanType.PUSH, 1000L, 0L);
// The flow id doubles as the third CommandMessage argument here.
260  final CommandMessage message = new CommandMessage(payload, timestamp, flowId, Destination.WFM_STATS);
261  final String json = MAPPER.writeValueAsString(message);
262 
263  MockedSources sources = new MockedSources();
// The OFS spout is registered without any tuples; only the speaker spout delivers data.
264  sources.addMockData(StatsComponentType.STATS_OFS_KAFKA_SPOUT.name());
265  sources.addMockData(StatsComponentType.STATS_KILDA_SPEAKER_SPOUT.name(), new Values(json));
266  completeTopologyParam.setMockedSources(sources);
267 
268  Testing.withTrackedCluster(clusterParam, (cluster) -> {
269  StatsTopology topologyManager = new TestingTargetTopology(launchEnvironment, new TestingKafkaBolt());
270  StormTopology topology = topologyManager.createTopology();
271 
272  Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);
273  List<FixedTuple> cacheSyncStream = (List<FixedTuple>) result.get(
275 
276  final HashSet<MeasurePoint> expectedEvents = new HashSet<>();
277  expectedEvents.add(MeasurePoint.INGRESS);
278  expectedEvents.add(MeasurePoint.EGRESS);
279 
280  final HashSet<MeasurePoint> seenEvents = new HashSet<>();
// Tuple layout as read below: [0] command, [1] flow id, [2] switch id, [4] measure point.
281  cacheSyncStream.stream()
282  .filter(item -> StatsStreamType.CACHE_UPDATE == StatsStreamType.valueOf(item.stream))
283  .forEach(item -> {
284  Assert.assertEquals(CacheFilterBolt.Commands.UPDATE, item.values.get(0));
285  Assert.assertEquals(flowId, item.values.get(1));
286  Assert.assertEquals(switchId, item.values.get(2));
287  MeasurePoint affectedPoint = (MeasurePoint) item.values.get(4);
288 
289  seenEvents.add(affectedPoint);
290  });
291 
292  Assert.assertEquals(expectedEvents, seenEvents);
293  });
294  }
295 
296  private Datapoint readFromJson(FixedTuple tuple) {
297  try {
298  return Utils.MAPPER.readValue(tuple.values.get(0).toString(), Datapoint.class);
299  } catch (IOException e) {
300  e.printStackTrace();
301  }
302  return null;
303  }
304 
308  private class TestingTargetTopology extends StatsTopology {
309 
310  private KafkaBolt kafkaBolt;
311 
312  TestingTargetTopology(LaunchEnvironment launchEnvironment, KafkaBolt kafkaBolt)
313  throws Exception {
314  super(launchEnvironment);
315  this.kafkaBolt = kafkaBolt;
316  }
317 
318  @Override
319  protected void checkAndCreateTopic(String topic) {
320  }
321 
322  @Override
323  protected void createHealthCheckHandler(TopologyBuilder builder, String prefix) {
324  }
325 
326  @Override
327  public String getDefaultTopologyName() {
328  return StatsTopology.class.getSimpleName().toLowerCase();
329  }
330 
331  @Override
332  protected KafkaBolt createKafkaBolt(String topic) {
333  return kafkaBolt;
334  }
335 
336  }
337 }
void setupOverlay(Properties overlay)
static final ObjectMapper MAPPER
Definition: Utils.java:31
static LaunchEnvironment makeLaunchEnvironment()
static CompleteTopologyParam completeTopologyParam
list result
Definition: plan-d.py:72
static final String CORRELATION_ID
Definition: Utils.java:43
GraphDatabaseService getGraphDatabaseService()