16 package org.openkilda.wfm.topology.utils;
21 import org.apache.storm.generated.StormTopology;
22 import org.apache.storm.topology.TopologyBuilder;
51 final String
name = String.format(
"%s_%s_%s_%d",
getTopologyName(), topic, directory, System.currentTimeMillis());
53 String spoutId =
"KafkaSpout-" + topic;
56 TopologyBuilder builder =
new TopologyBuilder();
59 if (directory.length() != 0)
60 filer.
withDir(
new File(directory));
62 builder.setBolt(
"utils", filer, parallelism)
63 .shuffleGrouping(spoutId);
64 return builder.createTopology();
75 }
catch (Exception e) {
KafkaFilerTopology(LaunchEnvironment env, String topic)
static void main(String[] args)
StormTopology createTopology()
FilerBolt withDir(File dir)
final String getTopologyName()
static int handleLaunchException(Exception error)
FilerBolt withFileName(String fileName)
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)
KafkaFilerTopology(LaunchEnvironment env)