Open Kilda Java Documentation
OfeLinkBoltTest.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.event;
17 
18 import static org.hamcrest.Matchers.allOf;
19 import static org.hamcrest.Matchers.contains;
20 import static org.hamcrest.Matchers.hasProperty;
21 import static org.hamcrest.Matchers.is;
22 import static org.junit.Assert.assertThat;
24 import static org.openkilda.wfm.topology.event.OfeLinkBolt.STATE_ID_DISCOVERY;
25 
37 
38 import com.fasterxml.jackson.core.JsonProcessingException;
39 import com.fasterxml.jackson.databind.ObjectMapper;
40 import org.apache.storm.state.InMemoryKeyValueState;
41 import org.apache.storm.state.KeyValueState;
42 import org.apache.storm.task.OutputCollector;
43 import org.apache.storm.task.TopologyContext;
44 import org.apache.storm.tuple.Tuple;
45 import org.apache.storm.tuple.TupleImpl;
46 import org.apache.storm.tuple.Values;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.kohsuke.args4j.CmdLineException;
50 import org.mockito.Mockito;
51 
52 import java.util.Collections;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.stream.Collectors;
56 
/**
 * Unit tests for {@link OfeLinkBolt}.
 *
 * <p>Each test drives the bolt directly through {@code doWork(Tuple)} using a Mockito-mocked
 * {@link TopologyContext} and a spied {@code OutputCollectorMock}, so the calls the bolt makes
 * on its collector can be verified.
 *
 * <p>NOTE(review): this listing appears to be missing some lines (the embedded numbering skips
 * 71-72 and 79); see the notes in {@code before()}.
 */
57 public class OfeLinkBoltTest extends AbstractStormTest {
58 
    // Task id passed to every TupleImpl built in these tests and stubbed on the mocked context.
59  private static final Integer TASK_ID_BOLT = 0;
    // Stream id used both for the stubbed output fields and for the tuples fed to the bolt.
60  private static final String STREAM_ID_INPUT = "input";
61 
    // Serializes the InfoMessage into the JSON string carried by the input tuple.
62  private ObjectMapper objectMapper = new ObjectMapper();
63 
64  private TopologyContext context;
65  private OfeLinkBolt bolt;
    // Mockito spy; wrapped by the OutputCollector handed to the bolt in before().
66  private OutputCollectorMock outputDelegate;
67  private OFEventWfmTopologyConfig config;
68 
    /**
     * Creates a fresh bolt for each test: builds it from the topology config, wires it to a mocked
     * context and a spied collector, then calls {@code prepare} and {@code initState} with an
     * empty in-memory state.
     */
69  @Before
70  public void before() throws CmdLineException, ConfigurationException {
    // NOTE(review): `manager` is not declared in the visible lines (listing lines 71-72 are
    // absent); presumably a topology instance built from makeLaunchEnvironment() - confirm.
73  config = manager.getConfig();
74  bolt = new OfeLinkBolt(config);
75 
76  context = Mockito.mock(TopologyContext.class);
77 
    // NOTE(review): the `.thenReturn(...)` for this stubbing is not visible (listing line 79 is
    // absent); presumably it returns OfEventWfmTopology.DISCO_SPOUT_ID - confirm.
78  Mockito.when(context.getComponentId(TASK_ID_BOLT))
    // Output fields of the input stream are reported as KafkaMessage.FORMAT.
80  Mockito.when(context.getComponentOutputFields(OfEventWfmTopology.DISCO_SPOUT_ID, STREAM_ID_INPUT))
81  .thenReturn(KafkaMessage.FORMAT);
82 
    // Spy on the collector mock so tests can verify calls such as ack(tuple).
83  outputDelegate = Mockito.spy(new OutputCollectorMock());
84  OutputCollector output = new OutputCollector(outputDelegate);
85 
86  bolt.prepare(stormConfig(), context, output);
87  bolt.initState(new InMemoryKeyValueState<>());
88  }
89 
    /**
     * Feeds the bolt a tuple whose payload is a truncated JSON string and verifies that the tuple
     * is acked on the collector.
     */
90  @Test
91  public void invalidJsonForDiscoveryFilter() throws JsonProcessingException {
92  Tuple tuple = new TupleImpl(context, new Values("{\"corrupted-json"), TASK_ID_BOLT,
93  STREAM_ID_INPUT);
94  bolt.doWork(tuple);
95 
96  Mockito.verify(outputDelegate).ack(tuple);
97  }
98 
    /**
     * Seeds the bolt state with a single discovery link (ff:01 port 2 to ff:02 port 2), puts the
     * bolt into {@code State.SYNC_IN_PROGRESS}, sends a port-UP message for ff:01 port 2, and
     * asserts that the stored state still holds exactly that one link with {@code active == true}.
     */
99  @Test
100  public void shouldNotResetDiscoveryStatusOnSync() throws JsonProcessingException {
101  // given
    // NOTE(review): the trailing (0, -1, true) arguments are presumably check interval, failure
    // limit and the active flag - confirm against the DiscoveryLink constructor.
102  DiscoveryLink testLink = new DiscoveryLink(new SwitchId("ff:01"), 2, new SwitchId("ff:02"), 2, 0, -1, true);
103 
    // Pre-populate a dedicated state object so it can be inspected after doWork().
104  KeyValueState<String, Object> boltState = new InMemoryKeyValueState<>();
105  Map<SwitchId, List<DiscoveryLink>> links =
106  Collections.singletonMap(testLink.getSource().getDatapath(), Collections.singletonList(testLink));
107  boltState.put(STATE_ID_DISCOVERY, links);
    // Replaces the empty state installed in before().
108  bolt.initState(boltState);
109 
110  // set the state to SYNC_IN_PROGRESS
111  bolt.state = State.SYNC_IN_PROGRESS;
112 
113  // when
    // A port-UP event for the source endpoint of the seeded link, wrapped in an InfoMessage
    // and serialized to JSON as the tuple payload.
114  PortInfoData dumpPortData = new PortInfoData(new SwitchId("ff:01"), 2, PortChangeType.UP);
115  InfoMessage dumpBeginMessage = new InfoMessage(dumpPortData, 0, DEFAULT_CORRELATION_ID, Destination.WFM);
116  Tuple tuple = new TupleImpl(context, new Values(objectMapper.writeValueAsString(dumpBeginMessage)),
117  TASK_ID_BOLT, STREAM_ID_INPUT);
118  bolt.doWork(tuple);
119 
120  // then
    // NOTE(review): the map was stored above with SwitchId keys but is cast here to String keys;
    // only values() is read below, so the key type is never exercised - confirm which is intended.
121  @SuppressWarnings("unchecked")
122  Map<String, List<DiscoveryLink>> stateAfterSync =
123  (Map<String, List<DiscoveryLink>>) boltState.get(STATE_ID_DISCOVERY);
124 
    // Flatten the per-switch lists into one list of links.
125  List<DiscoveryLink> linksAfterSync = stateAfterSync.values()
126  .stream()
127  .flatMap(List::stream)
128  .collect(Collectors.toList());
129 
    // contains(...) with a single matcher requires exactly one element, which must match it.
130  assertThat(linksAfterSync, contains(
131  allOf(hasProperty("source", hasProperty("datapath", is(new SwitchId("ff:01")))),
132  hasProperty("destination", hasProperty("datapath", is(new SwitchId("ff:02")))),
133  hasProperty("active", is(true)))));
134  }
135 }
static LaunchEnvironment makeLaunchEnvironment()
static final String DEFAULT_CORRELATION_ID
Definition: Utils.java:69