16 package org.openkilda.wfm;
18 import static org.mockito.Mockito.when;
42 import com.google.common.base.Charsets;
43 import com.google.common.io.Files;
44 import org.apache.storm.Constants;
45 import org.apache.storm.state.InMemoryKeyValueState;
46 import org.apache.storm.state.KeyValueState;
47 import org.apache.storm.task.OutputCollector;
48 import org.apache.storm.task.TopologyContext;
49 import org.apache.storm.tuple.Fields;
50 import org.apache.storm.tuple.Tuple;
51 import org.apache.storm.tuple.TupleImpl;
52 import org.apache.storm.utils.Utils;
53 import org.junit.AfterClass;
54 import org.junit.Assert;
55 import org.junit.BeforeClass;
56 import org.junit.Ignore;
57 import org.junit.Test;
58 import org.mockito.Mock;
61 import java.io.IOException;
62 import java.util.ArrayList;
63 import java.util.Arrays;
64 import java.util.Collections;
65 import java.util.List;
66 import java.util.Properties;
// NOTE(review): this view is elided (the fused original line numbers jump), so the
// enclosing class header and some sibling fields are not visible here.
// Message counters compared by the test assertions: expected vs. actually observed.
73 private long messagesExpected;
74 private long messagesReceived;
// Mockito-mocked Storm context (presumably annotated @Mock — the annotation line is
// not visible in this chunk); stubbed in initMocks(...) below.
79 private TopologyContext topologyContext;
// Real OutputCollector wrapping a mock delegate; outputCollectorMock is declared
// outside this elided view.
81 private OutputCollector outputCollector =
new OutputCollector(outputCollectorMock);
// --- elided fragments of test setup/teardown (original lines 91-112) ---
// Overlay properties point the discovery filter directory at the test server's
// temp dir so the topology writes its filter file somewhere disposable.
91 Properties overlay =
new Properties();
92 overlay.setProperty(
"filter.directory", server.tempDir.getAbsolutePath());
// Give the LocalCluster-submitted topology time to spin up before tests run.
// NOTE(review): wall-clock sleeps make this suite timing-sensitive on slow machines.
101 Utils.sleep(5 * 1000);
// Teardown: kill the locally submitted topology and wait for the kill to settle.
110 cluster.killTopology(
"utils-1");
112 Utils.sleep(4 * 1000);
// --- basicSwitchEventTest fragment (elided view; original lines 120-180) ---
120 System.out.println(
"==> Starting BasicSwitchEventTest");
// Both topic names come from the same config getter: switch and port discovery
// events share the topo.disco topic.
// NOTE(review): confirm portTopic is intentionally identical to switchTopic and
// not a copy-paste slip — cannot be verified from this elided view.
132 String switchTopic =
config.getKafkaTopoDiscoTopic();
133 String portTopic =
config.getKafkaTopoDiscoTopic();
// Fixed sleep to let the topology consume the messages pushed above (elided).
152 Utils.sleep(4 * 1000);
154 messagesExpected = 8;
156 Assert.assertEquals(messagesExpected, messagesReceived);
158 Utils.sleep(1 * 1000);
164 Utils.sleep(2 * 1000);
// Diagnostic dump: when nothing at all arrived, print the disco filer's file so
// the CI log shows what (if anything) was written before the assertion fails.
168 if (messagesReceived == 0) {
169 System.out.println(
"Message count failure; NO MESSAGES RECEIVED!");
170 for (String s : Files.readLines(discoFiler.
getFiler().
getFile(), Charsets.UTF_8)) {
171 System.out.println(
"\t\t > " + s);
176 Assert.assertTrue(messagesReceived > 0);
// Kill the locally submitted topology; the trailing sleep lets the kill settle.
179 cluster.killTopology(
"utils-1");
180 Utils.sleep(4 * 1000);
// "Safe" line counter: reads all lines of the file via Guava, swallowing I/O
// failures rather than propagating them (hence the name).
// NOTE(review): this view is truncated — the try { opener, the catch handler body
// and the return statement are elided, so the value returned on IOException
// (presumably 0 or -1) cannot be confirmed from this chunk.
183 private long safeLinesCount(File filename) {
184 List<String>
lines = null;
186 lines = Files.readLines(filename, Charsets.UTF_8);
187 }
catch (IOException e) {
// --- basicLinkDiscoveryTest fragment (elided view; original lines 200-286) ---
200 System.out.println(
"==> Starting BasicLinkDiscoveryTest");
// All discovery input (switch/port/ISL events) arrives on the topo.disco topic.
203 String topoInputTopic =
config.getKafkaTopoDiscoTopic();
// Bolt state is backed by an in-memory KV store for the test.
205 KeyValueState<String, Object> state =
new InMemoryKeyValueState<>();
206 initMocks(topoInputTopic);
// NOTE(review): program-to-interface — List<DiscoveryFilterEntity> would be the
// idiomatic declared type here (left unchanged: comments-only review pass).
213 ArrayList<DiscoveryFilterEntity> skipNodes =
new ArrayList<>(1);
// Source task id 4, stream "message": the serialized ISL filter setup command.
217 String json = MAPPER.writeValueAsString(islFilterSetup);
218 Tuple tuple =
new TupleImpl(topologyContext, Collections.singletonList(json), 4,
"message");
// Source task id 0 (stubbed in initMocks as the switch-event source).
224 json = MAPPER.writeValueAsString(switch1Up);
225 tuple =
new TupleImpl(topologyContext, Collections.singletonList(json), 0, topoInputTopic);
231 json = MAPPER.writeValueAsString(switch2Up);
232 tuple =
new TupleImpl(topologyContext, Collections.singletonList(json), 0, topoInputTopic);
// Source task id 1 (stubbed in initMocks as the port-event source).
237 json = MAPPER.writeValueAsString(port1Up);
238 tuple =
new TupleImpl(topologyContext, Collections.singletonList(json), 1, topoInputTopic);
243 json = MAPPER.writeValueAsString(port2Up);
244 tuple =
new TupleImpl(topologyContext, Collections.singletonList(json), 1, topoInputTopic);
// Source task id 2: Storm's system tick stream, carrying no payload fields.
247 Tuple tickTuple =
new TupleImpl(topologyContext, Collections.emptyList(), 2, Constants.SYSTEM_TICK_STREAM_ID);
// Source task id 3: an ISL-discovered message built from path nodes.
// NOTE(review): the Arrays.asList(...) argument list and the construction of
// `data` are elided from this view.
250 List<PathNode>
nodes = Arrays.asList(
254 String islDiscovered = MAPPER.writeValueAsString(
data);
255 tuple =
new TupleImpl(topologyContext, Collections.singletonList(islDiscovered), 3, topoInputTopic);
// Each checkpoint below re-asserts the same expected count of 7 emitted messages;
// the execute(...) calls between the checkpoints are elided from this view.
264 messagesExpected = 7;
266 Assert.assertEquals(messagesExpected, messagesReceived);
270 messagesExpected = 7;
272 Assert.assertEquals(messagesExpected, messagesReceived);
278 messagesExpected = 7;
280 Assert.assertEquals(messagesExpected, messagesReceived);
284 messagesExpected = 7;
286 Assert.assertEquals(messagesExpected, messagesReceived);
// Stubs the mocked TopologyContext so hand-built TupleImpl instances can resolve
// their source component and output fields by source task id. The mapping set up
// here matches the task ids used when constructing tuples in basicLinkDiscoveryTest:
//   0 -> switch events (switchSchema), 1 -> port events (portSchema),
//   2 -> system tick (empty Fields),   3 -> ISL discovery (islSchema).
// NOTE(review): switchSchema and portSchema are declared outside this elided view.
289 private void initMocks(String topoInputTopic) {
// Task 0: switch-event source; component id and stream id both reuse the topic name.
291 when(topologyContext.getComponentId(0)).thenReturn(topoInputTopic);
292 when(topologyContext.getComponentOutputFields(topoInputTopic,
293 topoInputTopic)).thenReturn(switchSchema);
// Task 1: port-event source.
297 when(topologyContext.getComponentId(1)).thenReturn(topoInputTopic);
298 when(topologyContext.getComponentOutputFields(topoInputTopic,
299 topoInputTopic)).thenReturn(portSchema);
// Task 2: Storm's system tick stream, which carries no fields.
301 Fields tickSchema =
new Fields();
302 when(topologyContext.getComponentId(2)).thenReturn(Constants.SYSTEM_COMPONENT_ID);
303 when(topologyContext.getComponentOutputFields(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID))
304 .thenReturn(tickSchema);
// Task 3: ISL-discovery source; schema is a single field named after the topic.
306 Fields islSchema =
new Fields(topoInputTopic);
307 when(topologyContext.getComponentId(3)).thenReturn(topoInputTopic);
308 when(topologyContext.getComponentOutputFields(topoInputTopic,
309 topoInputTopic)).thenReturn(islSchema);
// NOTE(review): the following stub (and the method's closing brace) is truncated
// in this view at original line 312.
312 when(topologyContext.getComponentOutputFields(
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static final ObjectMapper MAPPER
static String createSwitchDataMessage(String state, String switchId)
static Config stormConfig()
static LaunchEnvironment makeLaunchEnvironment()
static String createPortDataMessage(String state, String switchId, String portId)
static final String MESSAGE_FIELD
static TestKafkaProducer kProducer
StormTopology createTopology()
static final Fields fieldMessage
static void teardownOnce()
static final String SWITCH_UP
static final String FIELD_PORT_ID
final String getTopologyName()
static LocalCluster cluster
static final String DISCO_SPOUT_ID
static final String PORT_UP
void pushMessage(final String topic, final String data)
void initState(KeyValueState< String, Object > state)
static final String FIELD_STATE
static final String FIELD_SWITCH_ID
void basicLinkDiscoveryTest()
void execute(Tuple tuple)
int getMessagesCount(String streamId)
static void teardownOnce()
StormTopology createTopology()
static final String PORT_DOWN
void basicSwitchPortEventsTest()