Open Kilda Java Documentation
OpenTSDBTopology.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.opentsdb;
17 
23 
24 import com.google.common.annotations.VisibleForTesting;
25 import org.apache.storm.generated.StormTopology;
26 import org.apache.storm.kafka.spout.KafkaSpout;
27 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
28 import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
29 import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
30 import org.apache.storm.opentsdb.client.OpenTsdbClient;
31 import org.apache.storm.topology.TopologyBuilder;
32 import org.apache.storm.tuple.Fields;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 
36 import java.util.Collections;
37 
41 public class OpenTSDBTopology extends AbstractTopology<OpenTsdbTopologyConfig> {
42 
43  private static final Logger LOGGER = LoggerFactory.getLogger(OpenTSDBTopology.class);
44 
46  super(env, OpenTsdbTopologyConfig.class);
47  }
48 
// Storm component ids used when wiring the topology in createTopology() / attachInput().
// The spout id is package-private and annotated @VisibleForTesting; the other ids are private.
49  @VisibleForTesting
50  static final String OTSDB_SPOUT_ID = "kilda.otsdb-spout";
51  private static final String OTSDB_BOLT_ID = "otsdb-bolt";
// The filter and parse bolt ids are taken from the simple class names of the bolt implementations.
52  private static final String OTSDB_FILTER_BOLT_ID = OpenTSDBFilterBolt.class.getSimpleName();
53  private static final String OTSDB_PARSE_BOLT_ID = DatapointParseBolt.class.getSimpleName();
54 
55  @Override
56  public StormTopology createTopology() {
57  LOGGER.info("Creating OpenTSDBTopology - {}", topologyName);
58 
59  TopologyBuilder tb = new TopologyBuilder();
60 
61  attachInput(tb);
62 
63  OpenTsdbConfig openTsdbConfig = topologyConfig.getOpenTsdbConfig();
64 
65  tb.setBolt(OTSDB_PARSE_BOLT_ID, new DatapointParseBolt(), openTsdbConfig.getDatapointParseBoltExecutors())
66  .setNumTasks(openTsdbConfig.getDatapointParseBoltWorkers())
67  .shuffleGrouping(OTSDB_SPOUT_ID);
68 
69  tb.setBolt(OTSDB_FILTER_BOLT_ID, new OpenTSDBFilterBolt(), openTsdbConfig.getFilterBoltExecutors())
70  .fieldsGrouping(OTSDB_PARSE_BOLT_ID, new Fields("hash"));
71 
72  OpenTsdbClient.Builder tsdbBuilder = OpenTsdbClient
73  .newBuilder(openTsdbConfig.getHosts())
74  // .sync(config.getOpenTsdbTimeout())
75  .returnDetails();
76  if (openTsdbConfig.getClientChunkedRequestsEnabled()) {
77  tsdbBuilder.enableChunkedEncoding();
78  }
79 
80  OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(tsdbBuilder,
81  Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
82  openTsdbBolt.withBatchSize(openTsdbConfig.getBatchSize()).withFlushInterval(openTsdbConfig.getFlushInterval());
83  // .failTupleForFailedMetrics();
84  tb.setBolt(OTSDB_BOLT_ID, openTsdbBolt, openTsdbConfig.getBoltExecutors())
85  .setNumTasks(openTsdbConfig.getBoltWorkers())
86  .shuffleGrouping(OTSDB_FILTER_BOLT_ID);
87 
88  return tb.createTopology();
89  }
90 
91  private void attachInput(TopologyBuilder topology) {
92  String otsdbTopic = topologyConfig.getKafkaOtsdbTopic();
93  checkAndCreateTopic(otsdbTopic);
94 
95  OpenTsdbConfig openTsdbConfig = topologyConfig.getOpenTsdbConfig();
96 
97  KafkaSpoutConfig<String, String> spoutConfig = makeKafkaSpoutConfigBuilder(OTSDB_SPOUT_ID, otsdbTopic)
98  .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
99  .build();
100  KafkaSpout kafkaSpout = new KafkaSpout<>(spoutConfig);
101  topology.setSpout(OTSDB_SPOUT_ID, kafkaSpout, openTsdbConfig.getNumSpouts());
102  }
103 
// Launch entry point: constructs an OpenTSDBTopology from `env` and calls setup() on it.
// NOTE(review): the declaration of `env` is not present in this listing (the embedded
// numbering skips line 109) - confirm against the original source file.
107  public static void main(String[] args) {
108  try {
110  (new OpenTSDBTopology(env)).setup();
111  } catch (Exception e) {
// Any exception is handed to handleLaunchException; its int result becomes the process exit code.
112  System.exit(handleLaunchException(e));
113  }
114  }
115 }
KafkaSpoutConfig.Builder< String, String > makeKafkaSpoutConfigBuilder(String spoutId, String topic)
static int handleLaunchException(Exception error)