Open Kilda Java Documentation
Original.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm;
17 
22 import static org.hamcrest.core.Is.is;
23 import static org.junit.Assert.assertThat;
24 
25 import kafka.consumer.Consumer;
26 import kafka.consumer.ConsumerConfig;
27 import kafka.consumer.ConsumerIterator;
28 import kafka.consumer.KafkaStream;
29 import kafka.javaapi.consumer.ConsumerConnector;
30 import kafka.message.MessageAndMetadata;
31 import kafka.serializer.StringDecoder;
32 import kafka.server.KafkaConfig;
33 import kafka.server.KafkaServerStartable;
34 import org.apache.curator.test.TestingServer;
35 import org.apache.kafka.clients.producer.KafkaProducer;
36 import org.apache.kafka.clients.producer.Producer;
37 import org.apache.kafka.clients.producer.ProducerRecord;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 
42 import java.io.IOException;
43 import java.util.HashMap;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Properties;
47 
48 
49 public class Original {
50  public static final String topic = "topic1-" + System.currentTimeMillis();
51 
52  private KafkaTestFixture server;
53  private Producer<String, String> producer;
54  private ConsumerConnector consumerConnector;
55 
56  @Before
57  public void setup() throws Exception {
58  server = new KafkaTestFixture();
59  server.start(serverProperties());
60  }
61 
62  @After
63  public void teardown() throws Exception {
64  producer.close();
65  consumerConnector.shutdown();
66  server.stop();
67  }
68 
69  @Test
70  public void shouldWriteThenRead() throws Exception {
71 
72  //Create a consumer
73  ConsumerIterator<String, String> it = buildConsumer(Original.topic);
74 
75  //Create a producer
76  producer = new KafkaProducer<>(producerProps());
77 
78  //send a message
79  producer.send(new ProducerRecord<>(Original.topic, "message")).get();
80 
81  //read it back
82  MessageAndMetadata<String, String> messageAndMetadata = it.next();
83  String value = messageAndMetadata.message();
84  assertThat(value, is("message"));
85  }
86 
87  private ConsumerIterator<String, String> buildConsumer(String topic) {
88  Properties props = consumerProperties();
89 
90  Map<String, Integer> topicCountMap = new HashMap<>();
91  topicCountMap.put(topic, 1);
92  ConsumerConfig consumerConfig = new ConsumerConfig(props);
93  consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
94  Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null));
95  KafkaStream<String, String> stream = consumers.get(topic).get(0);
96  return stream.iterator();
97  }
98 
99  private Properties consumerProperties() {
100  Properties props = new Properties();
101  props.put("zookeeper.connect", serverProperties().get("zookeeper.connect"));
102  props.put("group.id", "group1");
103  props.put("auto.offset.reset", "smallest");
104  return props;
105  }
106 
107  private Properties producerProps() {
108  Properties props = new Properties();
109  props.put("bootstrap.servers", "localhost:9092");
110  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
111  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
112  props.put("request.required.acks", "1");
113  return props;
114  }
115 
116  private Properties serverProperties() {
117  Properties props = new Properties();
118  props.put("zookeeper.connect", "localhost:2181");
119  props.put("broker.id", "1");
120  return props;
121  }
122 
123  private static class KafkaTestFixture {
124  private TestingServer zk;
125  private KafkaServerStartable kafka;
126 
127  public void start(Properties properties) throws Exception {
128  Integer port = getZkPort(properties);
129  zk = new TestingServer(port);
130  zk.start();
131 
132  KafkaConfig kafkaConfig = new KafkaConfig(properties);
133  kafka = new KafkaServerStartable(kafkaConfig);
134  kafka.startup();
135  }
136 
137  public void stop() throws IOException {
138  kafka.shutdown();
139  zk.stop();
140  zk.close();
141  }
142 
143  private int getZkPort(Properties properties) {
144  String url = (String) properties.get("zookeeper.connect");
145  String port = url.split(":")[1];
146  return Integer.valueOf(port);
147  }
148  }
149 }
value
Definition: nodes.py:62
static final String topic
Definition: Original.java:50