Open Kilda Java Documentation
ProducerTest.java
/* Copyright 2018 Telstra Open Source
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openkilda.floodlight.kafka.producer;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.messaging.info.event.PortChangeType;
import org.openkilda.messaging.info.event.PortInfoData;
import org.openkilda.messaging.model.SwitchId;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerTest extends EasyMockSupport {
    private static final String TOPIC = "A";
    private static final TopicPartition[] partitions = new TopicPartition[]{
            new TopicPartition(TOPIC, 0),
            new TopicPartition(TOPIC, 1)
    };

    private org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer;
    private Producer subject;

    @Before
    @SuppressWarnings("unchecked")
    public void setUp() throws Exception {
        kafkaProducer = strictMock(org.apache.kafka.clients.producer.Producer.class);
        subject = new Producer(kafkaProducer);

        ArrayList<PartitionInfo> partitionsForResult = new ArrayList<>(2);
        for (TopicPartition p : partitions) {
            partitionsForResult.add(new PartitionInfo(p.topic(), p.partition(), null, null, null));
        }
        expect(kafkaProducer.partitionsFor(TOPIC)).andReturn(partitionsForResult).anyTimes();
    }

    @Test
    public void partitionSpreading() throws Exception {
        // Partition reported back by the mocked kafkaProducer.send() for each of the nine sends.
        RecordMetadata[] sendResults = new RecordMetadata[]{
                new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[1], -1L, 0L, System.currentTimeMillis(), 0, 0, 0),
                new RecordMetadata(partitions[0], -1L, 0L, System.currentTimeMillis(), 0, 0, 0)
        };

        // Expected ProducerRecord.partition() argument for each send: null lets the Kafka
        // partitioner choose; a concrete value means Producer pinned the record to the partition
        // reported for the preceding send, preserving ordering inside an enableGuaranteedOrder() window.
        Integer[] expectedPartitions = new Integer[] {
                null, null, null, 0, null, null, null, 1, null};
        Assert.assertEquals(sendResults.length, expectedPartitions.length);

        Capture<ProducerRecord<String, String>> sendArguments = Capture.newInstance(CaptureType.ALL);
        setupSendCapture(sendArguments, sendResults);

        replay(kafkaProducer);

        InfoMessage payload = makePayload();

        subject.sendMessageAndTrack(TOPIC, payload);
        subject.sendMessageAndTrack(TOPIC, payload);
        subject.enableGuaranteedOrder(TOPIC);
        try {
            subject.sendMessageAndTrack(TOPIC, payload);
            subject.sendMessageAndTrack(TOPIC, payload);
        } finally {
            subject.disableGuaranteedOrder(TOPIC, 0L);
        }
        subject.sendMessageAndTrack(TOPIC, payload);
        subject.sendMessageAndTrack(TOPIC, payload);
        subject.enableGuaranteedOrder(TOPIC);
        try {
            subject.sendMessageAndTrack(TOPIC, payload);
            subject.sendMessageAndTrack(TOPIC, payload);
        } finally {
            subject.disableGuaranteedOrder(TOPIC, 0L);
        }
        subject.sendMessageAndTrack(TOPIC, payload);

        verify(kafkaProducer);

        List<ProducerRecord<String, String>> values = sendArguments.getValues();
        for (int i = 0; i < values.size(); i++) {
            ProducerRecord<String, String> record = values.get(i);
            Integer partition = expectedPartitions[i];
            Assert.assertEquals(String.format(
                    "%d: Invalid partition argument for message \"%s\" - %s", i, record.value(), record.partition()),
                    partition, record.partition());
        }
    }

    @Test
    public void errorReporting() throws Exception {
        final ExecutionException error = new ExecutionException("Emulate kafka send error", new IOException());

        Future promise = mock(Future.class);
        expect(promise.get()).andThrow(error).anyTimes();
        replay(promise);
        expect(kafkaProducer.send(anyObject(), anyObject(Callback.class)))
                .andAnswer(new IAnswer<Future<RecordMetadata>>() {
                    @Override
                    public Future<RecordMetadata> answer() throws Throwable {
                        Callback callback = (Callback) getCurrentArguments()[1];
                        callback.onCompletion(null, error);
                        return promise;
                    }
                });

        replay(kafkaProducer);
        subject.sendMessageAndTrack(TOPIC, makePayload());
        verify(kafkaProducer);

        // This test makes no assertions: the only observable action is an error log message,
        // which can be located in the test's output.
    }

    @Test
    public void errorDetection() throws Exception {
        Future promise = mock(Future.class);
        final ExecutionException error = new ExecutionException("Emulate kafka send error", new IOException());
        expect(promise.get()).andThrow(error).anyTimes();
        replay(promise);

        expect(kafkaProducer.send(anyObject(), anyObject(Callback.class))).andReturn(promise);

        replay(kafkaProducer);
        SendStatus status = subject.sendMessage(TOPIC, makePayload());
        verify(kafkaProducer);

        Boolean isThrown;
        try {
            status.waitTillComplete();
            isThrown = false;
        } catch (ExecutionException e) {
            isThrown = true;
        }
        Assert.assertTrue(String.format(
                "Exception was not thrown by %s object", status.getClass().getCanonicalName()), isThrown);
    }

    private InfoMessage makePayload() {
        return new InfoMessage(
                new PortInfoData(new SwitchId("ff:fe:00:00:00:00:00:01"), 8, PortChangeType.UP),
                System.currentTimeMillis(), getClass().getCanonicalName() + "-test");
    }

    @SuppressWarnings("unchecked")
    private void setupSendCapture(Capture<ProducerRecord<String, String>> trap, RecordMetadata[] sendResults)
            throws Exception {
        for (RecordMetadata metadata : sendResults) {
            Future promise = mock(Future.class);
            expect(promise.get()).andReturn(metadata);
            replay(promise);

            expect(kafkaProducer.send(EasyMock.capture(trap), anyObject(Callback.class)))
                    .andReturn(promise);
        }
    }
}
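
The partition checks in partitionSpreading() lean on a detail of the standard kafka-clients API: ProducerRecord.partition() returns null unless the caller set a partition explicitly. A minimal standalone illustration of that detail (plain Kafka API, no OpenKilda code; the class name, topic, and values are only for illustration):

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionFieldDemo {
    public static void main(String[] args) {
        // No partition supplied: the client-side partitioner picks one at send time,
        // so partition() stays null - the "null" entries in expectedPartitions above.
        ProducerRecord<String, String> free = new ProducerRecord<>("A", "payload");
        System.out.println(free.partition());    // prints: null

        // Partition pinned by the caller - what the test expects from Producer for the
        // second send inside an enableGuaranteedOrder() window.
        ProducerRecord<String, String> pinned = new ProducerRecord<>("A", 1, null, "payload");
        System.out.println(pinned.partition());  // prints: 1
    }
}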
synchronized void disableGuaranteedOrder(String topic)
Definition: Producer.java:57
SendStatus sendMessage(String topic, Message message)
Definition: Producer.java:77
void sendMessageAndTrack(String topic, Message message)
Definition: Producer.java:73
synchronized void enableGuaranteedOrder(String topic)
Definition: Producer.java:48
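
Taken together, the definitions listed above outline the Producer surface that this test exercises. Below is a minimal caller-side sketch of that surface, assuming the constructor and signatures visible in the test and footer; the wrapper class, broker address, serializer settings, and org.openkilda.messaging package paths are assumptions for illustration, not verified against the library:

import org.openkilda.floodlight.kafka.producer.Producer;
import org.openkilda.floodlight.kafka.producer.SendStatus;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.messaging.info.event.PortChangeType;
import org.openkilda.messaging.info.event.PortInfoData;
import org.openkilda.messaging.model.SwitchId;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerUsageSketch {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put("bootstrap.servers", "kafka.example:9092");  // hypothetical broker address
        config.put("key.serializer", StringSerializer.class.getName());
        config.put("value.serializer", StringSerializer.class.getName());

        Producer producer = new Producer(new KafkaProducer<>(config));

        InfoMessage message = new InfoMessage(
                new PortInfoData(new SwitchId("ff:fe:00:00:00:00:00:01"), 8, PortChangeType.UP),
                System.currentTimeMillis(), "usage-sketch");

        // Fire-and-forget: delivery failures are only logged (covered by errorReporting()).
        producer.sendMessageAndTrack("topic", message);

        // Keep a burst of related messages on one partition (covered by partitionSpreading()).
        producer.enableGuaranteedOrder("topic");
        try {
            producer.sendMessageAndTrack("topic", message);
            producer.sendMessageAndTrack("topic", message);
        } finally {
            producer.disableGuaranteedOrder("topic");
        }

        // Synchronous delivery check (covered by errorDetection()): waitTillComplete()
        // rethrows the error reported by the Kafka client.
        SendStatus status = producer.sendMessage("topic", message);
        status.waitTillComplete();
    }
}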