Open Kilda Java Documentation
HeartBeatTest.java
Go to the documentation of this file.
1 package org.openkilda.atdd.floodlight;
2 
3 import static org.junit.Assert.assertTrue;
4 import static org.openkilda.messaging.Utils.MAPPER;
5 
10 
11 import cucumber.api.java.en.Given;
12 import cucumber.api.java.en.Then;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 
16 import java.io.IOException;
17 import java.util.Collections;
18 
19 public class HeartBeatTest {
20  private final KafkaUtils kafkaUtils;
21  private final KafkaConsumer<String, String> heartBeatConsumer;
22 
23  public HeartBeatTest() throws IOException {
24  kafkaUtils = new KafkaUtils();
25  heartBeatConsumer = kafkaUtils.createConsumer();
26  }
27 
28  @Given("^rewind heart beat kafka position to the end$")
29  public void heart_beat_consumer_installed() throws Throwable {
30  KafkaParameters options = new KafkaParameters();
31 
32  String topic = options.getDiscoTopic();
33  heartBeatConsumer.subscribe(Collections.singletonList(topic));
34 
35  // bind to "start" offset inside topic
36  heartBeatConsumer.poll(100);
37  heartBeatConsumer.seekToEnd(Collections.emptySet());
38  heartBeatConsumer.poll(100);
39  }
40 
41  @Then("^got at least (\\d+) heart beat event$")
42  public void got_heart_beat_event(int expect) throws Throwable {
43  int beatsCount = 0;
44 
45  for (ConsumerRecord<String, String> record : heartBeatConsumer.poll(500)) {
46  Message raw = MAPPER.readValue(record.value(), Message.class);
47 
48  if (raw instanceof HeartBeat) {
49  beatsCount += 1;
50  }
51  }
52 
53  System.out.println(String.format("Got %d heart beats.", beatsCount));
54  assertTrue(
55  String.format("Actual heart beats count is %d, expect more than %d", beatsCount, expect),
56  expect <= beatsCount);
57  }
58 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
KafkaConsumer< String, String > createConsumer()
Definition: KafkaUtils.java:88