Open Kilda Java Documentation
AbstractStormTest.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm;
17 
23 
24 import org.apache.kafka.clients.consumer.ConsumerConfig;
25 import org.apache.kafka.clients.producer.ProducerConfig;
26 import org.apache.storm.Config;
27 import org.apache.storm.LocalCluster;
28 import org.apache.storm.testing.CompleteTopologyParam;
29 import org.apache.storm.testing.MkClusterParam;
30 import org.junit.AfterClass;
31 import org.junit.BeforeClass;
32 import org.junit.ClassRule;
33 import org.junit.rules.TemporaryFolder;
34 import org.kohsuke.args4j.CmdLineException;
35 
36 import java.io.File;
37 import java.io.FileWriter;
38 import java.io.IOException;
39 import java.util.Properties;
40 
44 public class AbstractStormTest {
45  protected static String CONFIG_NAME = "class-level-overlay.properties";
46  protected static String NEO4J_LISTEN_ADDRESS = "localhost:27600";
47 
48  protected static TestKafkaProducer kProducer;
49  protected static LocalCluster cluster;
50  protected static MkClusterParam clusterParam;
51  protected static CompleteTopologyParam completeTopologyParam;
52  static TestUtils.KafkaTestFixture server;
53 
54  @ClassRule
55  public static TemporaryFolder fsData = new TemporaryFolder();
56 
57  protected static Properties kafkaProperties() throws ConfigurationException, CmdLineException {
58  Properties properties = new Properties();
59  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, makeUnboundConfig(KafkaConfig.class).getHosts());
60  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
61  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
62  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
63  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
64  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
65  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
66  properties.put("request.required.acks", "1");
67  return properties;
68  }
69 
70  protected static Properties kafkaProperties(final String groupId) throws ConfigurationException, CmdLineException {
71  Properties properties = kafkaProperties();
72  properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
73  return properties;
74  }
75 
76  protected static Config stormConfig() {
77  Config config = new Config();
78  config.setDebug(false);
79  config.setMaxTaskParallelism(1);
80  config.setNumWorkers(1);
81  return config;
82  }
83 
84  @BeforeClass
85  public static void setupOnce() throws Exception {
86  System.out.println("------> Creating Sheep \uD83D\uDC11\n");
87 
89 
91  server.start();
92 
93  cluster = new LocalCluster();
95  }
96 
97  @AfterClass
98  public static void teardownOnce() throws Exception {
99  System.out.println("------> Killing Sheep \uD83D\uDC11\n");
100  kProducer.close();
101  cluster.shutdown();
102  server.stop();
103  }
104 
105  protected static <T> T makeUnboundConfig(Class<T> configurationType)
106  throws ConfigurationException, CmdLineException {
108  ConfigurationProvider configurationProvider = env.getConfigurationProvider();
109  return configurationProvider.getConfiguration(configurationType);
110  }
111 
112  protected static LaunchEnvironment makeLaunchEnvironment() throws CmdLineException, ConfigurationException {
113  String args[] = makeLaunchArgs();
114  return new LaunchEnvironment(args);
115  }
116  protected static LaunchEnvironment makeLaunchEnvironment(Properties overlay)
117  throws CmdLineException, ConfigurationException, IOException {
118  String extra = fsData.newFile().getName();
119  makeConfigFile(overlay, extra);
120 
121  String args[] = makeLaunchArgs(extra);
122  return new LaunchEnvironment(args);
123  }
124 
125  protected static String[] makeLaunchArgs(String ...extraConfig) {
126  String args[] = new String[extraConfig.length + 1];
127 
128  File root = fsData.getRoot();
129  args[0] = new File(root, CONFIG_NAME).toString();
130  for (int idx = 0; idx < extraConfig.length; idx += 1) {
131  args[idx + 1] = new File(root, extraConfig[idx]).toString();
132  }
133 
134  return args;
135  }
136 
137  protected static void makeConfigFile() throws IOException {
139  }
140 
141  protected static void makeConfigFile(Properties overlay, String location) throws IOException {
142  File path = new File(fsData.getRoot(), location);
143  overlay.store(new FileWriter(path), null);
144  }
145 
146  protected static Properties makeConfigOverlay() {
147  return new Properties();
148  }
149 }
root
Definition: setup.py:12
static LaunchEnvironment makeLaunchEnvironment()
static CompleteTopologyParam completeTopologyParam
static< T > T makeUnboundConfig(Class< T > configurationType)
static Properties kafkaProperties(final String groupId)
static String [] makeLaunchArgs(String ...extraConfig)
static LaunchEnvironment makeLaunchEnvironment(Properties overlay)
static void makeConfigFile(Properties overlay, String location)