16 package org.openkilda.wfm.topology.event;
18 import static org.hamcrest.Matchers.allOf;
19 import static org.hamcrest.Matchers.contains;
20 import static org.hamcrest.Matchers.hasProperty;
21 import static org.hamcrest.Matchers.is;
22 import static org.junit.Assert.assertThat;
38 import com.fasterxml.jackson.core.JsonProcessingException;
39 import com.fasterxml.jackson.databind.ObjectMapper;
40 import org.apache.storm.state.InMemoryKeyValueState;
41 import org.apache.storm.state.KeyValueState;
42 import org.apache.storm.task.OutputCollector;
43 import org.apache.storm.task.TopologyContext;
44 import org.apache.storm.tuple.Tuple;
45 import org.apache.storm.tuple.TupleImpl;
46 import org.apache.storm.tuple.Values;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.kohsuke.args4j.CmdLineException;
50 import org.mockito.Mockito;
52 import java.util.Collections;
53 import java.util.List;
55 import java.util.stream.Collectors;
59 private static final Integer TASK_ID_BOLT = 0;
60 private static final String STREAM_ID_INPUT =
"input";
62 private ObjectMapper objectMapper =
new ObjectMapper();
64 private TopologyContext context;
76 context = Mockito.mock(TopologyContext.class);
78 Mockito.when(context.getComponentId(TASK_ID_BOLT))
84 OutputCollector output =
new OutputCollector(outputDelegate);
87 bolt.
initState(
new InMemoryKeyValueState<>());
92 Tuple tuple =
new TupleImpl(context,
new Values(
"{\"corrupted-json"), TASK_ID_BOLT,
96 Mockito.verify(outputDelegate).ack(tuple);
104 KeyValueState<String, Object> boltState =
new InMemoryKeyValueState<>();
105 Map<SwitchId, List<DiscoveryLink>> links =
106 Collections.singletonMap(testLink.getSource().getDatapath(), Collections.singletonList(testLink));
107 boltState.put(STATE_ID_DISCOVERY, links);
111 bolt.state = State.SYNC_IN_PROGRESS;
116 Tuple tuple =
new TupleImpl(context,
new Values(objectMapper.writeValueAsString(dumpBeginMessage)),
117 TASK_ID_BOLT, STREAM_ID_INPUT);
121 @SuppressWarnings(
"unchecked")
122 Map<String, List<DiscoveryLink>> stateAfterSync =
123 (Map<String, List<DiscoveryLink>>) boltState.get(STATE_ID_DISCOVERY);
125 List<DiscoveryLink> linksAfterSync = stateAfterSync.values()
127 .flatMap(List::stream)
128 .collect(Collectors.toList());
130 assertThat(linksAfterSync, contains(
131 allOf(hasProperty(
"source", hasProperty(
"datapath", is(
new SwitchId(
"ff:01")))),
132 hasProperty(
"destination", hasProperty(
"datapath", is(
new SwitchId(
"ff:02")))),
133 hasProperty(
"active", is(
true)))));
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
static Config stormConfig()
static LaunchEnvironment makeLaunchEnvironment()
static final Fields FORMAT
void invalidJsonForDiscoveryFilter()
static final String DISCO_SPOUT_ID
static final String DEFAULT_CORRELATION_ID
void initState(KeyValueState< String, Object > state)
void shouldNotResetDiscoveryStatusOnSync()