Open Kilda Java Documentation
NorthboundReplyBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.bolts;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
19 
26 
27 import com.fasterxml.jackson.core.JsonProcessingException;
28 import org.slf4j.LoggerFactory;
29 import org.slf4j.Logger;
30 import org.apache.storm.task.OutputCollector;
31 import org.apache.storm.task.TopologyContext;
32 import org.apache.storm.topology.OutputFieldsDeclarer;
33 import org.apache.storm.topology.base.BaseRichBolt;
34 import org.apache.storm.tuple.Tuple;
35 import org.apache.storm.tuple.Values;
36 
37 import java.util.Map;
38 
42 public class NorthboundReplyBolt extends BaseRichBolt {
46  private static final Logger logger = LoggerFactory.getLogger(NorthboundReplyBolt.class);
47 
51  private OutputCollector outputCollector;
52 
56  @Override
57  public void execute(Tuple tuple) {
58  ComponentType componentId = ComponentType.valueOf(tuple.getSourceComponent());
59  String streamId = tuple.getSourceStreamId();
60  Message message = (Message) tuple.getValueByField(AbstractTopology.MESSAGE_FIELD);
61  Values values = null;
62 
63  try {
64  logger.debug("Request tuple={}", tuple);
65 
66  switch (componentId) {
67 
68  case VERIFICATION_JOINT_BOLT:
69  case CRUD_BOLT:
70  case ERROR_BOLT:
71  logger.debug("Flow response: {}={}, component={}, stream={}, message={}",
72  Utils.CORRELATION_ID, message.getCorrelationId(), componentId, streamId, message);
73 
75  values = new Values(MAPPER.writeValueAsString(message));
76  outputCollector.emit(StreamType.RESPONSE.toString(), tuple, values);
77 
78  break;
79 
80  default:
81  logger.debug("Flow UNKNOWN response: {}={}, component={}, stream={}, message={}",
82  Utils.CORRELATION_ID, message.getCorrelationId(), componentId, streamId, message);
83  break;
84  }
85  } catch (JsonProcessingException exception) {
86  logger.error("Could not serialize message: component={}, stream={}, message={}",
87  componentId, streamId, message);
88  } catch (Exception e) {
89  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
90  } finally {
91  outputCollector.ack(tuple);
92 
93  logger.debug("Northbound-Reply message ack: component={}, stream={}, tuple={}, values={}",
94  tuple.getSourceComponent(), tuple.getSourceStreamId(), tuple, values);
95  }
96  }
97 
101  @Override
102  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
103  outputFieldsDeclarer.declareStream(StreamType.RESPONSE.toString(), AbstractTopology.fieldMessage);
104  }
105 
109  @Override
110  public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
111  this.outputCollector = outputCollector;
112  }
113 }
114 
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
static final ObjectMapper MAPPER
Definition: Utils.java:31
void setDestination(final Destination destination)
Definition: Message.java:111
static final String CORRELATION_ID
Definition: Utils.java:43
void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector)