Open Kilda Java Documentation
SimpleKafkaTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm;
17 
18 import static org.hamcrest.core.Is.is;
19 import static org.junit.Assert.assertThat;
20 
25 
26 import kafka.consumer.Consumer;
27 import kafka.consumer.ConsumerConfig;
28 import kafka.consumer.ConsumerIterator;
29 import kafka.consumer.KafkaStream;
30 import kafka.javaapi.consumer.ConsumerConnector;
31 import kafka.message.MessageAndMetadata;
32 import kafka.serializer.StringDecoder;
33 import org.apache.kafka.clients.producer.KafkaProducer;
34 import org.apache.kafka.clients.producer.Producer;
35 import org.apache.kafka.clients.producer.ProducerRecord;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.kohsuke.args4j.CmdLineException;
41 
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Properties;
46 
51 public class SimpleKafkaTest {
52  public static final String topic = "simple-kafka"; // + System.currentTimeMillis();
53  private static final String[] NO_ARGS = {};
54 
55  private static ZookeeperConfig zooKeeperConfig;
56  private static KafkaConfig kafkaConfig;
57 
58  private TestUtils.KafkaTestFixture server;
59  private Producer<String, String> producer;
60  private ConsumerConnector consumerConnector;
61 
62  @BeforeClass
63  public static void allocateConfig() throws ConfigurationException, CmdLineException {
64  ConfigurationProvider configurationProvider = new LaunchEnvironment(NO_ARGS).getConfigurationProvider();
65  zooKeeperConfig = configurationProvider.getConfiguration(ZookeeperConfig.class);
66  kafkaConfig = configurationProvider.getConfiguration(KafkaConfig.class);
67  }
68 
69  @Before
70  public void setup() throws Exception {
71  server = new TestUtils.KafkaTestFixture(zooKeeperConfig);
72  server.start();
73  }
74 
75  @After
76  public void teardown() throws Exception {
77  producer.close();
78  consumerConnector.shutdown();
79  server.stop();
80  }
81 
82  @Test
83  public void shouldWriteThenRead() throws Exception {
84 
85  //Create a consumer
86  ConsumerIterator<String, String> it = buildConsumer(SimpleKafkaTest.topic);
87 
88  //Create a producer
89  producer = new KafkaProducer<>(producerProps());
90 
91  //send a message
92  producer.send(new ProducerRecord<>(SimpleKafkaTest.topic, "message")).get();
93 
94  //read it back
95  MessageAndMetadata<String, String> messageAndMetadata = it.next();
96  String value = messageAndMetadata.message();
97  assertThat(value, is("message"));
98  }
99 
100  private ConsumerIterator<String, String> buildConsumer(String topic) {
101  Properties props = consumerProperties();
102 
103  Map<String, Integer> topicCountMap = new HashMap<>();
104  topicCountMap.put(topic, 1);
105  ConsumerConfig consumerConfig = new ConsumerConfig(props);
106  consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
107  Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(
108  topicCountMap, new StringDecoder(null), new StringDecoder(null));
109  KafkaStream<String, String> stream = consumers.get(topic).get(0);
110  return stream.iterator();
111  }
112 
113  private Properties consumerProperties() {
114  Properties props = new Properties();
115  props.put("zookeeper.connect", zooKeeperConfig.getHosts());
116  props.put("group.id", "group1");
117  props.put("auto.offset.reset", "smallest");
118  return props;
119  }
120 
121  private Properties producerProps() {
122  Properties props = new Properties();
123  props.put("bootstrap.servers", kafkaConfig.getHosts());
124  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
125  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
126  props.put("request.required.acks", "1");
127  return props;
128  }
129 }
value
Definition: nodes.py:62
ConfigurationProvider getConfigurationProvider(String... prefixes)