Open Kilda Java Documentation
LinkOperationsBolt.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.nbworker.bolts;
17 
27 
28 import org.apache.storm.topology.OutputFieldsDeclarer;
29 import org.apache.storm.tuple.Fields;
30 import org.apache.storm.tuple.Tuple;
31 import org.neo4j.driver.v1.Session;
32 import org.neo4j.driver.v1.StatementResult;
33 import org.neo4j.driver.v1.Value;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Optional;
41 import java.util.stream.Collectors;
42 
43 public class LinkOperationsBolt extends NeoOperationsBolt {
44 
45  private static final Logger logger = LoggerFactory.getLogger(LinkOperationsBolt.class);
46 
47  public LinkOperationsBolt(Auth neoAuth) {
48  super(neoAuth);
49  }
50 
51  @Override
52  List<? extends InfoData> processRequest(Tuple tuple, BaseRequest request, Session session) {
53  List<? extends InfoData> result = null;
54  if (request instanceof GetLinksRequest) {
55  result = getAllLinks(session);
56  } else if (request instanceof LinkPropsGet) {
57  result = getLinkProps((LinkPropsGet) request, session);
58  } else {
59  unhandledInput(tuple);
60  }
61 
62  return result;
63  }
64 
65  private List<IslInfoData> getAllLinks(Session session) {
66  logger.debug("Processing get all links request");
67  String q =
68  "MATCH (:switch)-[isl:isl]->(:switch) "
69  + "RETURN isl";
70 
71  StatementResult queryResults = session.run(q);
72  List<IslInfoData> results = queryResults.list()
73  .stream()
74  .map(record -> record.get("isl"))
75  .map(Value::asRelationship)
77  .collect(Collectors.toList());
78  logger.debug("Found {} links in the database", results.size());
79  return results;
80  }
81 
82  private List<LinkPropsData> getLinkProps(LinkPropsGet request, Session session) {
83  logger.debug("Processing get link props request");
84  String q = "MATCH (props:link_props) "
85  + "WHERE ({src_switch} IS NULL OR props.src_switch={src_switch}) "
86  + "AND ({src_port} IS NULL OR props.src_port={src_port}) "
87  + "AND ({dst_switch} IS NULL OR props.dst_switch={dst_switch}) "
88  + "AND ({dst_port} IS NULL OR props.dst_port={dst_port}) "
89  + "RETURN props";
90 
91  Map<String, Object> parameters = new HashMap<>();
92  String srcSwitch = Optional.ofNullable(request.getSource().getDatapath())
93  .map(SwitchId::toString)
94  .orElse(null);
95  parameters.put("src_switch", srcSwitch);
96  parameters.put("src_port", request.getSource().getPortNumber());
97  String dstSwitch = Optional.ofNullable(request.getDestination().getDatapath())
98  .map(SwitchId::toString)
99  .orElse(null);
100  parameters.put("dst_switch", dstSwitch);
101  parameters.put("dst_port", request.getDestination().getPortNumber());
102 
103  StatementResult queryResults = session.run(q, parameters);
104  List<LinkPropsData> results = queryResults.list()
105  .stream()
106  .map(record -> record.get("props"))
107  .map(Value::asNode)
109  .collect(Collectors.toList());
110 
111  logger.debug("Found {} link props in the database", results.size());
112  return results;
113  }
114 
115  @Override
116  public void declareOutputFields(OutputFieldsDeclarer declarer) {
117  declarer.declare(new Fields("response", "correlationId"));
118  }
119 
120  @Override
121  Logger getLogger() {
122  return logger;
123  }
124 
125 }
list result
Definition: plan-d.py:72
void unhandledInput(Tuple input)