Open Kilda Java Documentation
CacheFilterBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.stats.bolts;
17 
18 import static org.openkilda.messaging.Utils.MAPPER;
20 
21 import org.apache.storm.task.OutputCollector;
22 import org.apache.storm.task.TopologyContext;
23 import org.apache.storm.topology.OutputFieldsDeclarer;
24 import org.apache.storm.topology.base.BaseRichBolt;
25 import org.apache.storm.tuple.Fields;
26 import org.apache.storm.tuple.Tuple;
27 import org.apache.storm.tuple.Values;
37 
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 
41 import java.io.IOException;
42 import java.util.Map;
43 
44 public class CacheFilterBolt extends BaseRichBolt {
45 
46  public enum FieldsNames {
51  MEASURE_POINT
52  }
53 
54  public enum Commands {
56  REMOVE
57  }
58 
59  public static final Fields fieldsMessageUpdateCache =
60  new Fields(
61  FieldsNames.COMMAND.name(),
62  FieldsNames.FLOW.name(),
63  FieldsNames.SWITCH.name(),
64  FieldsNames.COOKIE.name(),
65  FieldsNames.MEASURE_POINT.name());
66 
67 
68  private static final Logger logger = LoggerFactory.getLogger(CacheFilterBolt.class);
69 
70  private TopologyContext context;
71  private OutputCollector outputCollector;
72 
73 
77  @Override
78  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
79  this.context = topologyContext;
80  this.outputCollector = outputCollector;
81  }
82 
86  @Override
87  public void execute(Tuple tuple) {
88 
89  String json = tuple.getString(0);
90 
91  try {
92  BaseMessage bm = MAPPER.readValue(json, BaseMessage.class);
93  if (bm instanceof CommandMessage) {
94  CommandMessage message = (CommandMessage) bm;
95  CommandData data = message.getData();
96  if (data instanceof InstallIngressFlow) {
98  logMatchedRecord(command);
100  } else if (data instanceof InstallEgressFlow) {
102  logMatchedRecord(command);
103  emit(tuple, Commands.UPDATE, command, MeasurePoint.EGRESS);
104  } else if (data instanceof InstallOneSwitchFlow) {
106  logMatchedRecord(command);
107  emit(tuple, Commands.UPDATE, command, MeasurePoint.INGRESS);
108  emit(tuple, Commands.UPDATE, command, MeasurePoint.EGRESS);
109  } else if (data instanceof RemoveFlow) {
111  logMatchedRecord(command);
112  emit(tuple, Commands.REMOVE, command);
113  }
114  }
115  } catch (IOException exception) {
116  logger.error("Could not deserialize message {}", tuple, exception);
117  } catch (Exception e) {
118  logger.error(String.format("Unhandled exception in %s", getClass().getName()), e);
119  } finally {
120  outputCollector.ack(tuple);
121  }
122  }
123 
124  private void emit(Tuple tuple, Commands action, BaseFlow command) {
125  emit(tuple, action, command, null);
126  }
127 
128  private void emit(Tuple tuple, Commands action, BaseFlow command, MeasurePoint point) {
129  Values values = new Values(
130  action,
131  command.getId(),
132  command.getSwitchId(),
133  command.getCookie(),
134  point);
135  outputCollector.emit(CACHE_UPDATE.name(), tuple, values);
136  }
137 
141  @Override
142  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
143  outputFieldsDeclarer.declareStream(CACHE_UPDATE.name(), fieldsMessageUpdateCache);
144  }
145 
146  private void logMatchedRecord(BaseFlow flowCommand) {
147  logger.debug("Catch {} command flow_id={} sw={} cookie={}",
148  flowCommand.getClass().getCanonicalName(),
149  flowCommand.getId(), flowCommand.getSwitchId(), flowCommand.getCookie());
150  }
151 }
static final ObjectMapper MAPPER
Definition: Utils.java:31
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
def command(payload, fields)
Definition: share.py:102
void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector)