Open Kilda Java Documentation
OpenTSDBTopologyTest.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.opentsdb;
2 
3 import static org.mockserver.integration.ClientAndServer.startClientAndServer;
4 import static org.openkilda.messaging.Utils.MAPPER;
5 
9 
10 import org.apache.storm.Testing;
11 import org.apache.storm.generated.StormTopology;
12 import org.apache.storm.kafka.bolt.KafkaBolt;
13 import org.apache.storm.testing.MockedSources;
14 import org.apache.storm.topology.TopologyBuilder;
15 import org.apache.storm.tuple.Values;
16 import org.junit.Before;
17 import org.junit.BeforeClass;
18 import org.junit.Ignore;
19 import org.junit.Test;
20 import org.mockserver.integration.ClientAndServer;
21 import org.mockserver.model.HttpRequest;
22 import org.mockserver.model.HttpResponse;
23 import org.mockserver.verify.VerificationTimes;
24 
25 import java.util.Collections;
26 import java.util.Map;
27 
// Wall-clock time captured once when the class is initialised; every test below uses it
// as the timestamp of the datapoints it builds.
29  private static final long timestamp = System.currentTimeMillis();
// MockServer instance shared by all tests; assigned in setupOnce() via
// startClientAndServer(4242) and reset in init().
30  private static ClientAndServer mockServer;
31 
32  @BeforeClass
33  public static void setupOnce() throws Exception {
35  mockServer = startClientAndServer(4242);
36  mockServer.when(HttpRequest
37  .request()
38  .withMethod("POST")
39  .withPath("api/put"))
40  .respond(HttpResponse.response());
41  }
42 
43  @Before
44  public void init() {
45  mockServer.reset();
46  }
47 
48  @Ignore
49  @Test
50  public void shouldSuccessfulSendDatapoint() throws Exception {
51  Datapoint datapoint = new Datapoint("metric", timestamp, Collections.emptyMap(), 123);
52 
53  MockedSources sources = new MockedSources();
54  // TODO: rather than use Topic.OTSDB, grab it from the TopologyConfig object (which does
55  // not exist at this point in the code.
56 
57  Testing.withTrackedCluster(clusterParam, (cluster) -> {
58  OpenTSDBTopology topology = new TestingTargetTopology(new TestingKafkaBolt());
59 
60  sources.addMockData(OpenTSDBTopology.OTSDB_SPOUT_ID,
61  new Values(MAPPER.writeValueAsString(datapoint)));
62  completeTopologyParam.setMockedSources(sources);
63 
64  StormTopology stormTopology = topology.createTopology();
65 
66  Map result = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
67  });
68 
69  //verify that request is sent to OpenTSDB server
70  mockServer.verify(HttpRequest.request(), VerificationTimes.exactly(1));
71  }
72 
73  @Ignore
74  @Test
75  public void shouldSendDatapointRequestsOnlyOnce() throws Exception {
76  Datapoint datapoint = new Datapoint("metric", timestamp, Collections.emptyMap(), 123);
77  String jsonDatapoint = MAPPER.writeValueAsString(datapoint);
78 
79  MockedSources sources = new MockedSources();
80 
81  Testing.withTrackedCluster(clusterParam, (cluster) -> {
82  OpenTSDBTopology topology = new TestingTargetTopology(new TestingKafkaBolt());
83 
84  sources.addMockData(OpenTSDBTopology.OTSDB_SPOUT_ID,
85  new Values(jsonDatapoint), new Values(jsonDatapoint));
86  completeTopologyParam.setMockedSources(sources);
87 
88  StormTopology stormTopology = topology.createTopology();
89 
90  Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
91  });
92  //verify that request is sent to OpenTSDB server once
93  mockServer.verify(HttpRequest.request(), VerificationTimes.exactly(1));
94  }
95 
96  @Ignore
97  @Test
98  public void shouldSendDatapointRequestsTwice() throws Exception {
99  Datapoint datapoint1 = new Datapoint("metric", timestamp, Collections.emptyMap(), 123);
100  String jsonDatapoint1 = MAPPER.writeValueAsString(datapoint1);
101 
102  Datapoint datapoint2 = new Datapoint("metric", timestamp, Collections.emptyMap(), 456);
103  String jsonDatapoint2 = MAPPER.writeValueAsString(datapoint2);
104 
105  MockedSources sources = new MockedSources();
106 
107  Testing.withTrackedCluster(clusterParam, (cluster) -> {
108  OpenTSDBTopology topology = new TestingTargetTopology(new TestingKafkaBolt());
109 
110  sources.addMockData(OpenTSDBTopology.OTSDB_SPOUT_ID,
111  new Values(jsonDatapoint1), new Values(jsonDatapoint2));
112  completeTopologyParam.setMockedSources(sources);
113 
114  StormTopology stormTopology = topology.createTopology();
115 
116  Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
117  });
118  //verify that request is sent to OpenTSDB server once
119  mockServer.verify(HttpRequest.request(), VerificationTimes.exactly(2));
120  }
121 
122  private class TestingTargetTopology extends OpenTSDBTopology {
123 
124  private KafkaBolt kafkaBolt;
125 
126  TestingTargetTopology(KafkaBolt kafkaBolt) throws Exception {
127  super(makeLaunchEnvironment());
128 
129  this.kafkaBolt = kafkaBolt;
130  }
131 
132  @Override
133  protected void checkAndCreateTopic(String topic) {
134  }
135 
136  @Override
137  protected void createHealthCheckHandler(TopologyBuilder builder, String prefix) {
138  }
139 
140  @Override
141  public String getDefaultTopologyName() {
142  return OpenTSDBTopology.class.getSimpleName().toLowerCase();
143  }
144 
145  @Override
146  protected KafkaBolt createKafkaBolt(String topic) {
147  return kafkaBolt;
148  }
149 
150  }
151 
152 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
static LaunchEnvironment makeLaunchEnvironment()
static CompleteTopologyParam completeTopologyParam
list result
Definition: plan-d.py:72