16 package org.openkilda.wfm.topology.utils;
18 import com.google.common.base.Charsets;
19 import com.google.common.io.Files;
20 import org.slf4j.LoggerFactory;
21 import org.slf4j.Logger;
22 import org.apache.storm.task.OutputCollector;
23 import org.apache.storm.task.TopologyContext;
24 import org.apache.storm.topology.OutputFieldsDeclarer;
25 import org.apache.storm.topology.base.BaseRichBolt;
26 import org.apache.storm.tuple.Tuple;
29 import java.io.IOException;
/** SLF4J logger for this bolt, obtained from {@code LoggerFactory} for {@code FilerBolt.class}. */
37 private static Logger logger = LoggerFactory.getLogger(
FilerBolt.class);
/**
 * Directory handle, initialised to a newly created temporary directory via
 * Guava's {@code Files.createTempDir()}.
 *
 * NOTE(review): presumably the directory the output file is created in, and
 * presumably replaceable through {@code withDir(File)} -- neither is visible
 * in this excerpt; confirm against the full class.
 */
38 public File
dir = Files.createTempDir();
/** Storm output collector; assigned in {@code prepare} and used to ack tuples. */
40 private OutputCollector _collector;
62 public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
63 _collector = collector;
69 logger.debug(
"FILER: Writing tuple to disk: File = {}, tuple={}", file.getAbsolutePath(), tuple);
74 Files.append(tuple.getValues().toString() +
"\n", file, Charsets.UTF_8);
75 }
catch (IOException e) {
76 logger.error(
"FILER: couldn't append to file: {}. Exception: {}. Cause: {}",
77 file.getAbsolutePath(), e.getMessage(), e.getCause());
79 _collector.ack(tuple);
FilerBolt withDir(File dir)
FilerBolt withFileName(String fileName)
void execute(Tuple tuple)
void prepare(Map conf, TopologyContext context, OutputCollector collector)
void declareOutputFields(OutputFieldsDeclarer declarer)