Open Kilda Java Documentation
OfeLinkBoltFloodTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.event;
17 
18 import static org.junit.Assert.assertEquals;
20 
29 
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import org.apache.storm.Config;
32 import org.apache.storm.generated.StormTopology;
33 import org.apache.storm.utils.Utils;
34 import org.junit.AfterClass;
35 import org.junit.Ignore;
36 import org.junit.Test;
37 
38 import java.io.IOException;
39 import java.util.Collections;
40 import java.util.UUID;
41 
47 @Ignore // TODO(nmarchenko): move that test to perf and unignore
49 
// Topology under test; assigned in warmBoltOnHighLoadedTopic() and read for topic names and createTopology().
50  private static OfEventWfmTopology topology;
// Shared Jackson mapper used to serialize the messages this test pushes to Kafka.
51  private static final ObjectMapper objectMapper = new ObjectMapper();
// Consumer attached to the topic returned by getKafkaTopoEngTopic(); polled to count received messages.
52  private TestKafkaConsumer teConsumer;
53 
/**
 * One-time class teardown: asks the cluster to kill the topology named after this class's
 * simple name, then sleeps for four seconds.
 *
 * @throws Exception declared by the signature; propagated from the calls below
 */
54  @AfterClass
55  public static void teardownOnce() throws Exception {
56  cluster.killTopology(OfeLinkBoltFloodTest.class.getSimpleName());
 // 4 * 1000 ms pause; presumably gives the cluster time to complete the kill — TODO confirm
57  Utils.sleep(4 * 1000);
59  }
60 
/**
 * Fills the disco topic with {@code floodSize} copies of one switch-added message BEFORE the
 * topology is submitted, then submits the topology, pushes one extra message to the same topic
 * and polls the topology-engine topic consumer until {@code floodSize} messages have arrived.
 *
 * <p>The JUnit timeout is 5000 * 60 ms (five minutes); it is what fails the test when fewer than
 * {@code floodSize} messages ever arrive.
 *
 * @throws Exception declared by the signature; propagated from the calls below
 */
61  @Test(timeout = 5000 * 60)
62  public void warmBoltOnHighLoadedTopic() throws Exception {
63  topology = new OfEventWfmTopology(makeLaunchEnvironment());
64 
 // The consumer is started before any message is produced.
 // NOTE(review): the UUID string is presumably used as the consumer group id — confirm against
 // kafkaProperties(). getBytes() without a charset uses the platform default encoding.
65  teConsumer = new TestKafkaConsumer(
66  topology.getConfig().getKafkaTopoEngTopic(),
67  kafkaProperties(UUID.nameUUIDFromBytes(Destination.TOPOLOGY_ENGINE.toString().getBytes()).toString())
68  );
69  teConsumer.start();
70 
71  // Number of messages placed in the topic before the bolt starts
72  final int floodSize = 100000;
73 
 // A single message instance is built once and sent floodSize times.
74  SwitchInfoData data = new SwitchInfoData(new SwitchId("ff:00"), SwitchState.ADDED, "address",
75  "hostname", "description", "controller");
76  InfoMessage message = new InfoMessage(data, System.currentTimeMillis(), UUID.randomUUID().toString());
77 
78  // Flooding
79  sendMessages(message, topology.getConfig().getKafkaTopoDiscoTopic(), floodSize);
80 
81 
 // The topology is submitted only after the topic has been flooded.
82  StormTopology stormTopology = topology.createTopology();
83  Config config = stormConfig();
84  cluster.submitTopology(OfeLinkBoltFloodTest.class.getSimpleName(), config, stormTopology);
85 
 // NOTE(review): the opening of this statement (the declaration of `dump`) is missing from this
 // listing; only its arguments are visible: "test" and four empty sets.
87  "test", Collections.emptySet(),
88  Collections.emptySet(),
89  Collections.emptySet(),
90  Collections.emptySet());
91 
92  InfoMessage info = new InfoMessage(dump, 0, DEFAULT_CORRELATION_ID, Destination.WFM);
93 
94  String request = objectMapper.writeValueAsString(info);
95  // Send DumpMessage to topic with offset floodSize+1.
96  kProducer.pushMessage(topology.getConfig().getKafkaTopoDiscoTopic(), request);
97 
98  // Wait for all messages
 // Busy loop: only non-null polls are counted, and the loop exits only once
 // pooled == floodSize. If fewer messages arrive, the loop never ends and the
 // @Test timeout is what fails the test.
99  int pooled = 0;
100  while (pooled < floodSize) {
101  if (teConsumer.pollMessage() != null) {
102  ++pooled;
103  }
104  }
 // Always true when reached, because of the loop condition above.
105  assertEquals(floodSize, pooled);
106  }
107 
/**
 * Serializes {@code object} once with the shared {@code objectMapper} and pushes the resulting
 * string to {@code topic} {@code count} times via {@code kProducer.pushMessageAsync}, then
 * flushes the producer.
 *
 * @param object payload to serialize; the same serialized string is reused for every send
 * @param topic  name of the Kafka topic to push to
 * @param count  number of copies to push; nothing is pushed when it is zero or negative
 * @throws IOException declared by the signature; the only visible call that can raise it is
 *                     the serialization step
 */
108  private static void sendMessages(Object object, String topic, int count) throws IOException {
 // Serialize outside the loop so the cost is paid once, not per message.
109  String request = objectMapper.writeValueAsString(object);
110  for (int i = 0; i < count; ++i) {
111  kProducer.pushMessageAsync(topic, request);
112  }
 // presumably blocks until the queued async sends are handed off — TODO confirm against the producer class
113  kProducer.flush();
114  }
115 }
static LaunchEnvironment makeLaunchEnvironment()
int count
Definition: generator.py:19
void pushMessage(final String topic, final String data)
void pushMessageAsync(final String topic, final String data)
static final String DEFAULT_CORRELATION_ID
Definition: Utils.java:69
ConsumerRecord< String, String > pollMessage()