16 package org.openkilda.wfm.topology.stats;
18 import static java.util.stream.Collectors.toList;
19 import static org.hamcrest.CoreMatchers.is;
20 import static org.hamcrest.CoreMatchers.startsWith;
21 import static org.hamcrest.MatcherAssert.assertThat;
47 import org.apache.storm.Testing;
48 import org.apache.storm.generated.StormTopology;
49 import org.apache.storm.kafka.bolt.KafkaBolt;
50 import org.apache.storm.testing.FixedTuple;
51 import org.apache.storm.testing.MockedSources;
52 import org.apache.storm.topology.TopologyBuilder;
53 import org.apache.storm.tuple.Values;
54 import org.junit.AfterClass;
55 import org.junit.Assert;
56 import org.junit.BeforeClass;
57 import org.junit.Ignore;
58 import org.junit.Test;
59 import org.neo4j.graphdb.GraphDatabaseService;
60 import org.neo4j.graphdb.Label;
61 import org.neo4j.graphdb.Node;
62 import org.neo4j.graphdb.Relationship;
63 import org.neo4j.graphdb.RelationshipType;
64 import org.neo4j.graphdb.Transaction;
66 import java.io.IOException;
67 import java.util.ArrayList;
68 import java.util.Arrays;
69 import java.util.Collections;
70 import java.util.HashSet;
71 import java.util.List;
73 import java.util.Properties;
74 import java.util.stream.IntStream;
78 private static final long timestamp = System.currentTimeMillis();
81 private final long cookie = 0x4000000000000001L;
82 private final String flowId =
"f253423454343";
94 Properties configOverlay =
new Properties();
110 final List<PortStatsEntry> entries = IntStream.range(1, 53).boxed().map(
port -> {
111 int baseCount =
port * 20;
112 return new PortStatsEntry(
port, baseCount, baseCount + 1, baseCount + 2, baseCount + 3,
113 baseCount + 4, baseCount + 5, baseCount + 6, baseCount + 7,
114 baseCount + 8, baseCount + 9, baseCount + 10, baseCount + 11);
115 }).collect(toList());
116 final List<PortStatsReply> replies = Collections.singletonList(
new PortStatsReply(1, entries));
121 MockedSources sources =
new MockedSources();
123 new Values(MAPPER.writeValueAsString(message)));
129 StormTopology stormTopology =
topology.createTopology();
133 ArrayList<FixedTuple> tuples =
135 assertThat(tuples.size(), is(728));
137 .map(this::readFromJson)
138 .forEach(datapoint -> {
139 assertThat(datapoint.getTags().get(
"switchId"), is(switchId.
toString().replaceAll(
":",
"")));
140 assertThat(datapoint.getTime(), is(timestamp));
141 assertThat(datapoint.getMetric(), startsWith(
"pen.switch"));
149 final List<MeterConfigReply> stats =
150 Collections.singletonList(
new MeterConfigReply(2, Arrays.asList(1L, 2L, 3L)));
155 MockedSources sources =
new MockedSources();
157 new Values(MAPPER.writeValueAsString(message)));
159 new Values(MAPPER.writeValueAsString(message))
167 StormTopology stormTopology =
topology.createTopology();
171 ArrayList<FixedTuple> tuples =
173 assertThat(tuples.size(), is(3));
175 .map(this::readFromJson)
176 .forEach(datapoint -> {
177 assertThat(datapoint.getTags().get(
"switchid"),
179 assertThat(datapoint.getTime(), is(timestamp));
180 assertThat(datapoint.getMetric(), is(
"pen.switch.meters"));
188 MockedSources sources =
new MockedSources();
192 try (Transaction tx = graphDatabaseService.beginTx()) {
193 Node node1 = graphDatabaseService.createNode(Label.label(
"switch"));
194 node1.setProperty(
"name", switchId.
toString());
195 Relationship rel1 = node1.createRelationshipTo(node1, RelationshipType.withName(
"flow"));
196 rel1.setProperty(
"flowid", flowId);
197 rel1.setProperty(
"cookie", cookie);
198 rel1.setProperty(
"meter_id", 2);
199 rel1.setProperty(
"transit_vlan", 1);
200 rel1.setProperty(
"src_switch", switchId.
toString());
201 rel1.setProperty(
"dst_switch", switchId.
toString());
202 rel1.setProperty(
"src_port", 1);
203 rel1.setProperty(
"dst_port", 2);
204 rel1.setProperty(
"src_vlan", 5);
205 rel1.setProperty(
"dst_vlan", 5);
206 rel1.setProperty(
"path",
"\"{\"path\": [], \"latency_ns\": 0, \"timestamp\": 1522528031909}\"");
207 rel1.setProperty(
"bandwidth", 200);
208 rel1.setProperty(
"ignore_bandwidth",
true);
209 rel1.setProperty(
"description",
"description");
210 rel1.setProperty(
"last_updated",
"last_updated");
214 List<FlowStatsEntry> entries = Collections.singletonList(
216 final List<FlowStatsReply> stats = Collections.singletonList(
new FlowStatsReply(3, entries));
221 new Values(MAPPER.writeValueAsString(message)));
232 StormTopology stormTopology =
topology.createTopology();
237 ArrayList<FixedTuple> tuples =
239 assertThat(tuples.size(), is(9));
241 .map(this::readFromJson)
242 .forEach(datapoint -> {
243 if (datapoint.getMetric().equals(
"pen.flow.packets")) {
244 assertThat(datapoint.getTags().get(
"direction"), is(
"forward"));
246 assertThat(datapoint.getTags().get(
"flowid"), is(flowId));
247 assertThat(datapoint.getTime(), is(timestamp));
255 final String flowId =
"sync-test-add-ssf";
258 0L, flowId, 0xFFFF000000000001L, switchId, 8, 9, 127, 127,
261 final String json = MAPPER.writeValueAsString(message);
263 MockedSources sources =
new MockedSources();
273 List<FixedTuple> cacheSyncStream = (List<FixedTuple>)
result.get(
276 final HashSet<MeasurePoint> expectedEvents =
new HashSet<>();
280 final HashSet<MeasurePoint> seenEvents =
new HashSet<>();
281 cacheSyncStream.stream()
285 Assert.assertEquals(flowId, item.values.get(1));
286 Assert.assertEquals(switchId, item.values.get(2));
289 seenEvents.add(affectedPoint);
292 Assert.assertEquals(expectedEvents, seenEvents);
296 private Datapoint readFromJson(FixedTuple tuple) {
299 }
catch (IOException e) {
308 private class TestingTargetTopology
extends StatsTopology {
310 private KafkaBolt kafkaBolt;
312 TestingTargetTopology(LaunchEnvironment launchEnvironment, KafkaBolt kafkaBolt)
314 super(launchEnvironment);
315 this.kafkaBolt = kafkaBolt;
319 protected void checkAndCreateTopic(String
topic) {
323 protected void createHealthCheckHandler(TopologyBuilder builder, String prefix) {
327 public String getDefaultTopologyName() {
332 protected KafkaBolt createKafkaBolt(String
topic) {
static TemporaryFolder fsData
void setupOverlay(Properties overlay)
static final ObjectMapper MAPPER
METER_CFG_STATS_METRIC_GEN
static LaunchEnvironment makeLaunchEnvironment()
void meterConfigStatsTest()
static MkClusterParam clusterParam
void cacheSyncSingleSwitchFlowAdd()
static CompleteTopologyParam completeTopologyParam
static LocalCluster cluster
StormTopology createTopology()
StatsTopology(LaunchEnvironment env)
static final String CORRELATION_ID
static void teardownOnce()
GraphDatabaseService getGraphDatabaseService()
STATS_KILDA_SPEAKER_SPOUT
String getListenAddress()
static String NEO4J_LISTEN_ADDRESS