16 package org.openkilda.floodlight.kafka;
18 import static org.easymock.EasyMock.anyObject;
19 import static org.easymock.EasyMock.capture;
20 import static org.easymock.EasyMock.createMock;
21 import static org.easymock.EasyMock.expect;
22 import static org.easymock.EasyMock.newCapture;
23 import static org.easymock.EasyMock.replay;
24 import static org.junit.Assert.assertEquals;
45 import com.google.common.base.Charsets;
46 import com.google.common.io.Resources;
47 import com.google.common.util.concurrent.Futures;
48 import com.google.common.util.concurrent.MoreExecutors;
49 import net.floodlightcontroller.core.IOFSwitch;
50 import net.floodlightcontroller.core.SwitchDescription;
51 import net.floodlightcontroller.core.internal.IOFSwitchService;
52 import net.floodlightcontroller.core.module.FloodlightModuleContext;
53 import net.floodlightcontroller.core.module.FloodlightModuleException;
54 import net.floodlightcontroller.restserver.IRestApiService;
55 import org.apache.kafka.clients.consumer.ConsumerRecord;
56 import org.easymock.Capture;
57 import org.easymock.CaptureType;
58 import org.junit.Before;
59 import org.junit.Test;
60 import org.projectfloodlight.openflow.protocol.OFBarrierReply;
61 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
62 import org.projectfloodlight.openflow.protocol.OFFlowAdd;
63 import org.projectfloodlight.openflow.protocol.OFMeterMod;
64 import org.projectfloodlight.openflow.types.DatapathId;
66 import java.io.IOException;
67 import java.util.concurrent.ExecutorService;
68 import java.util.concurrent.TimeUnit;
// Floodlight module context shared by every test (static); setUp registers services and
// config parameters on it.
71 private static final FloodlightModuleContext context =
new FloodlightModuleContext();
// Executor on which runTest runs the record handler; created as Guava's same-thread executor.
72 private final ExecutorService parseRecordExecutor = MoreExecutors.sameThreadExecutor();
// Switch service registered on the module context; setUp assigns an EasyMock mock to it.
75 private IOFSwitchService ofSwitchService;
92 public void setUp() throws FloodlightModuleException {
// Builds the test fixture: mocks the switch service, registers services on the shared module
// context, then initialises switchManager and collector with that context.
// NOTE(review): switchManager and collector are assigned on lines not visible here - confirm
// their concrete types; the declared FloodlightModuleException presumably comes from init().
96 ofSwitchService = createMock(IOFSwitchService.class);
// The mocked switch service is registered; the REST API service is registered as null.
99 context.addService(IOFSwitchService.class, ofSwitchService);
100 context.addService(IRestApiService.class, null);
106 switchManager.
init(context);
// Both collector config parameters ("topic", "bootstrap-servers") are set to empty strings
// before the collector is initialised.
109 context.addConfigParam(collector,
"topic",
"");
110 context.addConfigParam(collector,
"bootstrap-servers",
"");
111 collector.
init(context);
123 String
value = Resources.toString(getClass().getResource(
"/install_one_switch_none_flow.json"), Charsets.UTF_8);
127 data.getMeterId(), 123L);
128 runTest(
value, flowCommand, meterCommand, null, null);
133 String
value = Resources.toString(getClass().getResource(
"/install_one_switch_replace_flow.json"), Charsets.UTF_8);
137 data.getInputVlanId(),
data.getOutputVlanId(),
data.getMeterId(), 123L);
138 runTest(
value, flowCommand, meterCommand, null, null);
143 String
value = Resources.toString(getClass().getResource(
"/install_one_switch_push_flow.json"), Charsets.UTF_8);
147 data.getOutputVlanId(),
data.getMeterId(), 123L);
148 runTest(
value, flowCommand, meterCommand, null, null);
153 String
value = Resources.toString(getClass().getResource(
"/install_one_switch_pop_flow.json"), Charsets.UTF_8);
157 data.getInputVlanId(),
data.getMeterId(), 123L);
158 runTest(
value, flowCommand, meterCommand, null, null);
163 String
value = Resources.toString(getClass().getResource(
"/install_ingress_none_flow.json"), Charsets.UTF_8);
167 data.getTransitVlanId(),
data.getMeterId(), 123L);
168 runTest(
value, flowCommand, meterCommand, null, null);
173 String
value = Resources.toString(getClass().getResource(
"/install_ingress_replace_flow.json"), Charsets.UTF_8);
177 data.getInputVlanId(),
data.getTransitVlanId(),
data.getMeterId(), 123L);
178 runTest(
value, flowCommand, meterCommand, null, null);
183 String
value = Resources.toString(getClass().getResource(
"/install_ingress_push_flow.json"), Charsets.UTF_8);
187 data.getTransitVlanId(),
data.getMeterId(), 123L);
188 runTest(
value, flowCommand, meterCommand, null, null);
193 String
value = Resources.toString(getClass().getResource(
"/install_ingress_pop_flow.json"), Charsets.UTF_8);
197 data.getInputVlanId(),
data.getTransitVlanId(),
data.getMeterId(), 123L);
198 runTest(
value, flowCommand, meterCommand, null, null);
203 String
value = Resources.toString(getClass().getResource(
"/install_egress_none_flow.json"), Charsets.UTF_8);
206 data.getTransitVlanId(), 123L);
207 runTest(
value, flowCommand, null, null, null);
212 String
value = Resources.toString(getClass().getResource(
"/install_egress_replace_flow.json"), Charsets.UTF_8);
215 data.getTransitVlanId(),
data.getOutputVlanId(), 123L);
216 runTest(
value, flowCommand, null, null, null);
221 String
value = Resources.toString(getClass().getResource(
"/install_egress_push_flow.json"), Charsets.UTF_8);
224 data.getTransitVlanId(),
data.getOutputVlanId(), 123L);
225 runTest(
value, flowCommand, null, null, null);
230 String
value = Resources.toString(getClass().getResource(
"/install_egress_pop_flow.json"), Charsets.UTF_8);
233 data.getTransitVlanId(), 123L);
234 runTest(
value, flowCommand, null, null, null);
239 String
value = Resources.toString(getClass().getResource(
"/install_transit_flow.json"), Charsets.UTF_8);
242 data.getTransitVlanId(), 123L);
243 runTest(
value, flowCommand, null, null, null);
253 private void runTest(
final String
value,
final OFFlowAdd flowCommand,
final OFMeterMod meterCommand,
254 final OFFlowAdd reverseFlowCommand,
final OFMeterMod reverseMeterCommand)
255 throws InterruptedException {
// Wraps `value` in a Kafka record, runs a RecordHandler over it and compares the messages
// captured from the mocked switch with the expected commands. A null expected command means
// no capture is created and no assertion is made for it.
// NOTE(review): some original lines of this method are not visible here (for example where
// kafkaContext comes from); the comments below describe only the visible statements.
// Record with empty topic, partition 0, offset 0 and empty key; `value` is the payload.
257 ConsumerRecord<String, String> record =
new ConsumerRecord<>(
"", 0, 0,
"",
value);
// Handler under test, given the record and a fresh MeterPool.
264 RecordHandler parseRecord =
new RecordHandler(kafkaContext, record,
new MeterPool());
// CaptureType.ALL keeps every captured value, so the first and second writes can be read
// back by index below. A capture stays null when the matching expected command is null.
266 Capture<OFFlowAdd> flowAddCapture = flowCommand == null ? null : newCapture(CaptureType.ALL);
267 Capture<OFMeterMod> meterAddCapture = meterCommand == null ? null : newCapture(CaptureType.ALL);
// Reverse-direction expectations are requested only when a reverse command was supplied.
268 prepareMocks(flowAddCapture, meterAddCapture, reverseFlowCommand != null, reverseMeterCommand != null);
// Run the handler, then shut the executor down and wait up to 10 seconds for termination.
// NOTE(review): the boolean result of awaitTermination is ignored, so a timeout would not
// fail the test here - confirm this is intended.
271 parseRecordExecutor.execute(parseRecord);
272 parseRecordExecutor.shutdown();
273 parseRecordExecutor.awaitTermination(10, TimeUnit.SECONDS);
// First captured meter message must equal meterCommand; the second (index 1) must equal
// reverseMeterCommand when one is expected.
// NOTE(review): closing braces are not visible; the reverse check appears to be nested
// inside the forward check - confirm.
276 if (meterCommand != null) {
277 assertEquals(meterCommand, meterAddCapture.getValues().get(0));
278 if (reverseMeterCommand != null) {
279 assertEquals(reverseMeterCommand, meterAddCapture.getValues().get(1));
// Same pattern for flow messages: index 0 is the forward flow, index 1 the reverse flow.
282 if (flowCommand != null) {
283 assertEquals(flowCommand, flowAddCapture.getValues().get(0));
284 if (reverseFlowCommand != null) {
285 assertEquals(reverseFlowCommand, flowAddCapture.getValues().get(1));
296 private void prepareMocks(Capture<OFFlowAdd> flowAddCapture, Capture<OFMeterMod> meterAddCapture,
297 boolean needCheckReverseFlow,
boolean needCheckReverseMeter) {
// Records the EasyMock expectations for one test run: a mocked switch is returned by the
// switch service, and its write(...) calls are captured into the supplied captures.
// A null capture means no write of that message kind is expected through that capture.
// NOTE(review): closing braces are not visible in this view; the nesting described below is
// inferred from statement order - confirm against the full file.
298 IOFSwitch iofSwitch = createMock(IOFSwitch.class);
// Stubs (no call-count check): any datapath id resolves to the mocked switch, and the switch
// hands out ofFactory (a field declared outside this view).
300 expect(ofSwitchService.getSwitch(anyObject(DatapathId.class))).andStubReturn(iofSwitch);
301 expect(iofSwitch.getOFFactory()).andStubReturn(ofFactory);
// Meter expected: one meter write, then one barrier request answered with an already
// completed future holding a mocked OFBarrierReply.
304 if (meterAddCapture != null) {
305 expect(iofSwitch.write(capture(meterAddCapture))).andReturn(
true);
306 expect(iofSwitch.writeRequest(anyObject(OFBarrierRequest.class)))
307 .andReturn(Futures.immediateFuture(createMock(OFBarrierReply.class)));
// Forward flow write, captured when a flow capture was supplied.
308 if (flowAddCapture != null) {
309 expect(iofSwitch.write(capture(flowAddCapture))).andReturn(
true);
// Additional meter write for the reverse direction, captured into the same meter capture.
311 if (needCheckReverseMeter) {
312 expect(iofSwitch.write(capture(meterAddCapture))).andReturn(
true);
// Additional flow write for the reverse direction, captured into the same flow capture.
314 if (needCheckReverseFlow) {
315 expect(iofSwitch.write(capture(flowAddCapture))).andReturn(
true);
317 }
else if (flowAddCapture != null) {
// No meter expected: the flow write is expected twice when the reverse flow is checked,
// otherwise once.
318 expect(iofSwitch.write(capture(flowAddCapture))).andReturn(
true).times(needCheckReverseFlow ? 2 : 1);
// Switch the switch-service mock from recording to replay mode.
// NOTE(review): no replay of iofSwitch is visible here - presumably it happens on a
// following line outside this view.
321 replay(ofSwitchService);
void init(FloodlightModuleContext context)
default OFFlowAdd ingressPopFlowMod(int inputPort, int outputPort, int inputVlan, int transitVlan, long meterId, long cookie)
OFFlowAdd egressPushFlowMod(int inputPort, int outputPort, int transitVlanId, int outputVlan, long cookie)
static final ObjectMapper MAPPER
default OFFlowAdd oneSwitchReplaceFlowMod(int inputPort, int outputPort, int inputVlan, int outputVlan, long meterId, long cookie)
default OFMeterMod installMeter(long bandwidth, long burstSize, long meterId)
void init(FloodlightModuleContext context)
void installIngressReplaceFlow()
OFFlowAdd egressPopFlowMod(int inputPort, int outputPort, int transitVlanId, long cookie)
void installOneSwitchPopFlow()
void installIngressNoneFlow()
default OFFlowAdd ingressNoneFlowMod(int inputPort, int outputPort, int transitVlan, long meterId, long cookie)
void installOneSwitchPushFlow()
void installEgressNoneFlow()
default OFFlowAdd oneSwitchPushFlowMod(int inputPort, int outputPort, int outputVlan, long meterId, long cookie)
void installOneSwitchReplaceFlow()
void installTransitFlow()
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
default OFFlowAdd oneSwitchNoneFlowMod(int inputPort, int outputPort, long meterId, long cookie)
default OFFlowAdd transitFlowMod(int inputPort, int outputPort, int transitVlan, long cookie)
default OFFlowAdd oneSwitchPopFlowMod(int inputPort, int outputPort, int inputVlan, long meterId, long cookie)
default OFFlowAdd ingressReplaceFlowMod(int inputPort, int outputPort, int inputVlan, int transitVlan, long meterId, long cookie)
default OFFlowAdd ingressPushFlowMod(int inputPort, int outputPort, int transitVlan, long meterId, long cookie)
void installIngressPopFlow()
void installEgressReplaceFlow()
void installEgressPopFlow()
OFFlowAdd egressNoneFlowMod(int inputPort, int outputPort, int transitVlanId, long cookie)
void installEgressPushFlow()
void installIngressPushFlow()
SwitchDescription switchDescription
void installOneSwitchNoneFlow()
OFFlowAdd egressReplaceFlowMod(int inputPort, int outputPort, int inputVlan, int outputVlan, long cookie)