Open Kilda Java Documentation
FlowTopologyTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow;
17 
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNotNull;
20 import static org.junit.Assert.assertNull;
21 import static org.junit.Assert.assertTrue;
22 
63 
64 import com.fasterxml.jackson.databind.ObjectMapper;
65 import org.apache.kafka.clients.consumer.ConsumerRecord;
66 import org.apache.storm.Config;
67 import org.apache.storm.generated.StormTopology;
68 import org.apache.storm.utils.Utils;
69 import org.junit.After;
70 import org.junit.AfterClass;
71 import org.junit.Before;
72 import org.junit.BeforeClass;
73 import org.junit.Ignore;
74 import org.junit.Test;
75 
76 import java.io.IOException;
77 import java.util.Collections;
78 import java.util.List;
79 import java.util.UUID;
80 
81 public class FlowTopologyTest extends AbstractStormTest {
82 
83  private static final long COOKIE = 0x1FFFFFFFFL;
84  private static final ObjectMapper objectMapper = new ObjectMapper();
85  private static TestKafkaConsumer nbConsumer;
86  private static TestKafkaConsumer ofsConsumer;
87  private static TestKafkaConsumer cacheConsumer;
88  private static TestKafkaConsumer teResponseConsumer;
89  private static TestKafkaConsumer ctrlConsumer;
90  private static FlowTopology flowTopology;
91  private static FlowTopologyConfig topologyConfig;
92 
93  @BeforeClass
94  public static void setupOnce() throws Exception {
96 
97  flowTopology = new FlowTopology(makeLaunchEnvironment(), new MockedPathComputerAuth());
98  topologyConfig = flowTopology.getConfig();
99 
100  StormTopology stormTopology = flowTopology.createTopology();
101  Config config = stormConfig();
102  cluster.submitTopology(FlowTopologyTest.class.getSimpleName(), config, stormTopology);
103 
104  nbConsumer = new TestKafkaConsumer(
106  kafkaProperties(UUID.nameUUIDFromBytes(Destination.NORTHBOUND.toString().getBytes()).toString()));
107  nbConsumer.start();
108 
109  ofsConsumer = new TestKafkaConsumer(topologyConfig.getKafkaSpeakerTopic(),
111  kafkaProperties(UUID.nameUUIDFromBytes(Destination.CONTROLLER.toString().getBytes()).toString()));
112  ofsConsumer.start();
113 
114  cacheConsumer = new TestKafkaConsumer(topologyConfig.getKafkaTopoCacheTopic(), null,
115  kafkaProperties(UUID.nameUUIDFromBytes(Destination.TOPOLOGY_ENGINE.toString().getBytes()).toString()));
116  cacheConsumer.start();
117 
118  //teResponseConsumer = new TestKafkaConsumer(topologyConfig.getKafkaTopoEngTopic(),
119  teResponseConsumer = new TestKafkaConsumer(topologyConfig.getKafkaFlowTopic(),
121  kafkaProperties(UUID.nameUUIDFromBytes(Destination.WFM.toString().getBytes()).toString()));
122  teResponseConsumer.start();
123 
124  ctrlConsumer = new TestKafkaConsumer(flowTopology.getConfig().getKafkaCtrlTopic(), Destination.CTRL_CLIENT,
125  kafkaProperties(UUID.nameUUIDFromBytes(Destination.CTRL_CLIENT.toString().getBytes()).toString()));
126  ctrlConsumer.start();
127 
128  Utils.sleep(10000);
129  }
130 
131  @AfterClass
132  public static void teardownOnce() throws Exception {
133  nbConsumer.wakeup();
134  nbConsumer.join();
135  ofsConsumer.wakeup();
136  ofsConsumer.join();
137  cacheConsumer.wakeup();
138  cacheConsumer.join();
139  teResponseConsumer.wakeup();
140  teResponseConsumer.join();
141 
143  }
144 
145  @Before
146  public void setup() throws Exception {
147  nbConsumer.clear();
148  ofsConsumer.clear();
149  cacheConsumer.clear();
150  teResponseConsumer.clear();
151  }
152 
153  @After
154  public void teardown() throws Exception {
155  nbConsumer.clear();
156  ofsConsumer.clear();
157  cacheConsumer.clear();
158  teResponseConsumer.clear();
159 
160  // Clean the CrudBolt's state.
161  sendClearState();
162  }
163 
164  @Test
165  public void createFlowCommandBoltTest() throws Exception {
166  ConsumerRecord<String, String> record;
167  String flowId = UUID.randomUUID().toString();
168 
169  createFlow(flowId);
170 
171  record = cacheConsumer.pollMessage();
172  assertNotNull(record);
173  assertNotNull(record.value());
174 
175  InfoMessage message = objectMapper.readValue(record.value(), InfoMessage.class);
176  ImmutablePair<Flow, Flow> flow = getFlowPayload(message);
177  assertNotNull(flow);
178 
179  record = nbConsumer.pollMessage();
180  assertNotNull(record);
181  assertNotNull(record.value());
182 
183  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
184  FlowResponse response = (FlowResponse) infoMessage.getData();
185  assertNotNull(response);
186  }
187 
188  @Test
189  public void createAlreadyExistsFlowCommandBoltTest() throws Exception {
190  String flowId = UUID.randomUUID().toString();
191  ConsumerRecord<String, String> record;
192 
193  createFlow(flowId);
194 
195  record = cacheConsumer.pollMessage();
196  assertNotNull(record);
197  assertNotNull(record.value());
198  record = nbConsumer.pollMessage();
199  assertNotNull(record);
200  assertNotNull(record.value());
201 
202  createFlow(flowId);
203 
204  record = nbConsumer.pollMessage();
205  checkErrorResponseType(record, ErrorType.ALREADY_EXISTS);
206  }
207 
208  @Test
209  public void shouldFailOnCreatingConflictingFlow() throws Exception {
210  String flowId = UUID.randomUUID().toString();
211  ConsumerRecord<String, String> record;
212 
213  createFlow(flowId);
214 
215  record = cacheConsumer.pollMessage();
216  assertNotNull(record);
217  assertNotNull(record.value());
218  record = nbConsumer.pollMessage();
219  assertNotNull(record);
220  assertNotNull(record.value());
221 
222  createFlow(flowId + "_alt");
223 
224  record = nbConsumer.pollMessage();
225  checkErrorResponseType(record, ErrorType.ALREADY_EXISTS);
226  }
227 
228  @Test
229  public void deleteFlowCommandBoltTest() throws Exception {
230  String flowId = UUID.randomUUID().toString();
231  ConsumerRecord<String, String> record;
232 
233  createFlow(flowId);
234 
235  record = cacheConsumer.pollMessage();
236  assertNotNull(record);
237  assertNotNull(record.value());
238  record = nbConsumer.pollMessage();
239  assertNotNull(record);
240  assertNotNull(record.value());
241 
242  Flow payload = deleteFlow(flowId);
243 
244  record = cacheConsumer.pollMessage();
245  assertNotNull(record);
246  assertNotNull(record.value());
247 
248  InfoMessage message = objectMapper.readValue(record.value(), InfoMessage.class);
249  assertNotNull(message);
250  ImmutablePair<Flow, Flow> flow = getFlowPayload(message);
251  assertNotNull(flow);
252 
253  Flow flowTePayload = flow.getLeft();
254  assertEquals(payload.getFlowId(), flowTePayload.getFlowId());
255 
256  record = nbConsumer.pollMessage();
257  assertNotNull(record);
258  assertNotNull(record.value());
259 
260  System.out.println("record = " + record);
261  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
262  FlowResponse response = (FlowResponse) infoMessage.getData();
263  assertNotNull(response);
264  }
265 
266  @Test
267  public void deleteUnknownFlowCommandBoltTest() throws Exception {
268  String flowId = UUID.randomUUID().toString();
269  ConsumerRecord<String, String> record;
270 
271  deleteFlow(flowId);
272 
273  record = nbConsumer.pollMessage();
274  checkErrorResponseType(record, ErrorType.NOT_FOUND);
275  }
276 
277  @Test
278  public void updateFlowCommandBoltTest() throws Exception {
279  String flowId = UUID.randomUUID().toString();
280  ConsumerRecord<String, String> record;
281 
282  createFlow(flowId);
283 
284  record = cacheConsumer.pollMessage();
285  assertNotNull(record);
286  assertNotNull(record.value());
287  record = nbConsumer.pollMessage();
288  assertNotNull(record);
289  assertNotNull(record.value());
290 
291  updateFlow(flowId);
292 
293  record = cacheConsumer.pollMessage();
294  assertNotNull(record);
295 
296  InfoMessage message = objectMapper.readValue(record.value(), InfoMessage.class);
297  assertNotNull(message);
298  ImmutablePair<Flow, Flow> flow = getFlowPayload(message);
299  assertNotNull(flow);
300 
301  Flow flowTePayload = flow.getLeft();
302 
303  record = nbConsumer.pollMessage();
304  assertNotNull(record);
305  assertNotNull(record.value());
306 
307  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
308  FlowResponse payload = (FlowResponse) infoMessage.getData();
309  assertNotNull(payload);
310 
311  Flow flowNbPayload = payload.getPayload();
312  assertEquals(flowNbPayload, flowTePayload);
313  }
314 
315  @Test
316  public void updateUnknownFlowCommandBoltTest() throws Exception {
317  String flowId = UUID.randomUUID().toString();
318  ConsumerRecord<String, String> record;
319 
320  updateFlow(flowId);
321 
322  record = nbConsumer.pollMessage();
323  checkErrorResponseType(record, ErrorType.NOT_FOUND);
324  }
325 
326  @Test
327  public void statusFlowTest() throws Exception {
328  String flowId = UUID.randomUUID().toString();
329  ConsumerRecord<String, String> record;
330 
331  createFlow(flowId);
332 
333  record = cacheConsumer.pollMessage();
334  assertNotNull(record);
335  assertNotNull(record.value());
336  record = nbConsumer.pollMessage();
337  assertNotNull(record);
338  assertNotNull(record.value());
339 
340  statusFlow(flowId);
341 
342  record = nbConsumer.pollMessage();
343  checkFlowReadStatus(record, flowId, FlowState.ALLOCATED);
344  }
345 
346  @Test
347  public void statusUnknownFlowTest() throws Exception {
348  String flowId = UUID.randomUUID().toString();
349  ConsumerRecord<String, String> record;
350 
351  statusFlow(flowId);
352 
353  record = nbConsumer.pollMessage();
354  checkErrorResponseType(record, ErrorType.NOT_FOUND);
355  }
356 
357  @Test
358  public void pathFlowTest() throws Exception {
359  String flowId = UUID.randomUUID().toString();
360  ConsumerRecord<String, String> record;
361 
362  createFlow(flowId);
363 
364  record = cacheConsumer.pollMessage();
365  assertNotNull(record);
366  assertNotNull(record.value());
367  record = nbConsumer.pollMessage();
368  assertNotNull(record);
369  assertNotNull(record.value());
370 
371  PathInfoData emptyPath = pathFlow(flowId);
372 
373  record = nbConsumer.pollMessage();
374  assertNotNull(record);
375  assertNotNull(record.value());
376 
377  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
378  FlowReadResponse infoData = (FlowReadResponse) infoMessage.getData();
379  assertNotNull(infoData);
380 
381  BidirectionalFlow flowPayload = infoData.getPayload();
382  assertEquals(emptyPath, flowPayload.getForward().getFlowPath());
383  assertEquals(emptyPath, flowPayload.getReverse().getFlowPath());
384  }
385 
386  @Test
387  public void pathUnknownFlowTest() throws Exception {
388  String flowId = UUID.randomUUID().toString();
389  ConsumerRecord<String, String> record;
390 
391  pathFlow(flowId);
392 
393  record = nbConsumer.pollMessage();
394  checkErrorResponseType(record, ErrorType.NOT_FOUND);
395  }
396 
397  @Test
398  public void getFlowTest() throws Exception {
399  String flowId = UUID.randomUUID().toString();
400  ConsumerRecord<String, String> record;
401 
402  Flow flow = createFlow(flowId);
403  flow.setCookie(1);
404  flow.setFlowPath(new PathInfoData(0L, Collections.emptyList()));
405  flow.setMeterId(1);
406  flow.setTransitVlan(2);
407  flow.setState(FlowState.ALLOCATED);
408 
409  record = cacheConsumer.pollMessage();
410  assertNotNull(record);
411  assertNotNull(record.value());
412  record = nbConsumer.pollMessage();
413  assertNotNull(record);
414  assertNotNull(record.value());
415 
416  getFlow(flowId);
417 
418  record = nbConsumer.pollMessage();
419  assertNotNull(record);
420  assertNotNull(record.value());
421 
422  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
423  FlowReadResponse infoData = (FlowReadResponse) infoMessage.getData();
424  assertNotNull(infoData);
425 
426  Flow flowTePayload = infoData.getPayload().getForward();
427  assertEquals(flow, flowTePayload);
428  }
429 
430  @Test
431  public void getUnknownFlowTest() throws Exception {
432  String flowId = UUID.randomUUID().toString();
433  ConsumerRecord<String, String> record;
434 
435  getFlow(flowId);
436 
437  record = nbConsumer.pollMessage();
438  checkErrorResponseType(record, ErrorType.NOT_FOUND);
439  }
440 
441  @Test
442  public void dumpFlowsTest() throws Exception {
443  String flowId = UUID.randomUUID().toString();
444  ConsumerRecord<String, String> record;
445 
446  createFlow(flowId);
447 
448  record = cacheConsumer.pollMessage();
449  assertNotNull(record);
450  assertNotNull(record.value());
451  record = nbConsumer.pollMessage();
452  assertNotNull(record);
453  assertNotNull(record.value());
454 
455  dumpFlows();
456 
457  record = nbConsumer.pollMessage();
458  assertNotNull(record);
459  assertNotNull(record.value());
460 
461  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
462  FlowReadResponse infoData = (FlowReadResponse) infoMessage.getData();
463  assertNotNull(infoData);
464  assertNotNull(infoData.getPayload());
465  assertEquals(flowId, infoData.getPayload().getFlowId());
466  }
467 
468  @Test
469  public void dumpFlowsWhenThereIsNoFlowsCreated() throws Exception {
470  dumpFlows();
471 
472  ConsumerRecord<String, String> record = nbConsumer.pollMessage();
473  assertNotNull(record);
474  assertNotNull(record.value());
475 
476  InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
477  assertNull(infoMessage.getData());
478  }
479 
480  @Test
481  public void installFlowTopologyEngineSpeakerBoltTest() throws Exception {
482  /*
483  * This test will verify the state transitions of a flow, through the status mechanism.
484  * It achieves this by doing the following:
485  * - CreateFlow .. clear both cache and northbound consumers
486  * - GetStatus .. confirm STATE = FlowState.ALLOCATED
487  * - baseInstallFlowCommand .. read speaker .. validate data/responsedata
488  * - GetStatus .. confirm STATE = FlowState.IN_PROGRESS
489  * - baseInstallRuleCommand ..
490  * - GetStatus .. confirm STATE = FlowState.UP
491  */
492  String flowId = UUID.randomUUID().toString();
493  ConsumerRecord<String, String> record;
494 
495  createFlow(flowId);
496 
497  record = cacheConsumer.pollMessage();
498  assertNotNull(record);
499  assertNotNull(record.value());
500  record = nbConsumer.pollMessage();
501  assertNotNull(record);
502  assertNotNull(record.value());
503 
504  statusFlow(flowId);
505 
506  record = nbConsumer.pollMessage();
507  checkFlowReadStatus(record, flowId, FlowState.ALLOCATED);
508 
509  InstallOneSwitchFlow data = baseInstallFlowCommand(flowId);
510 
511  record = ofsConsumer.pollMessage();
512  assertNotNull(record);
513  assertNotNull(record.value());
514 
515  CommandMessage response = objectMapper.readValue(record.value(), CommandMessage.class);
516  assertNotNull(response);
517 
518  InstallOneSwitchFlow responseData = (InstallOneSwitchFlow) response.getData();
519  Long transactionId = responseData.getTransactionId();
520  responseData.setTransactionId(0L);
521  assertEquals(data, responseData);
522  responseData.setTransactionId(transactionId);
523 
524  statusFlow(flowId);
525 
526  record = nbConsumer.pollMessage();
527  checkFlowReadStatus(record, flowId, FlowState.IN_PROGRESS);
528 
529  response.setDestination(Destination.WFM_TRANSACTION);
530 
531  baseInstallRuleCommand(response);
532 
533  statusFlow(flowId);
534 
535  record = nbConsumer.pollMessage();
536  checkFlowReadStatus(record, flowId, FlowState.UP);
537  }
538 
539  @Test
540  public void removeFlowTopologyEngineSpeakerBoltTest() throws Exception {
541  String flowId = UUID.randomUUID().toString();
542  ConsumerRecord<String, String> ofsRecord;
543  ConsumerRecord<String, String> record;
544 
545  createFlow(flowId);
546 
547  record = cacheConsumer.pollMessage();
548  assertNotNull(record);
549  assertNotNull(record.value());
550  record = nbConsumer.pollMessage();
551  assertNotNull(record);
552  assertNotNull(record.value());
553 
554  statusFlow(flowId);
555 
556  record = nbConsumer.pollMessage();
557  checkFlowReadStatus(record, flowId, FlowState.ALLOCATED);
558 
559  RemoveFlow data = removeFlowCommand(flowId);
560 
561  ofsRecord = ofsConsumer.pollMessage();
562  assertNotNull(ofsRecord);
563  assertNotNull(ofsRecord.value());
564 
565  CommandMessage response = objectMapper.readValue(ofsRecord.value(), CommandMessage.class);
566  assertNotNull(response);
567 
568  RemoveFlow responseData = (RemoveFlow) response.getData();
569  Long transactionId = responseData.getTransactionId();
570  responseData.setTransactionId(0L);
571  assertEquals(data, responseData);
572  responseData.setTransactionId(transactionId);
573 
574  statusFlow(flowId);
575 
576  record = nbConsumer.pollMessage();
577  checkFlowReadStatus(record, flowId, FlowState.IN_PROGRESS);
578 
579  response.setDestination(Destination.WFM_TRANSACTION);
580 
581  removeRuleCommand(response);
582 
583  statusFlow(flowId);
584 
585  record = nbConsumer.pollMessage();
586  checkFlowReadStatus(record, flowId, FlowState.UP);
587  }
588 
589  @Test
590  @Ignore
591  public void getPathTopologyEngineBoltTest() throws Exception {
592  ConsumerRecord<String, String> nbRecord;
593  String flowId = UUID.randomUUID().toString();
594 
595  PathInfoData payload = pathFlowCommand(flowId);
596 
597  nbRecord = nbConsumer.pollMessage();
598  assertNotNull(nbRecord);
599  assertNotNull(nbRecord.value());
600 
601  InfoMessage response = objectMapper.readValue(nbRecord.value(), InfoMessage.class);
602  assertNotNull(response);
603 
604  FlowReadResponse responseData = (FlowReadResponse) response.getData();
605  assertNotNull(responseData);
606  assertEquals(payload, responseData.getPayload().getForward().getFlowPath());
607  assertEquals(payload, responseData.getPayload().getReverse().getFlowPath());
608  }
609 
610  @Test
611  @Ignore
612  public void getFlowTopologyEngineBoltTest() throws Exception {
613  ConsumerRecord<String, String> nbRecord;
614  String flowId = UUID.randomUUID().toString();
615 
616  Flow payload = getFlowCommand(flowId);
617 
618  nbRecord = nbConsumer.pollMessage();
619  assertNotNull(nbRecord);
620  assertNotNull(nbRecord.value());
621 
622  InfoMessage response = objectMapper.readValue(nbRecord.value(), InfoMessage.class);
623  assertNotNull(response);
624 
625  FlowResponse responseData = (FlowResponse) response.getData();
626  assertNotNull(responseData);
627  assertEquals(payload, responseData.getPayload());
628  }
629 
630  @Test
631  @Ignore
632  public void dumpFlowsTopologyEngineBoltTest() throws Exception {
633  ConsumerRecord<String, String> nbRecord;
634  String flowId = UUID.randomUUID().toString();
635 
636  List<String> payload = dumpFlowCommand(flowId);
637 
638  nbRecord = nbConsumer.pollMessage();
639  assertNotNull(nbRecord);
640  assertNotNull(nbRecord.value());
641 
642  InfoMessage response = objectMapper.readValue(nbRecord.value(), InfoMessage.class);
643  assertNotNull(response);
644 
645  FlowsResponse responseData = (FlowsResponse) response.getData();
646  assertNotNull(responseData);
647  assertEquals(payload, responseData.getFlowIds());
648  }
649 
650  @Test
652  String flowId = UUID.randomUUID().toString();
653  ConsumerRecord<String, String> record;
654 
655  createFlow(flowId);
656 
657  record = cacheConsumer.pollMessage();
658  assertNotNull(record);
659  assertNotNull(record.value());
660  record = nbConsumer.pollMessage();
661  assertNotNull(record);
662  assertNotNull(record.value());
663 
664  statusFlow(flowId);
665 
666  record = nbConsumer.pollMessage();
667  checkFlowReadStatus(record, flowId, FlowState.ALLOCATED);
668 
669  errorFlowTopologyEngineCommand(flowId, ErrorType.CREATION_FAILURE);
670 
671  record = nbConsumer.pollMessage();
672  checkErrorResponseType(record, ErrorType.CREATION_FAILURE);
673 
674  statusFlow(flowId);
675 
676  record = nbConsumer.pollMessage();
677  checkErrorResponseType(record, ErrorType.NOT_FOUND);
678  }
679 
680  @Test
682  String flowId = UUID.randomUUID().toString();
683  ConsumerRecord<String, String> record;
684 
685  createFlow(flowId);
686 
687  record = cacheConsumer.pollMessage();
688  assertNotNull(record);
689  assertNotNull(record.value());
690  record = nbConsumer.pollMessage();
691  assertNotNull(record);
692  assertNotNull(record.value());
693 
694  updateFlow(flowId);
695 
696  record = cacheConsumer.pollMessage();
697  assertNotNull(record);
698 
699  InfoMessage message = objectMapper.readValue(record.value(), InfoMessage.class);
700  assertNotNull(message);
701  ImmutablePair<Flow, Flow> flow = getFlowPayload(message);
702  assertNotNull(flow);
703 
704  record = nbConsumer.pollMessage();
705  assertNotNull(record);
706  assertNotNull(record.value());
707 
708  statusFlow(flowId);
709 
710  record = nbConsumer.pollMessage();
711  checkFlowReadStatus(record, flowId, FlowState.ALLOCATED);
712 
713  errorFlowTopologyEngineCommand(flowId, ErrorType.UPDATE_FAILURE);
714 
715  record = nbConsumer.pollMessage();
716  checkErrorResponseType(record, ErrorType.UPDATE_FAILURE);
717 
718  statusFlow(flowId);
719 
720  record = nbConsumer.pollMessage();
721  checkFlowReadStatus(record, flowId, FlowState.DOWN);
722  }
723 
724  @Test
726  String flowId = UUID.randomUUID().toString();
727  ConsumerRecord<String, String> record;
728 
729  createFlow(flowId);
730 
731  record = cacheConsumer.pollMessage();
732  assertNotNull(record);
733  assertNotNull(record.value());
734  record = nbConsumer.pollMessage();
735  assertNotNull(record);
736  assertNotNull(record.value());
737 
738  deleteFlow(flowId);
739 
740  record = cacheConsumer.pollMessage();
741  assertNotNull(record);
742 
743  InfoMessage message = objectMapper.readValue(record.value(), InfoMessage.class);
744  assertNotNull(message);
745  ImmutablePair<Flow, Flow> flow = getFlowPayload(message);
746  assertNotNull(flow);
747 
748  record = nbConsumer.pollMessage();
749  assertNotNull(record);
750  assertNotNull(record.value());
751 
752  statusFlow(flowId);
753 
754  record = nbConsumer.pollMessage();
755  checkErrorResponseType(record, ErrorType.NOT_FOUND);
756 
757  errorFlowTopologyEngineCommand(flowId, ErrorType.DELETION_FAILURE);
758 
759  record = nbConsumer.pollMessage();
760  checkErrorResponseType(record, ErrorType.DELETION_FAILURE);
761 
762  statusFlow(flowId);
763 
764  record = nbConsumer.pollMessage();
765  checkErrorResponseType(record, ErrorType.NOT_FOUND);
766  }
767 
768  @Test
769  public void errorMessageStatusBoltSpeakerBoltTest() throws Exception {
770  String flowId = UUID.randomUUID().toString();
771  ConsumerRecord<String, String> record;
772 
773  createFlow(flowId);
774 
775  record = cacheConsumer.pollMessage();
776  assertNotNull(record);
777  assertNotNull(record.value());
778  record = nbConsumer.pollMessage();
779  assertNotNull(record);
780  assertNotNull(record.value());
781 
782  statusFlow(flowId);
783 
784  record = nbConsumer.pollMessage();
785  checkFlowReadStatus(record, flowId, FlowState.ALLOCATED);
786 
787  errorFlowSpeakerCommand(flowId);
788 
789  statusFlow(flowId);
790 
791  record = nbConsumer.pollMessage();
792  checkFlowReadStatus(record, flowId, FlowState.DOWN);
793  }
794 
795  @Test
796  @Ignore("Not reliable during batch run")
797  public void ctrlDumpHandler() throws Exception {
798  CtrlRequest request = new CtrlRequest("flowtopology/" + ComponentType.CRUD_BOLT.toString(),
799  new RequestData("dump"), 1, "dump-correlation-id", Destination.WFM_CTRL);
800 
801  sendMessage(request, flowTopology.getConfig().getKafkaFlowTopic());
802 
803  ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();
804 
805  assertNotNull(raw);
806  assertNotNull(raw.value());
807 
808  Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
809  CtrlResponse response = (CtrlResponse) responseGeneric;
810  ResponseData payload = response.getData();
811 
812  assertEquals(request.getCorrelationId(), response.getCorrelationId());
813  assertEquals(ComponentType.CRUD_BOLT.toString(), payload.getComponent());
814  assertTrue(payload instanceof DumpStateResponseData);
815  }
816 
817  @Test
818  public void shouldSyncCacheProvideDifferenceWithFlowsTest() throws Exception {
819  String flowId = UUID.randomUUID().toString();
820 
821  createFlow(flowId);
822 
823  nbConsumer.clear();
824 
826  CommandMessage message = new CommandMessage(commandData, 0, "sync-cache-flow", Destination.WFM);
827  sendFlowMessage(message);
828 
829  String nbMessageValue = nbConsumer.pollMessageValue();
830  assertNotNull(nbMessageValue);
831 
832  InfoMessage infoMessage = objectMapper.readValue(nbMessageValue, InfoMessage.class);
833  FlowCacheSyncResponse infoData = (FlowCacheSyncResponse) infoMessage.getData();
834  FlowCacheSyncResults flowNbPayload = infoData.getPayload();
835  assertNotNull(flowNbPayload);
836  assertEquals(1, flowNbPayload.getDroppedFlows().size());
837  assertEquals(flowId, flowNbPayload.getDroppedFlows().get(0));
838  }
839 
840  @Test
841  public void shouldSyncCacheWithFlowsTest() throws Exception {
842  String flowId = UUID.randomUUID().toString();
843 
844  createFlow(flowId);
845 
846  cacheConsumer.pollMessage();
847  cacheConsumer.clear();
848 
850  CommandMessage message = new CommandMessage(commandData, 0, "sync-cache-flow", Destination.WFM);
851  sendFlowMessage(message);
852 
853  String cacheMessageValue = cacheConsumer.pollMessageValue();
854  InfoMessage infoMessage = objectMapper.readValue(cacheMessageValue, InfoMessage.class);
855  FlowInfoData infoData = (FlowInfoData) infoMessage.getData();
856  assertEquals(FlowOperation.CACHE, infoData.getOperation());
857  assertEquals(flowId, infoData.getFlowId());
858 
859  nbConsumer.clear();
860 
861  statusFlow(flowId);
862 
863  checkErrorResponseType(nbConsumer.pollMessage(), ErrorType.NOT_FOUND);
864  }
865 
866  @Test
867  public void shouldInvalidateCacheWithFlowsTest() throws Exception {
868  String flowId = UUID.randomUUID().toString();
869 
870  createFlow(flowId);
871 
872  cacheConsumer.pollMessage();
873  cacheConsumer.clear();
874 
876  CommandMessage message = new CommandMessage(commandData, 0, "sync-cache-flow", Destination.WFM);
877  sendFlowMessage(message);
878 
879  String cacheMessageValue = cacheConsumer.pollMessageValue();
880  InfoMessage infoMessage = objectMapper.readValue(cacheMessageValue, InfoMessage.class);
881  FlowInfoData infoData = (FlowInfoData) infoMessage.getData();
882  assertEquals(FlowOperation.CACHE, infoData.getOperation());
883  assertEquals(flowId, infoData.getFlowId());
884 
885  nbConsumer.clear();
886 
887  statusFlow(flowId);
888 
889  checkErrorResponseType(nbConsumer.pollMessage(), ErrorType.NOT_FOUND);
890  }
891 
892  private void checkFlowReadStatus(
893  ConsumerRecord<String, String> record, String flowId, FlowState state) throws IOException {
894  assertNotNull(record);
895  assertNotNull(record.value());
896 
897  InfoMessage message = objectMapper.readValue(record.value(), InfoMessage.class);
898  assertNotNull(message);
899 
900  FlowReadResponse flowResponse = (FlowReadResponse) message.getData();
901  assertNotNull(flowResponse);
902 
903  BidirectionalFlow flowPayload = flowResponse.getPayload();
904  assertNotNull(flowPayload);
905  assertEquals(flowId, flowPayload.getFlowId());
906  assertEquals(state, flowPayload.getForward().getState());
907  }
908 
909  private void checkErrorResponseType(ConsumerRecord<String, String> record, ErrorType type) throws IOException {
910  assertNotNull(record);
911  assertNotNull(record.value());
912 
913  ErrorMessage errorMessage = objectMapper.readValue(record.value(), ErrorMessage.class);
914  assertNotNull(errorMessage);
915  assertEquals(type, errorMessage.getData().getErrorType());
916  }
917 
918  private Flow deleteFlow(final String flowId) throws IOException {
919  System.out.println("NORTHBOUND: Delete flow");
920  Flow payload = new Flow();
921  payload.setFlowId(flowId);
922  FlowDeleteRequest commandData = new FlowDeleteRequest(payload);
923  CommandMessage message = new CommandMessage(commandData, 0, "delete-flow", Destination.WFM);
924 
925  //sendNorthboundMessage(message);
926  //sendTopologyEngineMessage(message);
927  sendFlowMessage(message);
928 
929  return payload;
930  }
931 
932  private Flow createFlow(final String flowId) throws IOException {
933  System.out.println("NORTHBOUND: Create flow");
934  Flow flowPayload =
935  new Flow(flowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
936  new SwitchId("ff:00"), 1, 2);
937  FlowCreateRequest commandData = new FlowCreateRequest(flowPayload);
938  CommandMessage message = new CommandMessage(commandData, 0, "create-flow", Destination.WFM);
939  //sendNorthboundMessage(message);
940  sendFlowMessage(message);
941  return flowPayload;
942  }
943 
944  private Flow updateFlow(final String flowId) throws IOException {
945  System.out.println("NORTHBOUND: Update flow");
946  Flow flowPayload =
947  new Flow(flowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
948  new SwitchId("ff:00"), 1, 2);
949  FlowUpdateRequest commandData = new FlowUpdateRequest(flowPayload);
950  CommandMessage message = new CommandMessage(commandData, 0, "update-flow", Destination.WFM);
951  //sendNorthboundMessage(message);
952  sendFlowMessage(message);
953  return flowPayload;
954  }
955 
956  private void statusFlow(final String flowId) throws IOException {
957  System.out.println("NORTHBOUND: Status flow");
958  FlowReadRequest commandData = new FlowReadRequest(flowId);
959  CommandMessage message = new CommandMessage(commandData, 0, "status-flow", Destination.WFM);
960  //sendNorthboundMessage(message);
961  sendFlowMessage(message);
962  }
963 
964  private PathInfoData pathFlow(final String flowId) throws IOException {
965  System.out.println("NORTHBOUND: Path flow");
966  FlowReadRequest commandData = new FlowReadRequest(flowId);
967  CommandMessage message = new CommandMessage(commandData, 0, "path-flow", Destination.WFM);
968  //sendNorthboundMessage(message);
969  sendFlowMessage(message);
970  return new PathInfoData(0L, Collections.emptyList());
971  }
972 
973  private void getFlow(final String flowId) throws IOException {
974  System.out.println("NORTHBOUND: Get flow");
975  FlowReadRequest commandData = new FlowReadRequest(flowId);
976  CommandMessage message = new CommandMessage(commandData, 0, "get-flow", Destination.WFM);
977  //sendNorthboundMessage(message);
978  sendFlowMessage(message);
979  }
980 
981  private void dumpFlows() throws IOException {
982  System.out.println("NORTHBOUND: Get flows");
983  FlowsDumpRequest commandData = new FlowsDumpRequest();
984  CommandMessage message = new CommandMessage(commandData, 0, "get-flows", Destination.WFM);
985  //sendNorthboundMessage(message);
986  sendFlowMessage(message);
987  }
988 
989  private void sendTopologyEngineMessage(final Message message) throws IOException {
990  String request = objectMapper.writeValueAsString(message);
991  kProducer.pushMessage(topologyConfig.getKafkaTopoEngTopic(), request);
992  }
993 
994  private InstallOneSwitchFlow baseInstallFlowCommand(final String flowId) throws IOException {
995  System.out.println("TOPOLOGY: Install flow");
996  InstallOneSwitchFlow commandData = new InstallOneSwitchFlow(0L, flowId,
997  COOKIE, new SwitchId("ff:04"), 1, 2, 0, 0, OutputVlanType.NONE, 10000L, 0L);
998  CommandMessage commandMessage = new CommandMessage(commandData, 0, "install-flow", Destination.WFM);
999  //sendTopologyEngineMessage(commandMessage);
1000  //sendSpeakerMessage(commandMessage);
1001  sendFlowMessage(commandMessage);
1002  return commandData;
1003  }
1004 
1005  private RemoveFlow removeFlowCommand(final String flowId) throws IOException {
1006  System.out.println("TOPOLOGY: Remove flow");
1007  RemoveFlow commandData = new RemoveFlow(0L, flowId, COOKIE, new SwitchId("ff:04"), 0L,
1008  DeleteRulesCriteria.builder().cookie(COOKIE).build());
1009  CommandMessage commandMessage = new CommandMessage(commandData, 0, "remove-flow", Destination.WFM);
1010  //sendTopologyEngineMessage(commandMessage);
1011  sendFlowMessage(commandMessage);
1012  return commandData;
1013  }
1014 
1015  private Flow getFlowCommand(final String flowId) throws IOException {
1016  System.out.println("TOPOLOGY: Get flow");
1017  Flow flowPayload =
1018  new Flow(flowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
1019  new SwitchId("ff:00"), 1, 2);
1020  FlowResponse infoData = new FlowResponse(flowPayload);
1021  InfoMessage infoMessage = new InfoMessage(infoData, 0, "get-flow", Destination.WFM);
1022  sendTopologyEngineMessage(infoMessage);
1023  return flowPayload;
1024  }
1025 
1026  private List<String> dumpFlowCommand(final String flowId) throws IOException {
1027  System.out.println("TOPOLOGY: Get flows");
1028  Flow flow =
1029  new Flow(flowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
1030  new SwitchId("ff:00"), 1, 2);
1031  List<String> payload = Collections.singletonList(flow.getFlowId());
1032  FlowsResponse infoData = new FlowsResponse(payload);
1033  InfoMessage infoMessage = new InfoMessage(infoData, 0, "dump-flows", Destination.WFM);
1034  sendTopologyEngineMessage(infoMessage);
1035  return payload;
1036  }
1037 
1038  private PathInfoData pathFlowCommand(final String flowId) throws IOException {
1039  System.out.println("TOPOLOGY: Path flow");
1040  PathInfoData pathInfoData = new PathInfoData(
1041  0L, Collections.singletonList(new PathNode(new SwitchId("ff:00"), 1, 0, null)));
1042  Flow flow = Flow.builder().flowId(flowId).flowPath(pathInfoData).build();
1043  FlowReadResponse infoData = new FlowReadResponse(new BidirectionalFlow(flow, flow));
1044  InfoMessage infoMessage =
1045  new InfoMessage(infoData, 0, "path-flow", Destination.WFM);
1046  sendTopologyEngineMessage(infoMessage);
1047  return pathInfoData;
1048  }
1049 
1050  private ErrorMessage errorFlowTopologyEngineCommand(final String flowId, final ErrorType type) throws IOException {
1051  System.out.println("TOPOLOGY: Error flow");
1052  ErrorData errorData = new ErrorData(type, "Could not operate with flow", flowId);
1053  ErrorMessage errorMessage = new ErrorMessage(errorData, 0, "error-flow", Destination.WFM);
1054  //sendTopologyEngineMessage(errorMessage);
1055  sendMessage(errorMessage, topologyConfig.getKafkaFlowTopic());
1056  return errorMessage;
1057  }
1058 
1059  private void sendSpeakerMessage(final Message message) throws IOException {
1060  String request = objectMapper.writeValueAsString(message);
1061  kProducer.pushMessage(topologyConfig.getKafkaSpeakerTopic(), request);
1062  }
1063 
1064  private Message baseInstallRuleCommand(final Message message) throws IOException {
1065  System.out.println("TOPOLOGY: Install rule");
1066  sendMessage(message, topologyConfig.getKafkaFlowTopic());
1067  return message;
1068  }
1069 
1070  private Message removeRuleCommand(final Message message) throws IOException {
1071  System.out.println("TOPOLOGY: Remove rule");
1072  sendMessage(message, topologyConfig.getKafkaFlowTopic());
1073  return message;
1074  }
1075 
1076  private ErrorMessage errorFlowSpeakerCommand(final String flowId) throws IOException {
1077  System.out.println("TOPOLOGY: Error rule");
1078  ErrorData errorData = new ErrorData(ErrorType.REQUEST_INVALID, "Could not operate with flow", flowId);
1079  ErrorMessage errorMessage = new ErrorMessage(errorData, 0, "error-flow", Destination.WFM_TRANSACTION);
1080  //sendSpeakerMessage(errorMessage);
1081  sendMessage(errorMessage, topologyConfig.getKafkaFlowTopic());
1082  return errorMessage;
1083  }
1084 
1085  private void sendFlowMessage(final CommandMessage message) throws IOException {
1086  sendMessage(message, topologyConfig.getKafkaFlowTopic());
1087  }
1088 
1089  private void sendNorthboundMessage(final CommandMessage message) throws IOException {
1090  sendMessage(message, topologyConfig.getKafkaNorthboundTopic());
1091  }
1092 
1093  private void sendMessage(Object object, String topic) throws IOException {
1094  String request = objectMapper.writeValueAsString(object);
1095  kProducer.pushMessage(topic, request);
1096  }
1097 
1098  private ImmutablePair<Flow, Flow> getFlowPayload(InfoMessage message) {
1099  InfoData data = message.getData();
1100  FlowInfoData flow = (FlowInfoData) data;
1101  return flow.getPayload();
1102  }
1103 
1104  private void sendClearState() throws IOException, InterruptedException {
1105  CtrlRequest request = new CtrlRequest("flowtopology/" + ComponentType.CRUD_BOLT.toString(),
1106  new RequestData("clearState"), 1, "clear-state-correlation-id", Destination.WFM_CTRL);
1107  sendMessage(request, topologyConfig.getKafkaCtrlTopic());
1108 
1109  ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();
1110  assertNotNull(raw);
1111 
1112  CtrlResponse response = (CtrlResponse) objectMapper.readValue(raw.value(), Message.class);
1113  assertEquals(request.getCorrelationId(), response.getCorrelationId());
1114  }
1115 }
static LaunchEnvironment makeLaunchEnvironment()
void setTransactionId(final Long transactionId)
Definition: BaseFlow.java:98
void pushMessage(final String topic, final String data)
ConsumerRecord< String, String > pollMessage()