Open Kilda Java Documentation
ReplaceInstallFlowTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.kafka;
17 
18 import static org.easymock.EasyMock.anyObject;
19 import static org.easymock.EasyMock.capture;
20 import static org.easymock.EasyMock.createMock;
21 import static org.easymock.EasyMock.expect;
22 import static org.easymock.EasyMock.newCapture;
23 import static org.easymock.EasyMock.replay;
24 import static org.junit.Assert.assertEquals;
26 import static org.openkilda.messaging.Utils.MAPPER;
27 
44 
45 import com.google.common.base.Charsets;
46 import com.google.common.io.Resources;
47 import com.google.common.util.concurrent.Futures;
48 import com.google.common.util.concurrent.MoreExecutors;
49 import net.floodlightcontroller.core.IOFSwitch;
50 import net.floodlightcontroller.core.SwitchDescription;
51 import net.floodlightcontroller.core.internal.IOFSwitchService;
52 import net.floodlightcontroller.core.module.FloodlightModuleContext;
53 import net.floodlightcontroller.core.module.FloodlightModuleException;
54 import net.floodlightcontroller.restserver.IRestApiService;
55 import org.apache.kafka.clients.consumer.ConsumerRecord;
56 import org.easymock.Capture;
57 import org.easymock.CaptureType;
58 import org.junit.Before;
59 import org.junit.Test;
60 import org.projectfloodlight.openflow.protocol.OFBarrierReply;
61 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
62 import org.projectfloodlight.openflow.protocol.OFFlowAdd;
63 import org.projectfloodlight.openflow.protocol.OFMeterMod;
64 import org.projectfloodlight.openflow.types.DatapathId;
65 
66 import java.io.IOException;
67 import java.util.concurrent.ExecutorService;
68 import java.util.concurrent.TimeUnit;
69 
70 public class ReplaceInstallFlowTest {
71  private static final FloodlightModuleContext context = new FloodlightModuleContext();
72  private final ExecutorService parseRecordExecutor = MoreExecutors.sameThreadExecutor();
73  protected SwitchDescription switchDescription;
75  private IOFSwitchService ofSwitchService;
76  private KafkaMessageCollector collector;
77  private KafkaMessageProducer producer;
78 
86  private static CommandData prepareData(String value) throws IOException {
87  CommandMessage message = MAPPER.readValue(value, CommandMessage.class);
88  return message.getData();
89  }
90 
91  @Before
92  public void setUp() throws FloodlightModuleException {
93  final SwitchManager switchManager = new SwitchManager();
94  final PathVerificationService pathVerificationService = new PathVerificationService();
95 
96  ofSwitchService = createMock(IOFSwitchService.class);
97  producer = createMock(KafkaMessageProducer.class);
98 
99  context.addService(IOFSwitchService.class, ofSwitchService);
100  context.addService(IRestApiService.class, null);
101  context.addService(SwitchEventCollector.class, null);
102  context.addService(KafkaMessageProducer.class, producer);
103  context.addService(IPathVerificationService.class, pathVerificationService);
104  context.addService(ISwitchManager.class, switchManager);
105 
106  switchManager.init(context);
107 
108  collector = new KafkaMessageCollector();
109  context.addConfigParam(collector, "topic", "");
110  context.addConfigParam(collector, "bootstrap-servers", "");
111  collector.init(context);
112 
113  initScheme();
114  }
115 
116  protected void initScheme() {
118  switchDescription = new SwitchDescription("", "", "", "", "");
119  }
120 
121  @Test
122  public void installOneSwitchNoneFlow() throws IOException, InterruptedException {
123  String value = Resources.toString(getClass().getResource("/install_one_switch_none_flow.json"), Charsets.UTF_8);
125  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
126  OFFlowAdd flowCommand = scheme.oneSwitchNoneFlowMod(data.getInputPort(), data.getOutputPort(),
127  data.getMeterId(), 123L);
128  runTest(value, flowCommand, meterCommand, null, null);
129  }
130 
131  @Test
132  public void installOneSwitchReplaceFlow() throws IOException, InterruptedException {
133  String value = Resources.toString(getClass().getResource("/install_one_switch_replace_flow.json"), Charsets.UTF_8);
135  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
136  OFFlowAdd flowCommand = scheme.oneSwitchReplaceFlowMod(data.getInputPort(), data.getOutputPort(),
137  data.getInputVlanId(), data.getOutputVlanId(), data.getMeterId(), 123L);
138  runTest(value, flowCommand, meterCommand, null, null);
139  }
140 
141  @Test
142  public void installOneSwitchPushFlow() throws IOException, InterruptedException {
143  String value = Resources.toString(getClass().getResource("/install_one_switch_push_flow.json"), Charsets.UTF_8);
145  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
146  OFFlowAdd flowCommand = scheme.oneSwitchPushFlowMod(data.getInputPort(), data.getOutputPort(),
147  data.getOutputVlanId(), data.getMeterId(), 123L);
148  runTest(value, flowCommand, meterCommand, null, null);
149  }
150 
151  @Test
152  public void installOneSwitchPopFlow() throws IOException, InterruptedException {
153  String value = Resources.toString(getClass().getResource("/install_one_switch_pop_flow.json"), Charsets.UTF_8);
155  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
156  OFFlowAdd flowCommand = scheme.oneSwitchPopFlowMod(data.getInputPort(), data.getOutputPort(),
157  data.getInputVlanId(), data.getMeterId(), 123L);
158  runTest(value, flowCommand, meterCommand, null, null);
159  }
160 
161  @Test
162  public void installIngressNoneFlow() throws IOException, InterruptedException {
163  String value = Resources.toString(getClass().getResource("/install_ingress_none_flow.json"), Charsets.UTF_8);
165  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
166  OFFlowAdd flowCommand = scheme.ingressNoneFlowMod(data.getInputPort(), data.getOutputPort(),
167  data.getTransitVlanId(), data.getMeterId(), 123L);
168  runTest(value, flowCommand, meterCommand, null, null);
169  }
170 
171  @Test
172  public void installIngressReplaceFlow() throws IOException, InterruptedException {
173  String value = Resources.toString(getClass().getResource("/install_ingress_replace_flow.json"), Charsets.UTF_8);
175  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
176  OFFlowAdd flowCommand = scheme.ingressReplaceFlowMod(data.getInputPort(), data.getOutputPort(),
177  data.getInputVlanId(), data.getTransitVlanId(), data.getMeterId(), 123L);
178  runTest(value, flowCommand, meterCommand, null, null);
179  }
180 
181  @Test
182  public void installIngressPushFlow() throws IOException, InterruptedException {
183  String value = Resources.toString(getClass().getResource("/install_ingress_push_flow.json"), Charsets.UTF_8);
185  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
186  OFFlowAdd flowCommand = scheme.ingressPushFlowMod(data.getInputPort(), data.getOutputPort(),
187  data.getTransitVlanId(), data.getMeterId(), 123L);
188  runTest(value, flowCommand, meterCommand, null, null);
189  }
190 
191  @Test
192  public void installIngressPopFlow() throws IOException, InterruptedException {
193  String value = Resources.toString(getClass().getResource("/install_ingress_pop_flow.json"), Charsets.UTF_8);
195  OFMeterMod meterCommand = scheme.installMeter(data.getBandwidth(), 1024, data.getMeterId());
196  OFFlowAdd flowCommand = scheme.ingressPopFlowMod(data.getInputPort(), data.getOutputPort(),
197  data.getInputVlanId(), data.getTransitVlanId(), data.getMeterId(), 123L);
198  runTest(value, flowCommand, meterCommand, null, null);
199  }
200 
201  @Test
202  public void installEgressNoneFlow() throws IOException, InterruptedException {
203  String value = Resources.toString(getClass().getResource("/install_egress_none_flow.json"), Charsets.UTF_8);
205  OFFlowAdd flowCommand = scheme.egressNoneFlowMod(data.getInputPort(), data.getOutputPort(),
206  data.getTransitVlanId(), 123L);
207  runTest(value, flowCommand, null, null, null);
208  }
209 
210  @Test
211  public void installEgressReplaceFlow() throws IOException, InterruptedException {
212  String value = Resources.toString(getClass().getResource("/install_egress_replace_flow.json"), Charsets.UTF_8);
214  OFFlowAdd flowCommand = scheme.egressReplaceFlowMod(data.getInputPort(), data.getOutputPort(),
215  data.getTransitVlanId(), data.getOutputVlanId(), 123L);
216  runTest(value, flowCommand, null, null, null);
217  }
218 
219  @Test
220  public void installEgressPushFlow() throws IOException, InterruptedException {
221  String value = Resources.toString(getClass().getResource("/install_egress_push_flow.json"), Charsets.UTF_8);
223  OFFlowAdd flowCommand = scheme.egressPushFlowMod(data.getInputPort(), data.getOutputPort(),
224  data.getTransitVlanId(), data.getOutputVlanId(), 123L);
225  runTest(value, flowCommand, null, null, null);
226  }
227 
228  @Test
229  public void installEgressPopFlow() throws IOException, InterruptedException {
230  String value = Resources.toString(getClass().getResource("/install_egress_pop_flow.json"), Charsets.UTF_8);
232  OFFlowAdd flowCommand = scheme.egressPopFlowMod(data.getInputPort(), data.getOutputPort(),
233  data.getTransitVlanId(), 123L);
234  runTest(value, flowCommand, null, null, null);
235  }
236 
237  @Test
238  public void installTransitFlow() throws IOException, InterruptedException {
239  String value = Resources.toString(getClass().getResource("/install_transit_flow.json"), Charsets.UTF_8);
241  OFFlowAdd flowCommand = scheme.transitFlowMod(data.getInputPort(), data.getOutputPort(),
242  data.getTransitVlanId(), 123L);
243  runTest(value, flowCommand, null, null, null);
244  }
245 
253  private void runTest(final String value, final OFFlowAdd flowCommand, final OFMeterMod meterCommand,
254  final OFFlowAdd reverseFlowCommand, final OFMeterMod reverseMeterCommand)
255  throws InterruptedException {
256  // construct kafka message
257  ConsumerRecord<String, String> record = new ConsumerRecord<>("", 0, 0, "", value);
258 
259  ConfigurationProvider provider = ConfigurationProvider.of(context, collector);
260  KafkaTopicsConfig topicsConfig = provider.getConfiguration(KafkaTopicsConfig.class);
261 
262  // create parser instance
263  ConsumerContext kafkaContext = new ConsumerContext(context, topicsConfig);
264  RecordHandler parseRecord = new RecordHandler(kafkaContext, record, new MeterPool());
265  // init test mocks
266  Capture<OFFlowAdd> flowAddCapture = flowCommand == null ? null : newCapture(CaptureType.ALL);
267  Capture<OFMeterMod> meterAddCapture = meterCommand == null ? null : newCapture(CaptureType.ALL);
268  prepareMocks(flowAddCapture, meterAddCapture, reverseFlowCommand != null, reverseMeterCommand != null);
269 
270  // run parser and wait for termination or timeout
271  parseRecordExecutor.execute(parseRecord);
272  parseRecordExecutor.shutdown();
273  parseRecordExecutor.awaitTermination(10, TimeUnit.SECONDS);
274 
275  // verify results
276  if (meterCommand != null) {
277  assertEquals(meterCommand, meterAddCapture.getValues().get(0));
278  if (reverseMeterCommand != null) {
279  assertEquals(reverseMeterCommand, meterAddCapture.getValues().get(1));
280  }
281  }
282  if (flowCommand != null) {
283  assertEquals(flowCommand, flowAddCapture.getValues().get(0));
284  if (reverseFlowCommand != null) {
285  assertEquals(reverseFlowCommand, flowAddCapture.getValues().get(1));
286  }
287  }
288  }
289 
296  private void prepareMocks(Capture<OFFlowAdd> flowAddCapture, Capture<OFMeterMod> meterAddCapture,
297  boolean needCheckReverseFlow, boolean needCheckReverseMeter) {
298  IOFSwitch iofSwitch = createMock(IOFSwitch.class);
299 
300  expect(ofSwitchService.getSwitch(anyObject(DatapathId.class))).andStubReturn(iofSwitch);
301  expect(iofSwitch.getOFFactory()).andStubReturn(ofFactory);
302  expect(iofSwitch.getSwitchDescription()).andStubReturn(switchDescription);
303 
304  if (meterAddCapture != null) {
305  expect(iofSwitch.write(capture(meterAddCapture))).andReturn(true);
306  expect(iofSwitch.writeRequest(anyObject(OFBarrierRequest.class)))
307  .andReturn(Futures.immediateFuture(createMock(OFBarrierReply.class)));
308  if (flowAddCapture != null) {
309  expect(iofSwitch.write(capture(flowAddCapture))).andReturn(true);
310  }
311  if (needCheckReverseMeter) {
312  expect(iofSwitch.write(capture(meterAddCapture))).andReturn(true);
313  }
314  if (needCheckReverseFlow) {
315  expect(iofSwitch.write(capture(flowAddCapture))).andReturn(true);
316  }
317  } else if (flowAddCapture != null) {
318  expect(iofSwitch.write(capture(flowAddCapture))).andReturn(true).times(needCheckReverseFlow ? 2 : 1);
319  }
320 
321  replay(ofSwitchService);
322  replay(iofSwitch);
323  }
324 }
default OFFlowAdd ingressPopFlowMod(int inputPort, int outputPort, int inputVlan, int transitVlan, long meterId, long cookie)
OFFlowAdd egressPushFlowMod(int inputPort, int outputPort, int transitVlanId, int outputVlan, long cookie)
static final ObjectMapper MAPPER
Definition: Utils.java:31
default OFFlowAdd oneSwitchReplaceFlowMod(int inputPort, int outputPort, int inputVlan, int outputVlan, long meterId, long cookie)
default OFMeterMod installMeter(long bandwidth, long burstSize, long meterId)
void init(FloodlightModuleContext context)
OFFlowAdd egressPopFlowMod(int inputPort, int outputPort, int transitVlanId, long cookie)
value
Definition: nodes.py:62
default OFFlowAdd ingressNoneFlowMod(int inputPort, int outputPort, int transitVlan, long meterId, long cookie)
default OFFlowAdd oneSwitchPushFlowMod(int inputPort, int outputPort, int outputVlan, long meterId, long cookie)
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
default OFFlowAdd oneSwitchNoneFlowMod(int inputPort, int outputPort, long meterId, long cookie)
default OFFlowAdd transitFlowMod(int inputPort, int outputPort, int transitVlan, long cookie)
default OFFlowAdd oneSwitchPopFlowMod(int inputPort, int outputPort, int inputVlan, long meterId, long cookie)
default OFFlowAdd ingressReplaceFlowMod(int inputPort, int outputPort, int inputVlan, int transitVlan, long meterId, long cookie)
default OFFlowAdd ingressPushFlowMod(int inputPort, int outputPort, int transitVlan, long meterId, long cookie)
OFFlowAdd egressNoneFlowMod(int inputPort, int outputPort, int transitVlanId, long cookie)
net
Definition: plan-b.py:46
OFFlowAdd egressReplaceFlowMod(int inputPort, int outputPort, int inputVlan, int outputVlan, long cookie)