Open Kilda Java Documentation
FilerBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.utils;
17 
18 import com.google.common.base.Charsets;
19 import com.google.common.io.Files;
20 import org.slf4j.LoggerFactory;
21 import org.slf4j.Logger;
22 import org.apache.storm.task.OutputCollector;
23 import org.apache.storm.task.TopologyContext;
24 import org.apache.storm.topology.OutputFieldsDeclarer;
25 import org.apache.storm.topology.base.BaseRichBolt;
26 import org.apache.storm.tuple.Tuple;
27 
28 import java.io.File;
29 import java.io.IOException;
30 import java.util.Map;
31 
35 public class FilerBolt extends BaseRichBolt {
36 
37  private static Logger logger = LoggerFactory.getLogger(FilerBolt.class);
38  public File dir = Files.createTempDir();
39  public String fileName = "utils.log";
40  private OutputCollector _collector;
41  private File file;
42 
43  public FilerBolt withDir(File dir) {
44  this.dir = dir;
45  return this;
46  }
47 
48  public FilerBolt withFileName(String fileName) {
49  this.fileName = fileName;
50  return this;
51  }
52 
53  public File getFile() {
54  if (file == null) {
55  dir.mkdirs();
56  file = new File(dir.getAbsolutePath(), fileName);
57  }
58  return file;
59  }
60 
61  @Override
62  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
63  _collector = collector;
64  }
65 
66  @Override
67  public void execute(Tuple tuple) {
68  File file = getFile();
69  logger.debug("FILER: Writing tuple to disk: File = {}, tuple={}", file.getAbsolutePath(), tuple);
70 
71  try {
72  // Start with just the values; determine later if the fields are needed.
73  //Files.append(tuple.getFields().toString(), file, Charsets.UTF_8);
74  Files.append(tuple.getValues().toString() + "\n", file, Charsets.UTF_8);
75  } catch (IOException e) {
76  logger.error("FILER: couldn't append to file: {}. Exception: {}. Cause: {}",
77  file.getAbsolutePath(), e.getMessage(), e.getCause());
78  }
79  _collector.ack(tuple);
80  }
81 
82  @Override
83  public void declareOutputFields(OutputFieldsDeclarer declarer) {
84  }
85 
86 }
FilerBolt withFileName(String fileName)
Definition: FilerBolt.java:48
void prepare(Map conf, TopologyContext context, OutputCollector collector)
Definition: FilerBolt.java:62
void declareOutputFields(OutputFieldsDeclarer declarer)
Definition: FilerBolt.java:83