16 package org.openkilda.wfm.topology.flow;
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNotNull;
20 import static org.junit.Assert.assertNull;
21 import static org.junit.Assert.assertTrue;
64 import com.fasterxml.jackson.databind.ObjectMapper;
65 import org.apache.kafka.clients.consumer.ConsumerRecord;
66 import org.apache.storm.Config;
67 import org.apache.storm.generated.StormTopology;
68 import org.apache.storm.utils.Utils;
69 import org.junit.After;
70 import org.junit.AfterClass;
71 import org.junit.Before;
72 import org.junit.BeforeClass;
73 import org.junit.Ignore;
74 import org.junit.Test;
76 import java.io.IOException;
77 import java.util.Collections;
78 import java.util.List;
79 import java.util.UUID;
83 private static final long COOKIE = 0x1FFFFFFFFL;
84 private static final ObjectMapper objectMapper =
new ObjectMapper();
98 topologyConfig = flowTopology.
getConfig();
116 cacheConsumer.start();
122 teResponseConsumer.start();
126 ctrlConsumer.start();
138 cacheConsumer.join();
139 teResponseConsumer.
wakeup();
140 teResponseConsumer.join();
146 public void setup() throws Exception {
149 cacheConsumer.
clear();
150 teResponseConsumer.
clear();
157 cacheConsumer.
clear();
158 teResponseConsumer.
clear();
166 ConsumerRecord<String, String> record;
167 String flowId = UUID.randomUUID().toString();
172 assertNotNull(record);
173 assertNotNull(record.value());
180 assertNotNull(record);
181 assertNotNull(record.value());
185 assertNotNull(response);
190 String flowId = UUID.randomUUID().toString();
191 ConsumerRecord<String, String> record;
196 assertNotNull(record);
197 assertNotNull(record.value());
199 assertNotNull(record);
200 assertNotNull(record.value());
210 String flowId = UUID.randomUUID().toString();
211 ConsumerRecord<String, String> record;
216 assertNotNull(record);
217 assertNotNull(record.value());
219 assertNotNull(record);
220 assertNotNull(record.value());
222 createFlow(flowId +
"_alt");
230 String flowId = UUID.randomUUID().toString();
231 ConsumerRecord<String, String> record;
236 assertNotNull(record);
237 assertNotNull(record.value());
239 assertNotNull(record);
240 assertNotNull(record.value());
242 Flow payload = deleteFlow(flowId);
245 assertNotNull(record);
246 assertNotNull(record.value());
249 assertNotNull(message);
254 assertEquals(payload.getFlowId(), flowTePayload.getFlowId());
257 assertNotNull(record);
258 assertNotNull(record.value());
260 System.out.println(
"record = " + record);
263 assertNotNull(response);
268 String flowId = UUID.randomUUID().toString();
269 ConsumerRecord<String, String> record;
279 String flowId = UUID.randomUUID().toString();
280 ConsumerRecord<String, String> record;
285 assertNotNull(record);
286 assertNotNull(record.value());
288 assertNotNull(record);
289 assertNotNull(record.value());
294 assertNotNull(record);
297 assertNotNull(message);
304 assertNotNull(record);
305 assertNotNull(record.value());
309 assertNotNull(payload);
312 assertEquals(flowNbPayload, flowTePayload);
317 String flowId = UUID.randomUUID().toString();
318 ConsumerRecord<String, String> record;
328 String flowId = UUID.randomUUID().toString();
329 ConsumerRecord<String, String> record;
334 assertNotNull(record);
335 assertNotNull(record.value());
337 assertNotNull(record);
338 assertNotNull(record.value());
348 String flowId = UUID.randomUUID().toString();
349 ConsumerRecord<String, String> record;
359 String flowId = UUID.randomUUID().toString();
360 ConsumerRecord<String, String> record;
365 assertNotNull(record);
366 assertNotNull(record.value());
368 assertNotNull(record);
369 assertNotNull(record.value());
374 assertNotNull(record);
375 assertNotNull(record.value());
379 assertNotNull(infoData);
382 assertEquals(emptyPath, flowPayload.getForward().getFlowPath());
383 assertEquals(emptyPath, flowPayload.getReverse().getFlowPath());
388 String flowId = UUID.randomUUID().toString();
389 ConsumerRecord<String, String> record;
399 String flowId = UUID.randomUUID().toString();
400 ConsumerRecord<String, String> record;
402 Flow flow = createFlow(flowId);
404 flow.setFlowPath(
new PathInfoData(0L, Collections.emptyList()));
406 flow.setTransitVlan(2);
410 assertNotNull(record);
411 assertNotNull(record.value());
413 assertNotNull(record);
414 assertNotNull(record.value());
419 assertNotNull(record);
420 assertNotNull(record.value());
424 assertNotNull(infoData);
426 Flow flowTePayload = infoData.getPayload().getForward();
427 assertEquals(flow, flowTePayload);
432 String flowId = UUID.randomUUID().toString();
433 ConsumerRecord<String, String> record;
443 String flowId = UUID.randomUUID().toString();
444 ConsumerRecord<String, String> record;
449 assertNotNull(record);
450 assertNotNull(record.value());
452 assertNotNull(record);
453 assertNotNull(record.value());
458 assertNotNull(record);
459 assertNotNull(record.value());
463 assertNotNull(infoData);
464 assertNotNull(infoData.getPayload());
465 assertEquals(flowId, infoData.getPayload().getFlowId());
472 ConsumerRecord<String, String> record = nbConsumer.
pollMessage();
473 assertNotNull(record);
474 assertNotNull(record.value());
477 assertNull(infoMessage.getData());
492 String flowId = UUID.randomUUID().toString();
493 ConsumerRecord<String, String> record;
498 assertNotNull(record);
499 assertNotNull(record.value());
501 assertNotNull(record);
502 assertNotNull(record.value());
512 assertNotNull(record);
513 assertNotNull(record.value());
516 assertNotNull(response);
521 assertEquals(
data, responseData);
531 baseInstallRuleCommand(response);
541 String flowId = UUID.randomUUID().toString();
542 ConsumerRecord<String, String> ofsRecord;
543 ConsumerRecord<String, String> record;
548 assertNotNull(record);
549 assertNotNull(record.value());
551 assertNotNull(record);
552 assertNotNull(record.value());
562 assertNotNull(ofsRecord);
563 assertNotNull(ofsRecord.value());
566 assertNotNull(response);
571 assertEquals(
data, responseData);
581 removeRuleCommand(response);
592 ConsumerRecord<String, String> nbRecord;
593 String flowId = UUID.randomUUID().toString();
598 assertNotNull(nbRecord);
599 assertNotNull(nbRecord.value());
602 assertNotNull(response);
605 assertNotNull(responseData);
606 assertEquals(payload, responseData.getPayload().getForward().getFlowPath());
607 assertEquals(payload, responseData.getPayload().getReverse().getFlowPath());
613 ConsumerRecord<String, String> nbRecord;
614 String flowId = UUID.randomUUID().toString();
616 Flow payload = getFlowCommand(flowId);
619 assertNotNull(nbRecord);
620 assertNotNull(nbRecord.value());
623 assertNotNull(response);
626 assertNotNull(responseData);
627 assertEquals(payload, responseData.
getPayload());
633 ConsumerRecord<String, String> nbRecord;
634 String flowId = UUID.randomUUID().toString();
636 List<String> payload = dumpFlowCommand(flowId);
639 assertNotNull(nbRecord);
640 assertNotNull(nbRecord.value());
643 assertNotNull(response);
646 assertNotNull(responseData);
647 assertEquals(payload, responseData.
getFlowIds());
652 String flowId = UUID.randomUUID().toString();
653 ConsumerRecord<String, String> record;
658 assertNotNull(record);
659 assertNotNull(record.value());
661 assertNotNull(record);
662 assertNotNull(record.value());
682 String flowId = UUID.randomUUID().toString();
683 ConsumerRecord<String, String> record;
688 assertNotNull(record);
689 assertNotNull(record.value());
691 assertNotNull(record);
692 assertNotNull(record.value());
697 assertNotNull(record);
700 assertNotNull(message);
705 assertNotNull(record);
706 assertNotNull(record.value());
726 String flowId = UUID.randomUUID().toString();
727 ConsumerRecord<String, String> record;
732 assertNotNull(record);
733 assertNotNull(record.value());
735 assertNotNull(record);
736 assertNotNull(record.value());
741 assertNotNull(record);
744 assertNotNull(message);
749 assertNotNull(record);
750 assertNotNull(record.value());
770 String flowId = UUID.randomUUID().toString();
771 ConsumerRecord<String, String> record;
776 assertNotNull(record);
777 assertNotNull(record.value());
779 assertNotNull(record);
780 assertNotNull(record.value());
787 errorFlowSpeakerCommand(flowId);
796 @Ignore(
"Not reliable during batch run")
801 sendMessage(request, flowTopology.
getConfig().getKafkaFlowTopic());
803 ConsumerRecord<String, String> raw = ctrlConsumer.
pollMessage();
806 assertNotNull(raw.value());
808 Message responseGeneric = objectMapper.readValue(raw.value(),
Message.class);
819 String flowId = UUID.randomUUID().toString();
827 sendFlowMessage(message);
830 assertNotNull(nbMessageValue);
835 assertNotNull(flowNbPayload);
836 assertEquals(1, flowNbPayload.getDroppedFlows().size());
837 assertEquals(flowId, flowNbPayload.getDroppedFlows().get(0));
842 String flowId = UUID.randomUUID().toString();
847 cacheConsumer.
clear();
851 sendFlowMessage(message);
857 assertEquals(flowId, infoData.
getFlowId());
868 String flowId = UUID.randomUUID().toString();
873 cacheConsumer.
clear();
877 sendFlowMessage(message);
883 assertEquals(flowId, infoData.
getFlowId());
892 private void checkFlowReadStatus(
893 ConsumerRecord<String, String> record, String flowId,
FlowState state)
throws IOException {
894 assertNotNull(record);
895 assertNotNull(record.value());
898 assertNotNull(message);
901 assertNotNull(flowResponse);
904 assertNotNull(flowPayload);
905 assertEquals(flowId, flowPayload.getFlowId());
906 assertEquals(state, flowPayload.getForward().getState());
909 private void checkErrorResponseType(ConsumerRecord<String, String> record, ErrorType
type)
throws IOException {
910 assertNotNull(record);
911 assertNotNull(record.value());
913 ErrorMessage errorMessage = objectMapper.readValue(record.value(), ErrorMessage.class);
914 assertNotNull(errorMessage);
915 assertEquals(
type, errorMessage.getData().getErrorType());
918 private Flow deleteFlow(
final String flowId)
throws IOException {
919 System.out.println(
"NORTHBOUND: Delete flow");
920 Flow payload =
new Flow();
921 payload.setFlowId(flowId);
922 FlowDeleteRequest commandData =
new FlowDeleteRequest(payload);
927 sendFlowMessage(message);
932 private Flow createFlow(
final String flowId)
throws IOException {
933 System.out.println(
"NORTHBOUND: Create flow");
935 new Flow(flowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
936 new SwitchId(
"ff:00"), 1, 2);
937 FlowCreateRequest commandData =
new FlowCreateRequest(flowPayload);
938 CommandMessage message =
new CommandMessage(commandData, 0,
"create-flow", Destination.WFM);
940 sendFlowMessage(message);
944 private Flow updateFlow(
final String flowId)
throws IOException {
945 System.out.println(
"NORTHBOUND: Update flow");
947 new Flow(flowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
948 new SwitchId(
"ff:00"), 1, 2);
949 FlowUpdateRequest commandData =
new FlowUpdateRequest(flowPayload);
950 CommandMessage message =
new CommandMessage(commandData, 0,
"update-flow", Destination.WFM);
952 sendFlowMessage(message);
956 private void statusFlow(
final String flowId)
throws IOException {
957 System.out.println(
"NORTHBOUND: Status flow");
958 FlowReadRequest commandData =
new FlowReadRequest(flowId);
959 CommandMessage message =
new CommandMessage(commandData, 0,
"status-flow", Destination.WFM);
961 sendFlowMessage(message);
964 private PathInfoData pathFlow(
final String flowId)
throws IOException {
965 System.out.println(
"NORTHBOUND: Path flow");
966 FlowReadRequest commandData =
new FlowReadRequest(flowId);
967 CommandMessage message =
new CommandMessage(commandData, 0,
"path-flow", Destination.WFM);
969 sendFlowMessage(message);
970 return new PathInfoData(0L, Collections.emptyList());
973 private void getFlow(
final String flowId)
throws IOException {
974 System.out.println(
"NORTHBOUND: Get flow");
975 FlowReadRequest commandData =
new FlowReadRequest(flowId);
976 CommandMessage message =
new CommandMessage(commandData, 0,
"get-flow", Destination.WFM);
978 sendFlowMessage(message);
981 private void dumpFlows() throws IOException {
982 System.out.println(
"NORTHBOUND: Get flows");
983 FlowsDumpRequest commandData =
new FlowsDumpRequest();
984 CommandMessage message =
new CommandMessage(commandData, 0,
"get-flows", Destination.WFM);
986 sendFlowMessage(message);
989 private void sendTopologyEngineMessage(
final Message message)
throws IOException {
990 String request = objectMapper.writeValueAsString(message);
994 private InstallOneSwitchFlow baseInstallFlowCommand(
final String flowId)
throws IOException {
995 System.out.println(
"TOPOLOGY: Install flow");
996 InstallOneSwitchFlow commandData =
new InstallOneSwitchFlow(0L, flowId,
997 COOKIE,
new SwitchId(
"ff:04"), 1, 2, 0, 0, OutputVlanType.NONE, 10000L, 0L);
998 CommandMessage commandMessage =
new CommandMessage(commandData, 0,
"install-flow", Destination.WFM);
1001 sendFlowMessage(commandMessage);
1005 private RemoveFlow removeFlowCommand(
final String flowId)
throws IOException {
1006 System.out.println(
"TOPOLOGY: Remove flow");
1007 RemoveFlow commandData =
new RemoveFlow(0L, flowId, COOKIE,
new SwitchId(
"ff:04"), 0L,
1008 DeleteRulesCriteria.builder().cookie(COOKIE).build());
1009 CommandMessage commandMessage =
new CommandMessage(commandData, 0,
"remove-flow", Destination.WFM);
1011 sendFlowMessage(commandMessage);
1015 private Flow getFlowCommand(
final String flowId)
throws IOException {
1016 System.out.println(
"TOPOLOGY: Get flow");
1018 new Flow(flowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
1019 new SwitchId(
"ff:00"), 1, 2);
1020 FlowResponse infoData =
new FlowResponse(flowPayload);
1021 InfoMessage infoMessage =
new InfoMessage(infoData, 0,
"get-flow", Destination.WFM);
1022 sendTopologyEngineMessage(infoMessage);
1026 private List<String> dumpFlowCommand(
final String flowId)
throws IOException {
1027 System.out.println(
"TOPOLOGY: Get flows");
1029 new Flow(flowId, 10000,
false,
"",
new SwitchId(
"ff:00"), 1, 2,
1030 new SwitchId(
"ff:00"), 1, 2);
1031 List<String> payload = Collections.singletonList(flow.getFlowId());
1032 FlowsResponse infoData =
new FlowsResponse(payload);
1033 InfoMessage infoMessage =
new InfoMessage(infoData, 0,
"dump-flows", Destination.WFM);
1034 sendTopologyEngineMessage(infoMessage);
1038 private PathInfoData pathFlowCommand(
final String flowId)
throws IOException {
1039 System.out.println(
"TOPOLOGY: Path flow");
1040 PathInfoData pathInfoData =
new PathInfoData(
1041 0L, Collections.singletonList(
new PathNode(
new SwitchId(
"ff:00"), 1, 0, null)));
1042 Flow flow = Flow.builder().flowId(flowId).flowPath(pathInfoData).build();
1043 FlowReadResponse infoData =
new FlowReadResponse(
new BidirectionalFlow(flow, flow));
1044 InfoMessage infoMessage =
1045 new InfoMessage(infoData, 0,
"path-flow", Destination.WFM);
1046 sendTopologyEngineMessage(infoMessage);
1047 return pathInfoData;
1050 private ErrorMessage errorFlowTopologyEngineCommand(
final String flowId,
final ErrorType
type)
throws IOException {
1051 System.out.println(
"TOPOLOGY: Error flow");
1052 ErrorData errorData =
new ErrorData(
type,
"Could not operate with flow", flowId);
1053 ErrorMessage errorMessage =
new ErrorMessage(errorData, 0,
"error-flow", Destination.WFM);
1056 return errorMessage;
1059 private void sendSpeakerMessage(
final Message message)
throws IOException {
1060 String request = objectMapper.writeValueAsString(message);
1064 private Message baseInstallRuleCommand(
final Message message)
throws IOException {
1065 System.out.println(
"TOPOLOGY: Install rule");
1070 private Message removeRuleCommand(
final Message message)
throws IOException {
1071 System.out.println(
"TOPOLOGY: Remove rule");
1076 private ErrorMessage errorFlowSpeakerCommand(
final String flowId)
throws IOException {
1077 System.out.println(
"TOPOLOGY: Error rule");
1078 ErrorData errorData =
new ErrorData(ErrorType.REQUEST_INVALID,
"Could not operate with flow", flowId);
1079 ErrorMessage errorMessage =
new ErrorMessage(errorData, 0,
"error-flow", Destination.WFM_TRANSACTION);
1082 return errorMessage;
1085 private void sendFlowMessage(
final CommandMessage message)
throws IOException {
1089 private void sendNorthboundMessage(
final CommandMessage message)
throws IOException {
1093 private void sendMessage(Object
object, String
topic)
throws IOException {
1094 String request = objectMapper.writeValueAsString(
object);
1098 private ImmutablePair<Flow, Flow> getFlowPayload(InfoMessage message) {
1099 InfoData
data = message.getData();
1100 FlowInfoData flow = (FlowInfoData)
data;
1101 return flow.getPayload();
1104 private void sendClearState() throws IOException, InterruptedException {
1105 CtrlRequest request =
new CtrlRequest(
"flowtopology/" + ComponentType.CRUD_BOLT.toString(),
1106 new RequestData(
"clearState"), 1,
"clear-state-correlation-id", Destination.WFM_CTRL);
1109 ConsumerRecord<String, String> raw = ctrlConsumer.
pollMessage();
1112 CtrlResponse response = (CtrlResponse) objectMapper.readValue(raw.value(), Message.class);
1113 assertEquals(request.getCorrelationId(), response.getCorrelationId());
FlowCacheSyncResults getPayload()
void errorFlowCreateMessageStatusBoltTopologyEngineBoltTest()
static Config stormConfig()
void deleteUnknownFlowCommandBoltTest()
static LaunchEnvironment makeLaunchEnvironment()
void dumpFlowsWhenThereIsNoFlowsCreated()
void removeFlowTopologyEngineSpeakerBoltTest()
void updateFlowCommandBoltTest()
List< String > getFlowIds()
void errorFlowDeleteMessageStatusBoltTopologyEngineBoltTest()
static TestKafkaProducer kProducer
void errorMessageStatusBoltSpeakerBoltTest()
default String getKafkaFlowTopic()
default String getKafkaSpeakerTopic()
void shouldInvalidateCacheWithFlowsTest()
void getPathTopologyEngineBoltTest()
static void teardownOnce()
void shouldSyncCacheWithFlowsTest()
default String getKafkaTopoEngTopic()
void installFlowTopologyEngineSpeakerBoltTest()
FlowOperation getOperation()
void setTransactionId(final Long transactionId)
static LocalCluster cluster
void pushMessage(final String topic, final String data)
void updateUnknownFlowCommandBoltTest()
void createAlreadyExistsFlowCommandBoltTest()
void deleteFlowCommandBoltTest()
StormTopology createTopology()
default String getKafkaNorthboundTopic()
void errorFlowUpdateMessageStatusBoltTopologyEngineBoltTest()
void shouldFailOnCreatingConflictingFlow()
String pollMessageValue()
static void teardownOnce()
void statusUnknownFlowTest()
void getUnknownFlowTest()
void pathUnknownFlowTest()
default String getKafkaCtrlTopic()
ConsumerRecord< String, String > pollMessage()
default String getKafkaTopoCacheTopic()
String getCorrelationId()
void dumpFlowsTopologyEngineBoltTest()
static Properties kafkaProperties()
void getFlowTopologyEngineBoltTest()
void shouldSyncCacheProvideDifferenceWithFlowsTest()
void createFlowCommandBoltTest()