16 package org.openkilda.wfm.topology.event;
18 import static org.junit.Assert.assertEquals;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import org.apache.storm.Config;
32 import org.apache.storm.generated.StormTopology;
33 import org.apache.storm.utils.Utils;
34 import org.junit.AfterClass;
35 import org.junit.Ignore;
36 import org.junit.Test;
38 import java.io.IOException;
39 import java.util.Collections;
40 import java.util.UUID;
51 private static final ObjectMapper objectMapper =
new ObjectMapper();
57 Utils.sleep(4 * 1000);
61 @Test(timeout = 5000 * 60)
66 topology.getConfig().getKafkaTopoEngTopic(),
72 final int floodSize = 100000;
75 "hostname",
"description",
"controller");
79 sendMessages(message, topology.getConfig().getKafkaTopoDiscoTopic(), floodSize);
82 StormTopology stormTopology = topology.createTopology();
87 "test", Collections.emptySet(),
88 Collections.emptySet(),
89 Collections.emptySet(),
90 Collections.emptySet());
94 String request = objectMapper.writeValueAsString(info);
100 while (pooled < floodSize) {
105 assertEquals(floodSize, pooled);
108 private static void sendMessages(Object
object, String
topic,
int count)
throws IOException {
109 String request = objectMapper.writeValueAsString(
object);
static Config stormConfig()
static LaunchEnvironment makeLaunchEnvironment()
static TestKafkaProducer kProducer
static void teardownOnce()
static void teardownOnce()
static LocalCluster cluster
void warmBoltOnHighLoadedTopic()
void pushMessage(final String topic, final String data)
void pushMessageAsync(final String topic, final String data)
static final String DEFAULT_CORRELATION_ID
ConsumerRecord< String, String > pollMessage()
static Properties kafkaProperties()