Open Kilda Java Documentation
CacheTopologyTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.cache;
17 
48 
49 import com.fasterxml.jackson.databind.ObjectMapper;
50 import com.google.common.collect.ImmutableList;
51 import org.apache.commons.lang.StringUtils;
52 import org.apache.kafka.clients.consumer.ConsumerRecord;
53 import org.apache.storm.Config;
54 import org.apache.storm.generated.StormTopology;
55 import org.junit.AfterClass;
56 import org.junit.Assert;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Ignore;
60 import org.junit.Test;
61 
62 import java.io.IOException;
63 import java.util.Collections;
64 import java.util.HashSet;
65 import java.util.List;
66 import java.util.Properties;
67 import java.util.Set;
68 import java.util.UUID;
69 
70 public class CacheTopologyTest extends AbstractStormTest {
71  private static CacheTopology topology;
72  private static final ObjectMapper objectMapper = new ObjectMapper();
73  private static final String firstFlowId = "first-flow";
74  private static final String secondFlowId = "second-flow";
75  private static final String thirdFlowId = "third-flow";
76  private static final SwitchInfoData sw = new SwitchInfoData(new SwitchId("ff:03"),
77  SwitchState.ADDED, "127.0.0.1", "localhost", "test switch", "kilda");
78  private static final ImmutablePair<Flow, Flow> firstFlow = new ImmutablePair<>(
79  new Flow(firstFlowId, 10000, false, "", sw.getSwitchId(), 1, 2, sw.getSwitchId(), 1, 2),
80  new Flow(firstFlowId, 10000, false, "", sw.getSwitchId(), 1, 2, sw.getSwitchId(), 1, 2));
81  private static final ImmutablePair<Flow, Flow> secondFlow = new ImmutablePair<>(
82  new Flow(secondFlowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
83  new SwitchId("ff:00"), 1, 2),
84  new Flow(secondFlowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
85  new SwitchId("ff:00"), 1, 2));
86  private static final ImmutablePair<Flow, Flow> thirdFlow = new ImmutablePair<>(
87  new Flow(thirdFlowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
88  new SwitchId("ff:00"), 1, 2),
89  new Flow(thirdFlowId, 10000, false, "", new SwitchId("ff:00"), 1, 2,
90  new SwitchId("ff:00"), 1, 2));
91  private static final Set<ImmutablePair<Flow, Flow>> flows = new HashSet<>();
92  private static final NetworkInfoData dump = new NetworkInfoData(
93  "test", Collections.singleton(sw), Collections.emptySet(), Collections.emptySet(), flows);
94 
95  private static TestKafkaConsumer teConsumer;
96  private static TestKafkaConsumer flowConsumer;
97  private static TestKafkaConsumer ctrlConsumer;
98 
99  private static Neo4jFixture fakeNeo4jDb;
100 
101  @BeforeClass
102  public static void setupOnce() throws Exception {
104 
105  Properties configOverlay = new Properties();
106 
107  fakeNeo4jDb = new Neo4jFixture(fsData.getRoot().toPath(), NEO4J_LISTEN_ADDRESS);
108  fakeNeo4jDb.start();
109  configOverlay.setProperty("neo4j.hosts", fakeNeo4jDb.getListenAddress());
110 
111  flows.add(firstFlow);
112  flows.add(secondFlow);
113 
114  LaunchEnvironment launchEnvironment = makeLaunchEnvironment();
115  launchEnvironment.setupOverlay(configOverlay);
116  topology = new CacheTopology(launchEnvironment);
117  StormTopology stormTopology = topology.createTopology();
118 
119  Config config = stormConfig();
120  cluster.submitTopology(CacheTopologyTest.class.getSimpleName(), config, stormTopology);
121 
122  teConsumer = new TestKafkaConsumer(
123  topology.getConfig().getKafkaTopoEngTopic(), Destination.TOPOLOGY_ENGINE,
124  kafkaProperties(UUID.nameUUIDFromBytes(Destination.TOPOLOGY_ENGINE.toString().getBytes()).toString())
125  );
126  teConsumer.start();
127 
128  flowConsumer = new TestKafkaConsumer(
129  topology.getConfig().getKafkaFlowTopic(), Destination.WFM,
130  kafkaProperties(UUID.nameUUIDFromBytes(Destination.WFM.toString().getBytes()).toString())
131  );
132  flowConsumer.start();
133 
134  ctrlConsumer = new TestKafkaConsumer(
135  topology.getConfig().getKafkaCtrlTopic(), Destination.CTRL_CLIENT,
136  kafkaProperties(UUID.nameUUIDFromBytes(Destination.CTRL_CLIENT.toString().getBytes()).toString())
137  );
138  ctrlConsumer.start();
139  }
140 
141  @Before
142  public void init() throws Exception {
144  flowConsumer.clear();
145  teConsumer.clear();
146  ctrlConsumer.clear();
147 
148  sendClearState();
149  }
150 
151  @AfterClass
152  public static void teardownOnce() throws Exception {
153 
154  flowConsumer.wakeup();
155  flowConsumer.join();
156  teConsumer.wakeup();
157  teConsumer.join();
158  ctrlConsumer.wakeup();
159  ctrlConsumer.join();
160 
161  fakeNeo4jDb.stop();
162 
164  }
165 
166 
167  @Test
169  System.out.println("Flow Update Test");
170  teConsumer.clear();
171  sendFlowUpdate(thirdFlow);
172 
173  ConsumerRecord<String, String> flow = teConsumer.pollMessage();
174 
175  Assert.assertNotNull(flow);
176  Assert.assertNotNull(flow.value());
177 
178  InfoMessage infoMessage = objectMapper.readValue(flow.value(), InfoMessage.class);
179  FlowInfoData infoData = (FlowInfoData) infoMessage.getData();
180  Assert.assertNotNull(infoData);
181 
182  Assert.assertEquals(thirdFlow, infoData.getPayload());
183  }
184 
185  @Test
186  public void ctrlListHandler() throws Exception {
187  CtrlRequest request = new CtrlRequest(
188  "cachetopology/*", new RequestData("list"), 1, "list-correlation-id", Destination.WFM_CTRL);
189 
190  sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
191 
192  ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();
193 
194  Assert.assertNotNull(raw); // TODO: FAILED
195  Assert.assertNotNull(raw.value());
196 
197  Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
198  CtrlResponse response = (CtrlResponse) responseGeneric;
199  ResponseData payload = response.getData();
200 
201  Assert.assertEquals(request.getCorrelationId(), response.getCorrelationId());
202  Assert.assertEquals(CacheTopology.BOLT_ID_CACHE, payload.getComponent());
203  }
204 
205  @Test
206  public void ctrlDumpHandler() throws Exception {
207  CtrlRequest request = new CtrlRequest(
208  "cachetopology/*", new RequestData("dump"), 1, "dump-correlation-id", Destination.WFM_CTRL);
209 
210  sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
211 
212  ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();
213 
214  Assert.assertNotNull(raw); // TODO: FAILED
215  Assert.assertNotNull(raw.value());
216 
217  Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
218  CtrlResponse response = (CtrlResponse) responseGeneric;
219  ResponseData payload = response.getData();
220 
221  Assert.assertEquals(request.getCorrelationId(), response.getCorrelationId());
222  Assert.assertEquals(CacheTopology.BOLT_ID_CACHE, payload.getComponent());
223  Assert.assertTrue(payload instanceof DumpStateResponseData);
224  }
225 
226  @Test
227  public void ctrlSpecificRoute() throws Exception {
228  CtrlRequest request = new CtrlRequest(
229  "cachetopology/cache", new RequestData("dump"), 1, "route-correlation-id", Destination.WFM_CTRL);
230  sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
231 
232  ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();
233 
234  Assert.assertNotNull(raw); // TODO: FAILED
235  Assert.assertNotNull(raw.value());
236 
237  Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
238  CtrlResponse response = (CtrlResponse) responseGeneric;
239  Assert.assertEquals(request.getCorrelationId(), response.getCorrelationId());
240  }
241 
242  @Ignore
243  @Test
244  public void flowShouldBeReroutedWhenIslDies() throws Exception {
245  final SwitchId destSwitchId = new SwitchId("ff:02");
246  final String flowId = "flowId";
247  sendData(sw);
248 
249  SwitchInfoData destSwitch = new SwitchInfoData(destSwitchId, SwitchState.ACTIVATED, StringUtils.EMPTY,
250  StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY);
251  sendData(destSwitch);
252 
253  List<PathNode> path = ImmutableList.of(
254  new PathNode(sw.getSwitchId(), 0, 0),
255  new PathNode(destSwitch.getSwitchId(), 0, 1)
256  );
257  IslInfoData isl = new IslInfoData(0L, path, 0L, IslChangeType.DISCOVERED, 0L);
258  sendData(isl);
259 
260  FlowInfoData flowData = buildFlowInfoData(flowId, sw.getSwitchId(), destSwitchId, path);
261  sendData(flowData);
262 
263  //mark isl as failed
264  flowConsumer.clear();
266  sendData(isl);
267 
268  //we are expecting that flow should be rerouted
269  ConsumerRecord<String, String> record = flowConsumer.pollMessage();
270  Assert.assertNotNull(record);
271  CommandMessage message = objectMapper.readValue(record.value(), CommandMessage.class);
272  Assert.assertNotNull(message);
274  Assert.assertEquals(command.getFlowId(), flowId);
275  }
276 
277  private static <T extends Message> void sendMessage(T message, String topic) throws IOException {
278  String request = objectMapper.writeValueAsString(message);
279  kProducer.pushMessage(topic, request);
280  }
281 
282  private static void sendNetworkDump(NetworkInfoData data, String correlationId) throws IOException {
283  System.out.println("Topology-Engine: Send Network Dump");
284  InfoMessage info = new InfoMessage(data, 0, correlationId, Destination.WFM_CACHE);
285  sendMessage(info, topology.getConfig().getKafkaTopoCacheTopic());
286  }
287 
288  private static <T extends InfoData> void sendData(T infoData) throws IOException {
289  InfoMessage info = new InfoMessage(infoData, 0, UUID.randomUUID().toString(), Destination.WFM_CACHE);
290  sendMessage(info, topology.getConfig().getKafkaTopoCacheTopic());
291  }
292 
293  private static void sendFlowUpdate(ImmutablePair<Flow, Flow> flow) throws IOException {
294  System.out.println("Flow Topology: Send Flow Creation Request");
295  String correlationId = UUID.randomUUID().toString();
296  FlowInfoData data = new FlowInfoData(flow.getLeft().getFlowId(),
297  flow, FlowOperation.CREATE, correlationId);
298  // TODO: as part of getting rid of OutputTopic, used TopoDiscoTopic. This feels wrong for
299  // Flows.
300  InfoMessage message = new InfoMessage(data, System.currentTimeMillis(), correlationId);
301  sendMessage(message, topology.getConfig().getKafkaTopoCacheTopic());
302  }
303 
304  private static void sendClearState() throws IOException, InterruptedException {
305  CtrlRequest request = new CtrlRequest(
306  "cachetopology/cache", new RequestData("clearState"), 1, "route-correlation-id",
307  Destination.WFM_CTRL);
308  sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
309 
310  ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();
311  // assertNotNull(raw);
312  if (raw != null) {
313  CtrlResponse response = (CtrlResponse) objectMapper.readValue(raw.value(), Message.class);
314  Assert.assertEquals(request.getCorrelationId(), response.getCorrelationId());
315  }
316  }
317 
318  private static void sendNetworkDumpRequest() throws IOException, InterruptedException {
319  CtrlRequest request = new CtrlRequest("cachetopology/cache", new RequestData("dump"),
320  System.currentTimeMillis(), UUID.randomUUID().toString(), Destination.WFM_CTRL);
321  sendMessage(request, topology.getConfig().getKafkaCtrlTopic());
322  }
323 
324  private NetworkDump getNetworkDump(ConsumerRecord<String, String> raw) throws IOException {
325  Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
326  CtrlResponse response = (CtrlResponse) responseGeneric;
327  DumpStateResponseData data = (DumpStateResponseData) response.getData();
328  CacheBoltState cacheState = (CacheBoltState) data.getState();
329  return cacheState.getNetwork();
330  }
331 
332  private FlowInfoData buildFlowInfoData(String flowId, SwitchId srcSwitch, SwitchId dstSwitch, List<PathNode> path) {
333  Flow flow = new Flow();
334  flow.setFlowId(flowId);
335  flow.setSourceSwitch(srcSwitch);
336  flow.setDestinationSwitch(dstSwitch);
337  flow.setState(FlowState.UP);
338 
339  PathInfoData pathInfoData = new PathInfoData(0L, path);
340  flow.setFlowPath(pathInfoData);
341  ImmutablePair<Flow, Flow> immutablePair = new ImmutablePair<>(flow, flow);
342  return new FlowInfoData(flowId, immutablePair, FlowOperation.CREATE, UUID.randomUUID().toString());
343  }
344 
345  private static String waitDumpRequest() throws InterruptedException, IOException {
346  ConsumerRecord<String, String> raw;
347  int sec = 0;
348  while ((raw = teConsumer.pollMessage(1000)) == null) {
349  System.out.println("Waiting For Dump Request");
350  Assert.assertTrue("Waiting For Dump Request failed", ++sec < 20);
351  }
352  System.out.println("Waiting For Dump Request");
353 
354  Message request = objectMapper.readValue(raw.value(), Message.class);
355  return request.getCorrelationId();
356  }
357 }
void setupOverlay(Properties overlay)
static LaunchEnvironment makeLaunchEnvironment()
ImmutablePair< Flow, Flow > getPayload()
def command(payload, fields)
Definition: share.py:102
void pushMessage(final String topic, final String data)
ConsumerRecord< String, String > pollMessage()