Open Kilda Java Documentation
KafkaFilerTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.utils;
17 
20 
21 import org.apache.storm.generated.StormTopology;
22 import org.apache.storm.topology.TopologyBuilder;
23 
24 import java.io.File;
25 
29 public class KafkaFilerTopology extends AbstractTopology<KafkaFilerTopologyConfig> {
30  private String topic;
34  private FilerBolt filer;
35 
37  super(env, KafkaFilerTopologyConfig.class);
38 
39  topic = topologyConfig.getKafkaSpeakerTopic();
40  }
41 
/**
 * Creates a topology bound to an explicitly supplied topic instead of one taken from configuration.
 *
 * @param env   launch environment handed to the parent topology constructor
 * @param topic name of the Kafka topic stored for later use by {@code createTopology()}
 */
public KafkaFilerTopology(LaunchEnvironment env, String topic) {
    super(env, KafkaFilerTopologyConfig.class);
    this.topic = topic;
}
47 
48  @Override
49  public StormTopology createTopology() {
50  final String directory = topologyConfig.getFilterDirectory();
51  final String name = String.format("%s_%s_%s_%d", getTopologyName(), topic, directory, System.currentTimeMillis());
52 
53  String spoutId = "KafkaSpout-" + topic;
54  int parallelism = 1;
55 
56  TopologyBuilder builder = new TopologyBuilder();
57  builder.setSpout(spoutId, createKafkaSpout(topic, name), parallelism);
58  filer = new FilerBolt().withFileName("utils-" + topic + ".log");
59  if (directory.length() != 0)
60  filer.withDir(new File(directory));
61 
62  builder.setBolt("utils", filer, parallelism)
63  .shuffleGrouping(spoutId);
64  return builder.createTopology();
65  }
66 
67  public FilerBolt getFiler() {
68  return filer;
69  }
70 
/**
 * Command-line entry point: constructs a {@code KafkaFilerTopology} from {@code env} and calls
 * {@code setup()} on it. Any exception is passed to {@code handleLaunchException}, whose int
 * result is used as the process exit code.
 *
 * @param args command-line arguments
 */
71  public static void main(String[] args) {
72  try {
// NOTE(review): listing line 73 is absent from this view, so the declaration of `env`
// is not shown here — confirm against the actual source file before editing.
74  (new KafkaFilerTopology(env)).setup();
75  } catch (Exception e) {
// Terminate the JVM with the code computed from the launch failure.
76  System.exit(handleLaunchException(e));
77  }
78  }
79 }
KafkaFilerTopology(LaunchEnvironment env, String topic)
name
Definition: setup.py:24
static int handleLaunchException(Exception error)
FilerBolt withFileName(String fileName)
Definition: FilerBolt.java:48
KafkaSpout< String, String > createKafkaSpout(String topic, String spoutId)