Open Kilda Java Documentation
StormTopologyLcm.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.atdd;
17 
18 import static com.google.common.base.Charsets.UTF_8;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotNull;
21 import static org.junit.Assert.assertTrue;
25 
26 import org.openkilda.KafkaUtils;
39 
40 import com.spotify.docker.client.DefaultDockerClient;
41 import com.spotify.docker.client.DockerClient;
42 import com.spotify.docker.client.DockerClient.ListContainersParam;
43 import com.spotify.docker.client.messages.Container;
44 import cucumber.api.java.en.And;
45 import cucumber.api.java.en.Given;
46 import cucumber.api.java.en.Then;
47 import cucumber.api.java.en.When;
48 import org.apache.commons.collections4.CollectionUtils;
49 import org.apache.commons.io.IOUtils;
50 import org.apache.commons.lang.StringUtils;
51 import org.glassfish.jersey.client.ClientConfig;
52 
53 import java.util.Set;
54 import java.util.concurrent.TimeUnit;
55 import javax.ws.rs.client.Client;
56 import javax.ws.rs.client.ClientBuilder;
57 import javax.ws.rs.core.Response;
58 
59 
60 public class StormTopologyLcm {
61 
// Name the constructor looks for in each container's names() list to locate the wfm container.
62  private static final String WFM_CONTAINER_NAME = "/wfm";
63 
// Container resolved in the constructor; it is the one restarted by reloadStormTopologies().
64  private Container wfmContainer;
// Docker client built from the environment in the constructor.
65  private DockerClient dockerClient;
// Helper used to fetch the state dumps from the bolts.
66  private KafkaUtils kafkaUtils;
// State dumps captured at the end of the "Given" step; compared with fresh dumps in
// allStormTopologiesInTheSameState().
67  private DumpStateManager expectedStateDumpsFromBolts;
// Flow identifier produced by FlowUtils.getFlowName("simple-flow"); used for both the flow id
// and the flow description argument when the flow is created.
68  private final String flowId = FlowUtils.getFlowName("simple-flow");
69 
70  public StormTopologyLcm() throws Exception {
71  dockerClient = DefaultDockerClient.fromEnv().build();
72  wfmContainer = dockerClient.listContainers(ListContainersParam.allContainers())
73  .stream()
74  .filter(container ->
75  container.names().contains(WFM_CONTAINER_NAME))
76  .findFirst().orElseThrow(
77  () -> new IllegalStateException("Can't find wfm container"));
78  kafkaUtils = new KafkaUtils();
79  }
80 
84  @When("^storm topologies are restarted$")
85  public void reloadStormTopologies() throws Exception {
86 
87  System.out.println("\n=====> Recreate wfm container");
88  dockerClient.restartContainer(wfmContainer.id());
89  dockerClient.waitContainer(wfmContainer.id());
90  }
91 
/**
 * Cucumber step that prepares the scenario: calls TestUtils.clearEverything, reads the
 * simple-topology JSON resource, creates the flow {@code flowId} between switch 1 and
 * switch 2, verifies the create response, checks traffic and stores the bolt state dumps
 * for later comparison.
 *
 * NOTE(review): this listing skips source lines 99 and 104; in particular {@code topology}
 * is read but never used in the visible text, so the call that sends it to mininet is
 * presumably on one of the missing lines - confirm against the full source.
 *
 * @throws Throwable if any helper call or assertion fails
 */
95  @Given("^active simple network topology with two switches and flow$")
96  public void activeSimpleNetworkTopologyWithTwoSwitchesAndFlow() throws Throwable {
97  // clearEverything
98  TestUtils.clearEverything(StringUtils.EMPTY);
100 
101  // Load topo from file and send to mininet
102  String topology = IOUtils.toString(this.getClass().getResourceAsStream(
103  "/topologies/simple-topology.json"), UTF_8);
105 
106  // Create and check flow
// flowId is passed twice: as the flow id and again as the sixth constructor argument.
107  FlowPayload flowPayload = new FlowPayload(flowId,
108  new FlowEndpointPayload(new SwitchId(1L), 1, 100),
109  new FlowEndpointPayload(new SwitchId(2L), 1, 100),
110  10000, false, flowId, null, FlowState.UP.getState());
111 
// Retry putFlow up to 10 times, sleeping one second after each null response.
112  FlowPayload response = null;
113  for (int i = 0; i < 10; ++i) {
114  response = FlowUtils.putFlow(flowPayload);
115  if (response != null) {
116  break;
117  }
118  TimeUnit.SECONDS.sleep(1);
119  }
120 
121  assertNotNull(response);
// lastUpdated is cleared before the equality check; presumably it is filled in by the
// server and would otherwise differ from the request payload - TODO confirm.
122  response.setLastUpdated(null);
123  assertEquals(flowPayload, response);
124 
125  // Check traffic on new flow
126  assertTrue(trafficIsOk(true));
127 
128  // Save the bolt state dumps so they can be compared with the ones taken after the restart
129  expectedStateDumpsFromBolts = kafkaUtils.getStateDumpsFromBolts();
130  }
131 
135  @Then("^network topology in the same state$")
136  public void networkTopologyInTheSameState() throws Throwable {
137 
138  FlowIdStatusPayload flowStatus = null;
139  for (int i = 0; i < 6; ++i) {
140  flowStatus = FlowUtils.getFlowStatus(flowId);
141  if (flowStatus != null && FlowState.UP == flowStatus.getStatus()) {
142  break;
143  }
144  TimeUnit.SECONDS.sleep(10);
145  }
146 
147  assertNotNull(flowStatus);
148  assertEquals(FlowState.UP, flowStatus.getStatus());
149  }
150 
/**
 * Cucumber step: fetches fresh state dumps from the bolts and asserts, via
 * CollectionUtils.isEqualCollection, that the CacheBolt flows and the OFELinkBolt discovery
 * data match the dumps stored in {@code expectedStateDumpsFromBolts}.
 *
 * NOTE(review): this listing skips source lines 162 and 164, so the two CacheBolt
 * declarations below are shown without the accessor chain that completes them - consult
 * the full source before editing this method.
 *
 * @throws Throwable if the dumps cannot be fetched or an assertion fails
 */
155  @And("^all storm topologies in the same state$")
156  public void allStormTopologiesInTheSameState() throws Throwable {
157 
158  DumpStateManager actualSateDumpsFromBolts = kafkaUtils.getStateDumpsFromBolts();
159 
160  // CacheBolt part
161  Set<ImmutablePair<Flow, Flow>> actualCacheBoltFlows = actualSateDumpsFromBolts
163  Set<ImmutablePair<Flow, Flow>> expectedCacheBoltFlows = expectedStateDumpsFromBolts
165  assertTrue(CollectionUtils.isEqualCollection(actualCacheBoltFlows, expectedCacheBoltFlows));
166 
167  // OFELinkBolt: compare the discovery collections of the saved and the fresh state
168  OFELinkBoltState actualOfeLinkBoltState = actualSateDumpsFromBolts.getOfeLinkBoltState();
169  OFELinkBoltState expectedOfeLinkBoltState = expectedStateDumpsFromBolts.getOfeLinkBoltState();
170  assertTrue(CollectionUtils.isEqualCollection(expectedOfeLinkBoltState.getDiscovery(),
171  actualOfeLinkBoltState.getDiscovery()));
172  }
173 
174  @And("^traffic flows through flow$")
175  public void trafficFlowsThroughFlow() throws Throwable {
176  assertTrue(trafficIsOk(true));
177  }
178 
179  private boolean trafficIsOk(boolean expectedResult) throws Throwable {
180 
181  //TODO: move that to utils
182  if (isTrafficTestsEnabled()) {
183  System.out.println("=====> Send traffic");
184 
185  long current = System.currentTimeMillis();
186  Client client = ClientBuilder.newClient(new ClientConfig());
187  Response result = client
188  .target(trafficEndpoint)
189  .path("/checkflowtraffic")
190  .queryParam("srcswitch", "switch1")
191  .queryParam("dstswitch", "switch2")
192  .queryParam("srcport", "1")
193  .queryParam("dstport", "1")
194  .queryParam("srcvlan", "1000")
195  .queryParam("dstvlan", "1000")
196  .request()
197  .get();
198 
199  System.out.println(String.format("======> Response = %s", result.toString()));
200  System.out.println(String.format("======> Send traffic Time: %,.3f", getTimeDuration(current)));
201 
202  return result.getStatus() == 200;
203  } else {
204  return expectedResult;
205  }
206  }
207 }
static boolean isTrafficTestsEnabled()
Definition: FlowUtils.java:612
Set< ImmutablePair< Flow, Flow > > getFlows()
Definition: FlowDump.java:28
static FlowIdStatusPayload getFlowStatus(final String flowId)
Definition: FlowUtils.java:367
static boolean CreateMininetTopology(String json)
list result
Definition: plan-d.py:72
DumpStateManager getStateDumpsFromBolts()
static FlowPayload putFlow(final FlowPayload payload)
Definition: FlowUtils.java:163
static String getFlowName(final String flowId)
Definition: FlowUtils.java:595
static double getTimeDuration(final long current)
Definition: FlowUtils.java:605