Open Kilda Java Documentation
PopulateIslFilterAction.java
Go to the documentation of this file.
1 package org.openkilda.wfm.topology.event;
2 
15 
16 import com.fasterxml.jackson.core.JsonProcessingException;
17 import org.apache.storm.tuple.Tuple;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
22  private final Logger logger = LoggerFactory.getLogger(PopulateIslFilterAction.class);
23  private final DummyIIslFilter filter;
24 
/**
 * Creates the action.
 *
 * @param bolt   passed unchanged to the superclass constructor
 * @param tuple  passed unchanged to the superclass constructor
 * @param filter ISL filter kept by this action; {@code handle()} clears and refills it
 */
public PopulateIslFilterAction(
        IKildaBolt bolt,
        Tuple tuple,
        DummyIIslFilter filter) {
    super(bolt, tuple);
    this.filter = filter;
}
29 
30  @Override
31  protected void handle() throws MessageFormatException, UnsupportedActionException, JsonProcessingException {
32  KafkaMessage input = new KafkaMessage(getTuple());
33  Message message = input.getPayload();
34 
35  if (message.getDestination() != Destination.WFM_OF_DISCOVERY) {
36  return;
37  }
38  if (! (message instanceof CommandMessage)) {
39  return;
40  }
41 
43  if (!(command.getData() instanceof DiscoveryFilterPopulateData)) {
44  return;
45  }
46 
48 
49  logger.info("Clean ISL filter");
50  filter.clear();
51  for (DiscoveryFilterEntity entity : payload.getFilter()) {
52  logger.info("Add ISL filter record - switcID=\"{}\" portId=\"{}\"", entity.switchId, entity.portId);
53  filter.add(new SwitchId(entity.switchId), entity.portId);
54  }
55  }
56 
57  @Override
58  protected Boolean handleError(Exception e) {
59  boolean isHandled = true;
60 
61  try {
62  throw e;
63  } catch (MessageFormatException exc) {
64  logger.error("Can\'t unpack input tuple: {}", exc.getCause().getMessage());
65 
66  for (int i = 0; i < getTuple().size(); i++) {
67  logger.error("Field #{}: {}", i, getTuple().getValue(i));
68  }
69  } catch (Exception exc) {
70  isHandled = false;
71  }
72 
73  return isHandled;
74  }
75 }
void add(SwitchId switchId, int portId)
def command(payload, fields)
Definition: share.py:102
PopulateIslFilterAction(IKildaBolt bolt, Tuple tuple, DummyIIslFilter filter)