16 package org.openkilda.wfm.topology;
18 import org.apache.storm.task.IOutputCollector;
19 import org.apache.storm.tuple.Tuple;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.List;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicInteger;
/**
 * Per-stream counters keyed by stream id. A counter is created on demand and
 * incremented by {@code emit(String, Collection, List)}; the stored value is
 * read back via {@code messages.get(streamId).get()}.
 */
32 private Map<String, AtomicInteger> messages =
new ConcurrentHashMap<>();
/**
 * Records one emission on the given stream.
 *
 * <p>The counter for {@code streamId} is looked up in {@code messages},
 * created with an initial value of 0 if it does not exist yet, and then
 * incremented by one. Neither {@code anchors} nor {@code tuple} is used by
 * the statements visible here.
 *
 * <p>NOTE(review): the rest of this method (including its return statement)
 * is not visible in this excerpt - confirm what is returned to the caller.
 *
 * @param streamId id of the stream the tuple is emitted to; used as the counter key
 * @param anchors anchor tuples of the emission
 * @param tuple values being emitted
 */
35 public List<Integer>
emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
// Create the per-stream counter on first use, then count this emission.
36 AtomicInteger
count = messages.computeIfAbsent(streamId, k ->
new AtomicInteger(0));
37 count.incrementAndGet();
42 public void emitDirect(
int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
47 public void ack(Tuple input) {
52 public void fail(Tuple input) {
67 return messages.get(streamId).get();
/**
 * Single-anchor convenience overload of {@code emit}.
 *
 * <p>Wraps {@code anchor} into a one-element list with {@link Arrays#asList}
 * and delegates to {@code emit(String, Collection, List)}.
 *
 * @param streamId id of the stream the tuple is emitted to
 * @param anchor the single anchor tuple
 * @param tuple values being emitted
 * @return whatever the collection-anchored {@code emit} returns
 */
70 public List<Integer>
emit(String streamId, Tuple anchor, List<Object> tuple) {
71 return emit(streamId, Arrays.asList(anchor), tuple);
void emitDirect(int taskId, String streamId, Collection< Tuple > anchors, List< Object > tuple)
List< Integer > emit(String streamId, Collection< Tuple > anchors, List< Object > tuple)
void reportError(Throwable error)
int getMessagesCount(String streamId)
List< Integer > emit(String streamId, Tuple anchor, List< Object > tuple)
void resetTimeout(Tuple input)