1 package org.openkilda.wfm.topology.opentsdb;
3 import static org.mockserver.integration.ClientAndServer.startClientAndServer;
10 import org.apache.storm.Testing;
11 import org.apache.storm.generated.StormTopology;
12 import org.apache.storm.kafka.bolt.KafkaBolt;
13 import org.apache.storm.testing.MockedSources;
14 import org.apache.storm.topology.TopologyBuilder;
15 import org.apache.storm.tuple.Values;
16 import org.junit.Before;
17 import org.junit.BeforeClass;
18 import org.junit.Ignore;
19 import org.junit.Test;
20 import org.mockserver.integration.ClientAndServer;
21 import org.mockserver.model.HttpRequest;
22 import org.mockserver.model.HttpResponse;
23 import org.mockserver.verify.VerificationTimes;
25 import java.util.Collections;
29 private static final long timestamp = System.currentTimeMillis();
30 private static ClientAndServer mockServer;
35 mockServer = startClientAndServer(4242);
36 mockServer.when(HttpRequest
40 .respond(HttpResponse.response());
53 MockedSources sources =
new MockedSources();
61 new Values(MAPPER.writeValueAsString(datapoint)));
64 StormTopology stormTopology =
topology.createTopology();
70 mockServer.verify(HttpRequest.request(), VerificationTimes.exactly(1));
77 String jsonDatapoint = MAPPER.writeValueAsString(datapoint);
79 MockedSources sources =
new MockedSources();
85 new Values(jsonDatapoint),
new Values(jsonDatapoint));
88 StormTopology stormTopology =
topology.createTopology();
93 mockServer.verify(HttpRequest.request(), VerificationTimes.exactly(1));
99 Datapoint datapoint1 =
new Datapoint(
"metric", timestamp, Collections.emptyMap(), 123);
100 String jsonDatapoint1 = MAPPER.writeValueAsString(datapoint1);
102 Datapoint datapoint2 =
new Datapoint(
"metric", timestamp, Collections.emptyMap(), 456);
103 String jsonDatapoint2 = MAPPER.writeValueAsString(datapoint2);
105 MockedSources sources =
new MockedSources();
111 new Values(jsonDatapoint1),
new Values(jsonDatapoint2));
114 StormTopology stormTopology =
topology.createTopology();
119 mockServer.verify(HttpRequest.request(), VerificationTimes.exactly(2));
124 private KafkaBolt kafkaBolt;
126 TestingTargetTopology(KafkaBolt kafkaBolt)
throws Exception {
129 this.kafkaBolt = kafkaBolt;
133 protected void checkAndCreateTopic(String
topic) {
137 protected void createHealthCheckHandler(TopologyBuilder builder, String prefix) {
141 public String getDefaultTopologyName() {
142 return OpenTSDBTopology.class.getSimpleName().toLowerCase();
146 protected KafkaBolt createKafkaBolt(String
topic) {
void shouldSuccessfulSendDatapoint()
void shouldSendDatapointRequestsTwice()
static final ObjectMapper MAPPER
static LaunchEnvironment makeLaunchEnvironment()
static MkClusterParam clusterParam
static CompleteTopologyParam completeTopologyParam
static LocalCluster cluster
void shouldSendDatapointRequestsOnlyOnce()