16 package org.openkilda.wfm;
18 import org.apache.kafka.clients.consumer.ConsumerRecord;
19 import org.apache.kafka.clients.consumer.ConsumerRecords;
20 import org.apache.kafka.clients.consumer.KafkaConsumer;
21 import org.apache.kafka.common.utils.Utils;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.Properties;
34 List<String> results =
new ArrayList<>();
36 Properties props =
new Properties();
37 props.put(
"bootstrap.servers",
"localhost:9092");
38 props.put(
"group.id",
"test");
39 props.put(
"enable.auto.commit",
"true");
40 props.put(
"auto.commit.interval.ms",
"1000");
41 props.put(
"session.timeout.ms",
"15000");
42 props.put(
"key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
43 props.put(
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
44 KafkaConsumer<String, String>
consumer =
new KafkaConsumer<>(props);
45 String
topic =
"speaker.info.switch.updown";
52 System.out.println(
"consumer.listTopics() = " +
consumer.listTopics());
54 System.out.println(
"");
55 for (
int i = 0;
i < 10;
i++) {
56 ConsumerRecords<String, String> records =
consumer.poll(500);
57 System.out.println(
".");
58 for (ConsumerRecord<String, String> record : records) {
59 System.out.printf(
"offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
60 System.out.print(
"$");
61 results.add(record.value());
65 System.out.println(
"");
static void main(String[] args)