16 package org.openkilda.wfm;
20 import com.google.common.io.Files;
21 import kafka.server.KafkaConfig;
22 import kafka.server.KafkaServerStartable;
23 import org.apache.curator.test.TestingServer;
24 import org.apache.storm.Config;
27 import java.io.IOException;
28 import java.util.Properties;
// Builds the Kafka broker settings as a java.util.Properties object:
//   - "zookeeper.connect" comes from config.getHosts()
//   - "broker.id" is fixed to "1"
//   - "delete.topic.enable" is set to "true"
// NOTE(review): the enclosing method header and its return statement are not visible in this excerpt.
37 Properties props =
new Properties();
38 props.setProperty(
"zookeeper.connect",
config.getHosts());
39 props.setProperty(
"broker.id",
"1");
40 props.setProperty(
"delete.topic.enable",
"true");
// Creates a new Config instance (org.apache.storm.Config, per the file's imports).
// NOTE(review): any further setup of this object and the return statement are not visible in this excerpt.
45 Config
config =
new Config();
/**
 * Test fixture that holds a ZooKeeper {@code TestingServer}, a {@code KafkaServerStartable}
 * and the temporary directory both of them are given in {@code start(Properties)}.
 */
51 public static class KafkaTestFixture {
// ZooKeeper test server; assigned in start(Properties).
52 public TestingServer zk;
// Kafka broker wrapper; assigned in start(Properties).
53 public KafkaServerStartable kafka;
// Created eagerly when the fixture is instantiated (Guava Files.createTempDir());
// passed to the TestingServer and also used as Kafka's "log.dirs".
54 public File tempDir = Files.createTempDir();
// Constructor body fragment: keeps the supplied ZooKeeper configuration on the fixture.
// NOTE(review): the constructor header and the field declaration are not visible in this excerpt.
58 this.zooKeeperConfig = zooKeeperConfig;
61 public void start() throws Exception {
/**
 * Brings up the ZooKeeper test server and builds the Kafka broker from {@code props}.
 *
 * <p>Side effect on the argument: {@code props} is mutated, "log.dirs" is set to the
 * absolute path of the fixture's temp directory.
 *
 * @param props Kafka broker properties; must contain a "zookeeper.connect" entry,
 *              from which the ZooKeeper port is parsed by {@code getZkPort}
 * @throws Exception declared by the signature; presumably raised by the ZooKeeper/Kafka
 *                   setup calls below (confirm)
 */
66 public void start(Properties props)
throws Exception {
// Port is parsed out of the "zookeeper.connect" value in props.
67 Integer
port = getZkPort(props);
// ZooKeeper test server on that port, storing its data under tempDir.
69 zk =
new TestingServer(
port, tempDir);
70 System.out.println(
"Started ZooKeeper: ");
71 System.out.println(
"--> Temp Directory: " + zk.getTempDirectory());
// Kafka writes its logs into the same temp directory that was handed to ZooKeeper.
73 props.put(
"log.dirs", tempDir.getAbsolutePath());
74 KafkaConfig kafkaConfig =
new KafkaConfig(props);
75 kafka =
new KafkaServerStartable(kafkaConfig);
// NOTE(review): the call that actually starts the broker is not visible in this excerpt;
// the message below suggests it happens right before this print (confirm).
77 System.out.println(
"Started KAFKA: ");
80 public void stop() throws IOException {
87 private int getZkPort(Properties properties) {
88 String
url = (String) properties.get(
"zookeeper.connect");
89 String
port =
url.split(
":")[1];
90 return Integer.valueOf(
port);
static Config stormConfig()
static Properties serverProperties(ZookeeperConfig config)