16 package org.openkilda.floodlight.kafka;
18 import static org.easymock.EasyMock.anyObject;
19 import static org.easymock.EasyMock.eq;
20 import static org.easymock.EasyMock.expect;
21 import static org.easymock.EasyMock.expectLastCall;
22 import static org.easymock.EasyMock.verify;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableMap;
39 import net.floodlightcontroller.core.IOFSwitch;
40 import net.floodlightcontroller.core.internal.OFSwitch;
41 import net.floodlightcontroller.core.module.FloodlightModuleContext;
42 import org.easymock.EasyMockSupport;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.projectfloodlight.openflow.protocol.OFPortDesc;
46 import org.projectfloodlight.openflow.types.DatapathId;
47 import org.projectfloodlight.openflow.types.OFPort;
/**
 * Single static module context instance. setUp() registers the "topic" and
 * "bootstrap-servers" config params on it and hands it to collectorModule.init().
 */
52 private static final FloodlightModuleContext context =
new FloodlightModuleContext();
/** Record handler the test drives via handleMessage(); assigned a new RecordHandlerMock built from consumerContext. */
57 private RecordHandlerMock handler;
/**
 * Prepares the fixture: registers config params on the static context, initializes
 * collectorModule with that context and creates a new handler.
 *
 * @throws Exception declared by the signature; which call may throw is not visible in this excerpt
 */
60 public void setUp() throws Exception {
// Both config params are registered for collectorModule with empty-string values.
68 context.addConfigParam(collectorModule,
"topic",
"");
69 context.addConfigParam(collectorModule,
"bootstrap-servers",
"");
// The params above are added before init() is called with the same context.
70 collectorModule.
init(context);
// NOTE(review): consumerContext is created in lines not shown here — confirm it is ready at this point.
77 handler =
new RecordHandlerMock(consumerContext);
// Two mocked switches, keyed below by datapath ids 1 and 2.
91 OFSwitch iofSwitch1 = mock(OFSwitch.class);
92 OFSwitch iofSwitch2 = mock(OFSwitch.class);
94 Map<DatapathId, IOFSwitch> switches = ImmutableMap.of(
95 DatapathId.of(1), iofSwitch1,
96 DatapathId.of(2), iofSwitch2
// Every switch in the map reports itself active and returns its own datapath id, any number of times.
99 for (DatapathId swId : switches.keySet()) {
100 IOFSwitch sw = switches.get(swId);
101 expect(sw.isActive()).andReturn(
true).anyTimes();
102 expect(sw.getId()).andReturn(swId).anyTimes();
// Five mocked port descriptors; each one expects getPortNo() to be called exactly twice.
108 OFPortDesc ofPortDesc1 = mock(OFPortDesc.class);
109 OFPortDesc ofPortDesc2 = mock(OFPortDesc.class);
110 OFPortDesc ofPortDesc3 = mock(OFPortDesc.class);
111 OFPortDesc ofPortDesc4 = mock(OFPortDesc.class);
112 OFPortDesc ofPortDesc5 = mock(OFPortDesc.class);
114 expect(ofPortDesc1.getPortNo()).andReturn(OFPort.ofInt(1)).times(2);
115 expect(ofPortDesc2.getPortNo()).andReturn(OFPort.ofInt(2)).times(2);
116 expect(ofPortDesc3.getPortNo()).andReturn(OFPort.ofInt(3)).times(2);
117 expect(ofPortDesc4.getPortNo()).andReturn(OFPort.ofInt(4)).times(2);
// NOTE(review): -2 presumably stands for a reserved (non-physical) port number — confirm against OFPort.
119 expect(ofPortDesc5.getPortNo()).andReturn(OFPort.ofInt(-2)).times(2);
// Enabled-port lists for each switch; the list contents are not visible in this excerpt.
122 expect(iofSwitch1.getEnabledPorts()).andReturn(ImmutableList.of(
126 expect(iofSwitch2.getEnabledPorts()).andReturn(ImmutableList.of(
// Two overrides of the network-dump switch data on the handler; the arguments are not visible in this excerpt.
134 handler.overrideNetworkDumpSwitchData(
137 handler.overrideNetworkDumpSwitchData(
// NOTE(review): the call this count of 8 applies to is not visible here — confirm which mock it refers to.
143 expectLastCall().times(8);
// The producer mock is expected to hand out kafkaProducer exactly twice.
146 expect(producer.getProducer()).andReturn(kafkaProducer).times(2);
// Drive the handler with the prepared command (built in lines not shown here).
161 handler.handleMessage(
command);
void init(FloodlightModuleContext context)
static final String SYSTEM_CORRELATION_ID
Map< DatapathId, IOFSwitch > getAllSwitchMap()
synchronized void disableGuaranteedOrder(String topic)
def command(payload, fields)
static ConfigurationProvider of(FloodlightModuleContext moduleContext, IFloodlightModule module)
synchronized void enableGuaranteedOrder(String topic)
String getKafkaTopoDiscoTopic()