16 package org.openkilda.wfm;
24 import org.apache.kafka.clients.consumer.ConsumerConfig;
25 import org.apache.kafka.clients.producer.ProducerConfig;
26 import org.apache.storm.Config;
27 import org.apache.storm.LocalCluster;
28 import org.apache.storm.testing.CompleteTopologyParam;
29 import org.apache.storm.testing.MkClusterParam;
30 import org.junit.AfterClass;
31 import org.junit.BeforeClass;
32 import org.junit.ClassRule;
33 import org.junit.rules.TemporaryFolder;
34 import org.kohsuke.args4j.CmdLineException;
37 import java.io.FileWriter;
38 import java.io.IOException;
39 import java.util.Properties;
45 protected static String
CONFIG_NAME =
"class-level-overlay.properties";
55 public static TemporaryFolder
fsData =
new TemporaryFolder();
58 Properties properties =
new Properties();
60 properties.put(ConsumerConfig.GROUP_ID_CONFIG,
"test");
61 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true");
62 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
63 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
64 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
65 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
66 properties.put(
"request.required.acks",
"1");
72 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
77 Config
config =
new Config();
79 config.setMaxTaskParallelism(1);
86 System.out.println(
"------> Creating Sheep \uD83D\uDC11\n");
99 System.out.println(
"------> Killing Sheep \uD83D\uDC11\n");
109 return configurationProvider.getConfiguration(configurationType);
118 String extra =
fsData.newFile().getName();
126 String
args[] =
new String[extraConfig.length + 1];
130 for (
int idx = 0; idx < extraConfig.length; idx += 1) {
131 args[idx + 1] =
new File(
root, extraConfig[idx]).toString();
141 protected static void makeConfigFile(Properties overlay, String location)
throws IOException {
142 File
path =
new File(
fsData.getRoot(), location);
143 overlay.store(
new FileWriter(
path), null);
147 return new Properties();
static TemporaryFolder fsData
static Config stormConfig()
static LaunchEnvironment makeLaunchEnvironment()
static MkClusterParam clusterParam
static TestKafkaProducer kProducer
static CompleteTopologyParam completeTopologyParam
static void teardownOnce()
static LocalCluster cluster
static< T > T makeUnboundConfig(Class< T > configurationType)
static Properties kafkaProperties(final String groupId)
static String CONFIG_NAME
static String [] makeLaunchArgs(String ...extraConfig)
static LaunchEnvironment makeLaunchEnvironment(Properties overlay)
static void makeConfigFile(Properties overlay, String location)
static Properties makeConfigOverlay()
static void makeConfigFile()
static Properties kafkaProperties()
static String NEO4J_LISTEN_ADDRESS