16 package org.openkilda.wfm.topology.cache;
49 import com.fasterxml.jackson.databind.ObjectMapper;
50 import com.google.common.collect.ImmutableList;
51 import org.apache.commons.lang.StringUtils;
52 import org.apache.kafka.clients.consumer.ConsumerRecord;
53 import org.apache.storm.Config;
54 import org.apache.storm.generated.StormTopology;
55 import org.junit.AfterClass;
56 import org.junit.Assert;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Ignore;
60 import org.junit.Test;
62 import java.io.IOException;
63 import java.util.Collections;
64 import java.util.HashSet;
65 import java.util.List;
66 import java.util.Properties;
68 import java.util.UUID;
72 private static final ObjectMapper objectMapper =
new ObjectMapper();
73 private static final String firstFlowId =
"first-flow";
74 private static final String secondFlowId =
"second-flow";
75 private static final String thirdFlowId =
"third-flow";
79 new Flow(firstFlowId, 10000,
false,
"", sw.
getSwitchId(), 1, 2, sw.
getSwitchId(), 1, 2),
82 new Flow(secondFlowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
84 new Flow(secondFlowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
87 new Flow(thirdFlowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
89 new Flow(thirdFlowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
91 private static final Set<ImmutablePair<Flow, Flow>> flows =
new HashSet<>();
93 "test", Collections.singleton(sw), Collections.emptySet(), Collections.emptySet(), flows);
105 Properties configOverlay =
new Properties();
111 flows.add(firstFlow);
112 flows.add(secondFlow);
117 StormTopology stormTopology = topology.createTopology();
132 flowConsumer.start();
138 ctrlConsumer.start();
142 public void init() throws Exception {
144 flowConsumer.
clear();
146 ctrlConsumer.
clear();
169 System.out.println(
"Flow Update Test");
171 sendFlowUpdate(thirdFlow);
173 ConsumerRecord<String, String> flow = teConsumer.pollMessage();
175 Assert.assertNotNull(flow);
176 Assert.assertNotNull(flow.value());
180 Assert.assertNotNull(infoData);
182 Assert.assertEquals(thirdFlow, infoData.
getPayload());
190 sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
192 ConsumerRecord<String, String> raw = ctrlConsumer.
pollMessage();
194 Assert.assertNotNull(raw);
195 Assert.assertNotNull(raw.value());
197 Message responseGeneric = objectMapper.readValue(raw.value(),
Message.class);
210 sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
212 ConsumerRecord<String, String> raw = ctrlConsumer.
pollMessage();
214 Assert.assertNotNull(raw);
215 Assert.assertNotNull(raw.value());
217 Message responseGeneric = objectMapper.readValue(raw.value(),
Message.class);
230 sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
232 ConsumerRecord<String, String> raw = ctrlConsumer.
pollMessage();
234 Assert.assertNotNull(raw);
235 Assert.assertNotNull(raw.value());
237 Message responseGeneric = objectMapper.readValue(raw.value(),
Message.class);
246 final String flowId =
"flowId";
250 StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY);
251 sendData(destSwitch);
253 List<PathNode>
path = ImmutableList.of(
264 flowConsumer.
clear();
269 ConsumerRecord<String, String> record = flowConsumer.
pollMessage();
270 Assert.assertNotNull(record);
272 Assert.assertNotNull(message);
274 Assert.assertEquals(
command.getFlowId(), flowId);
277 private static <T extends Message>
void sendMessage(T message, String
topic)
throws IOException {
278 String request = objectMapper.writeValueAsString(message);
282 private static void sendNetworkDump(NetworkInfoData
data, String correlationId)
throws IOException {
283 System.out.println(
"Topology-Engine: Send Network Dump");
285 sendMessage(info, topology.getConfig().getKafkaTopoCacheTopic());
288 private static <T extends InfoData>
void sendData(T infoData)
throws IOException {
290 sendMessage(info, topology.getConfig().getKafkaTopoCacheTopic());
293 private static void sendFlowUpdate(ImmutablePair<Flow, Flow> flow)
throws IOException {
294 System.out.println(
"Flow Topology: Send Flow Creation Request");
295 String correlationId = UUID.randomUUID().toString();
296 FlowInfoData
data =
new FlowInfoData(flow.getLeft().getFlowId(),
297 flow, FlowOperation.CREATE, correlationId);
300 InfoMessage message =
new InfoMessage(
data, System.currentTimeMillis(), correlationId);
301 sendMessage(message, topology.getConfig().getKafkaTopoCacheTopic());
304 private static void sendClearState() throws IOException, InterruptedException {
305 CtrlRequest request =
new CtrlRequest(
306 "cachetopology/cache",
new RequestData(
"clearState"), 1,
"route-correlation-id",
307 Destination.WFM_CTRL);
308 sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
310 ConsumerRecord<String, String> raw = ctrlConsumer.
pollMessage();
313 CtrlResponse response = (CtrlResponse) objectMapper.readValue(raw.value(), Message.class);
314 Assert.assertEquals(request.getCorrelationId(), response.getCorrelationId());
318 private static void sendNetworkDumpRequest() throws IOException, InterruptedException {
319 CtrlRequest request =
new CtrlRequest(
"cachetopology/cache",
new RequestData(
"dump"),
320 System.currentTimeMillis(), UUID.randomUUID().toString(), Destination.WFM_CTRL);
321 sendMessage(request,
topology.getConfig().getKafkaCtrlTopic());
324 private NetworkDump getNetworkDump(ConsumerRecord<String, String> raw)
throws IOException {
325 Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
326 CtrlResponse response = (CtrlResponse) responseGeneric;
327 DumpStateResponseData
data = (DumpStateResponseData) response.getData();
328 CacheBoltState cacheState = (CacheBoltState)
data.getState();
329 return cacheState.getNetwork();
332 private FlowInfoData buildFlowInfoData(String flowId, SwitchId srcSwitch, SwitchId dstSwitch, List<PathNode>
path) {
333 Flow flow =
new Flow();
334 flow.setFlowId(flowId);
335 flow.setSourceSwitch(srcSwitch);
336 flow.setDestinationSwitch(dstSwitch);
337 flow.setState(FlowState.UP);
339 PathInfoData pathInfoData =
new PathInfoData(0L,
path);
340 flow.setFlowPath(pathInfoData);
341 ImmutablePair<Flow, Flow> immutablePair =
new ImmutablePair<>(flow, flow);
342 return new FlowInfoData(flowId, immutablePair, FlowOperation.CREATE, UUID.randomUUID().toString());
345 private static String waitDumpRequest() throws InterruptedException, IOException {
346 ConsumerRecord<String, String> raw;
348 while ((raw = teConsumer.pollMessage(1000)) == null) {
349 System.out.println(
"Waiting For Dump Request");
350 Assert.assertTrue(
"Waiting For Dump Request failed", ++sec < 20);
352 System.out.println(
"Waiting For Dump Request");
354 Message request = objectMapper.readValue(raw.value(), Message.class);
355 return request.getCorrelationId();
static TemporaryFolder fsData
void setupOverlay(Properties overlay)
static Config stormConfig()
static LaunchEnvironment makeLaunchEnvironment()
static TestKafkaProducer kProducer
static void teardownOnce()
static LocalCluster cluster
void setState(final SwitchState state)
ImmutablePair< Flow, Flow > getPayload()
def command(payload, fields)
void pushMessage(final String topic, final String data)
static void teardownOnce()
void flowShouldBeReroutedWhenIslDies()
ConsumerRecord< String, String > pollMessage()
void setState(IslChangeType state)
String getCorrelationId()
static Properties kafkaProperties()
void cacheReceivesFlowTopologyUpdatesAndSendsToTopologyEngine()
String getListenAddress()
static String NEO4J_LISTEN_ADDRESS