Open Kilda Java Documentation
KafkaOffsetRegistryTest.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.floodlight.kafka;
17 
18 import static org.easymock.EasyMock.anyObject;
19 import static org.easymock.EasyMock.mock;
20 
22 
23 import org.apache.kafka.clients.consumer.ConsumerRecord;
24 import org.apache.kafka.clients.consumer.KafkaConsumer;
25 import org.easymock.EasyMock;
26 import org.junit.Rule;
27 import org.junit.Test;
28 import org.junit.rules.ExpectedException;
29 
30 import java.util.concurrent.TimeUnit;
31 
// JUnit rule, initialised with ExpectedException.none(); a test in this class calls
// expectedException.expect(...) on it to declare the exception type it expects.
33  @Rule
34  public ExpectedException expectedException = ExpectedException.none();
35 
36  @Test
37  public void shouldNotCommitRightOnAdd() {
38  // given
39  @SuppressWarnings("unchecked")
40  KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
41  EasyMock.replay(consumer);
42  Consumer.KafkaOffsetRegistry registry = new KafkaOffsetRegistry(consumer, 10000L);
43 
44  // when
45  ConsumerRecord<String, String> record = new ConsumerRecord<>("test", 1, 1, "key", "value");
46  registry.addAndCommit(record);
47 
48  // then
49  EasyMock.verify(consumer);
50  }
51 
52  @Test
53  public void shouldCommitOnAddIfIntervalPassed() throws InterruptedException {
54  // given
55  @SuppressWarnings("unchecked")
56  KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
57  consumer.commitSync(anyObject());
58  EasyMock.expectLastCall();
59  EasyMock.replay(consumer);
60  Consumer.KafkaOffsetRegistry registry = new KafkaOffsetRegistry(consumer, 1L);
61 
62  // when
63  TimeUnit.MILLISECONDS.sleep(10);
64 
65  ConsumerRecord<String, String> record = new ConsumerRecord<>("test", 1, 1, "key", "value");
66  registry.addAndCommit(record);
67 
68  // then
69  EasyMock.verify(consumer);
70  }
71 
72  @Test
74  // given
75  @SuppressWarnings("unchecked")
76  KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
77  consumer.commitSync(anyObject());
78  EasyMock.expectLastCall();
79  EasyMock.replay(consumer);
80  Consumer.KafkaOffsetRegistry registry = new KafkaOffsetRegistry(consumer, 10000L);
81 
82  // when
83  ConsumerRecord<String, String> record = new ConsumerRecord<>("test", 1, 1, "key", "value");
84  registry.addAndCommit(record);
85  registry.commitOffsets();
86 
87  // then
88  EasyMock.verify(consumer);
89  }
90 
91  @Test
93  // given
94  @SuppressWarnings("unchecked")
95  KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
96  Consumer.KafkaOffsetRegistry registry = new KafkaOffsetRegistry(consumer, 10000L);
97 
98  ConsumerRecord<String, String> record = new ConsumerRecord<>("test", 1, 10, "key", "value");
99  registry.addAndCommit(record);
100 
101  expectedException.expect(IllegalArgumentException.class);
102 
103  // when
104  ConsumerRecord<String, String> outdated = new ConsumerRecord<>("test", 1, 1, "key2", "value2");
105  registry.addAndCommit(outdated);
106 
107  // then an IllegalArgumentException is thrown
108  }
109 }