16 package org.openkilda.wfm;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import org.apache.storm.LocalCluster;
23 import org.apache.storm.state.KeyValueState;
24 import org.apache.storm.testing.FeederSpout;
25 import org.apache.storm.topology.TopologyBuilder;
26 import org.apache.storm.tuple.Fields;
27 import org.apache.storm.tuple.Tuple;
28 import org.apache.storm.utils.Utils;
29 import org.junit.Assert;
30 import org.junit.Test;
32 import java.io.IOException;
33 import java.util.Arrays;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.TimeUnit;
48 private static class SimpleStatefulTick
54 protected SimpleStatefulTick(){
59 protected void doTick(Tuple tuple) {
64 protected void doWork(Tuple tuple) {
69 public void initState(KeyValueState<String, ConcurrentHashMap<String, String>> state) {
// Build a FeederSpout whose emitted tuples declare two fields: "key" and "message".
74 return new FeederSpout(
new Fields(
"key",
"message"));
// Announce the start of the test on stdout.
79 System.out.println(
"==> Starting BasicTickTest");
// Identifiers for the spout and bolt (used when wiring the topology) and for
// the topology itself (used when killing it at the end).
81 String spoutId =
"feeder.spout";
82 String boltId =
"tick.bolt";
83 String topoId =
"TestTopology";
// Wire the topology: the spout feeds the tick bolt through a shuffle grouping.
86 TopologyBuilder builder =
new TopologyBuilder();
// NOTE(review): `spout` is declared outside the lines visible in this excerpt.
88 builder.setSpout(spoutId, spout);
89 SimpleStatefulTick tickBolt =
new SimpleStatefulTick();
90 builder.setBolt(boltId, tickBolt).shuffleGrouping(spoutId);
// NOTE(review): the call that submits the topology to this cluster is not
// visible in this excerpt — confirm it uses topoId.
91 LocalCluster cluster =
new LocalCluster();
// Pause one second, then feed a single tuple ("key1", "msg1") into the spout.
95 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
96 spout.feed(Arrays.asList(
new String[]{
"key1",
"msg1"}));
// Wait six seconds before inspecting the bolt's files — presumably long enough
// for several ticks to fire; confirm against the configured tick interval.
99 Uninterruptibles.sleepUninterruptibly(6, TimeUnit.SECONDS);
// The tick file must contain at least expectedLines lines, and the work file
// exactly one line (only one tuple was fed above).
102 int expectedLines = 3;
103 Assert.assertTrue(
"We should have at least " + expectedLines +
" lines in the test file.",
104 expectedLines <= tickBolt.tickFile.numLines());
105 Assert.assertEquals(1, tickBolt.workFile.numLines());
// Tear down: kill the topology, then sleep four seconds — presumably to give
// the local cluster time to finish shutting it down.
107 cluster.killTopology(topoId);
108 Utils.sleep(4 * 1000);
boolean append(String text)
static Config stormConfig()
FileUtil withFileName(String fileName)
static FeederSpout createFeeder()