16 package org.openkilda.floodlight.kafka.producer;
18 import static org.easymock.EasyMock.anyObject;
19 import static org.easymock.EasyMock.expect;
20 import static org.easymock.EasyMock.getCurrentArguments;
21 import static org.easymock.EasyMock.replay;
22 import static org.easymock.EasyMock.verify;
29 import org.apache.kafka.clients.producer.Callback;
30 import org.apache.kafka.clients.producer.ProducerRecord;
31 import org.apache.kafka.clients.producer.RecordMetadata;
32 import org.apache.kafka.common.PartitionInfo;
33 import org.apache.kafka.common.TopicPartition;
34 import org.easymock.Capture;
35 import org.easymock.CaptureType;
36 import org.easymock.EasyMock;
37 import org.easymock.EasyMockSupport;
38 import org.easymock.IAnswer;
39 import org.junit.Assert;
40 import org.junit.Before;
41 import org.junit.Test;
43 import java.io.IOException;
44 import java.util.ArrayList;
45 import java.util.List;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.Future;
50 private static final String TOPIC =
"A";
51 private static final TopicPartition[] partitions =
new TopicPartition[]{
52 new TopicPartition(TOPIC, 0),
53 new TopicPartition(TOPIC, 1)
56 private org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer;
60 @SuppressWarnings(
"unchecked")
61 public
void setUp() throws Exception {
62 kafkaProducer = strictMock(
org.apache.kafka.clients.producer.Producer.class);
63 subject =
new Producer(kafkaProducer);
65 ArrayList<PartitionInfo> partitionsForResult =
new ArrayList<>(2);
66 for (TopicPartition
p : partitions) {
67 partitionsForResult.add(
new PartitionInfo(
p.topic(),
p.partition(), null, null, null));
69 expect(kafkaProducer.partitionsFor(TOPIC)).andReturn(partitionsForResult).anyTimes();
74 RecordMetadata[] sendResults =
new RecordMetadata[]{
75 new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
76 new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
77 new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
78 new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
79 new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
80 new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
81 new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
82 new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
83 new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0)
86 Integer[] expectedPartitions =
new Integer[] {
87 null, null, null, 0, null, null, null, 1, null};
88 Assert.assertEquals(sendResults.length, expectedPartitions.length);
90 Capture<ProducerRecord<String, String>> sendArguments = Capture.newInstance(CaptureType.ALL);
91 setupSendCapture(sendArguments, sendResults);
93 replay(kafkaProducer);
117 verify(kafkaProducer);
119 List<ProducerRecord<String, String>> values = sendArguments.getValues();
120 for (
int i = 0;
i < values.size();
i++) {
121 ProducerRecord<String, String> record = values.get(
i);
122 Integer partition = expectedPartitions[
i];
123 Assert.assertEquals(String.format(
124 "%d: Invalid partition argument for message \"%s\" - %s",
i, record.value(), record.partition()),
125 partition, record.partition());
131 final ExecutionException error =
new ExecutionException(
"Emulate kafka send error",
new IOException());
133 Future promise = mock(Future.class);
134 expect(promise.get()).andThrow(error).anyTimes();
136 expect(kafkaProducer.send(anyObject(), anyObject(Callback.class)))
137 .andAnswer(
new IAnswer<Future<RecordMetadata>>() {
139 public Future<RecordMetadata> answer()
throws Throwable {
140 Callback callback = (Callback) getCurrentArguments()[1];
141 callback.onCompletion(null, error);
146 replay(kafkaProducer);
148 verify(kafkaProducer);
156 Future promise = mock(Future.class);
157 final ExecutionException error =
new ExecutionException(
"Emulate kafka send error",
new IOException());
158 expect(promise.get()).andThrow(error).anyTimes();
161 expect(kafkaProducer.send(anyObject(), anyObject(Callback.class))).andReturn(promise);
163 replay(kafkaProducer);
165 verify(kafkaProducer);
169 status.waitTillComplete();
171 }
catch (ExecutionException e) {
174 Assert.assertTrue(String.format(
175 "Exception was not thrown by %s object",
status.getClass().getCanonicalName()), isThrown);
181 System.currentTimeMillis(), getClass().getCanonicalName() +
"-test");
184 @SuppressWarnings(
"unchecked")
185 private
void setupSendCapture(Capture<ProducerRecord<String, String>> trap, RecordMetadata[] sendResults)
187 for (RecordMetadata metadata : sendResults) {
188 Future promise = mock(Future.class);
189 expect(promise.get()).andReturn(metadata);
192 expect(kafkaProducer.send(EasyMock.capture(trap), anyObject(Callback.class)))
void partitionSpreading()
synchronized void disableGuaranteedOrder(String topic)
SendStatus sendMessage(String topic, Message message)
void sendMessageAndTrack(String topic, Message message)
synchronized void enableGuaranteedOrder(String topic)