16 package org.openkilda.floodlight.kafka;
18 import static org.easymock.EasyMock.anyObject;
19 import static org.easymock.EasyMock.mock;
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.KafkaConsumer;
25 import org.easymock.EasyMock;
26 import org.junit.Rule;
27 import org.junit.Test;
28 import org.junit.rules.ExpectedException;
30 import java.util.concurrent.TimeUnit;
39 @SuppressWarnings(
"unchecked")
45 ConsumerRecord<String, String> record =
new ConsumerRecord<>(
"test", 1, 1,
"key",
"value");
46 registry.addAndCommit(record);
55 @SuppressWarnings(
"unchecked")
58 EasyMock.expectLastCall();
63 TimeUnit.MILLISECONDS.sleep(10);
65 ConsumerRecord<String, String> record =
new ConsumerRecord<>(
"test", 1, 1,
"key",
"value");
66 registry.addAndCommit(record);
75 @SuppressWarnings(
"unchecked")
78 EasyMock.expectLastCall();
83 ConsumerRecord<String, String> record =
new ConsumerRecord<>(
"test", 1, 1,
"key",
"value");
84 registry.addAndCommit(record);
85 registry.commitOffsets();
94 @SuppressWarnings(
"unchecked")
98 ConsumerRecord<String, String> record =
new ConsumerRecord<>(
"test", 1, 10,
"key",
"value");
99 registry.addAndCommit(record);
104 ConsumerRecord<String, String> outdated =
new ConsumerRecord<>(
"test", 1, 1,
"key2",
"value2");
105 registry.addAndCommit(outdated);
ExpectedException expectedException
void shouldCommitOnAddIfIntervalPassed()
void shouldNotCommitRightOnAdd()
void failTryingToAddRecordWithOutdatedOffset()
void shouldCommitOffsetsIfRequested()