16 package org.openkilda.wfm.topology.nbworker.bolts;
28 import org.apache.storm.topology.OutputFieldsDeclarer;
29 import org.apache.storm.tuple.Fields;
30 import org.apache.storm.tuple.Tuple;
31 import org.neo4j.driver.v1.Session;
32 import org.neo4j.driver.v1.StatementResult;
33 import org.neo4j.driver.v1.Value;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import java.util.HashMap;
38 import java.util.List;
40 import java.util.Optional;
41 import java.util.stream.Collectors;
45 private static final Logger logger = LoggerFactory.getLogger(
LinkOperationsBolt.class);
52 List<? extends InfoData> processRequest(Tuple tuple,
BaseRequest request, Session session) {
53 List<? extends InfoData>
result = null;
55 result = getAllLinks(session);
56 }
else if (request instanceof LinkPropsGet) {
57 result = getLinkProps((LinkPropsGet) request, session);
65 private List<IslInfoData> getAllLinks(Session session) {
66 logger.debug(
"Processing get all links request");
68 "MATCH (:switch)-[isl:isl]->(:switch) " 71 StatementResult queryResults = session.run(q);
72 List<IslInfoData> results = queryResults.list()
74 .map(record -> record.get(
"isl"))
75 .map(Value::asRelationship)
77 .collect(Collectors.toList());
78 logger.debug(
"Found {} links in the database", results.size());
82 private List<LinkPropsData> getLinkProps(LinkPropsGet request, Session session) {
83 logger.debug(
"Processing get link props request");
84 String q =
"MATCH (props:link_props) " 85 +
"WHERE ({src_switch} IS NULL OR props.src_switch={src_switch}) " 86 +
"AND ({src_port} IS NULL OR props.src_port={src_port}) " 87 +
"AND ({dst_switch} IS NULL OR props.dst_switch={dst_switch}) " 88 +
"AND ({dst_port} IS NULL OR props.dst_port={dst_port}) " 91 Map<String, Object> parameters =
new HashMap<>();
92 String srcSwitch = Optional.ofNullable(request.getSource().getDatapath())
95 parameters.put(
"src_switch", srcSwitch);
96 parameters.put(
"src_port", request.getSource().getPortNumber());
97 String dstSwitch = Optional.ofNullable(request.getDestination().getDatapath())
100 parameters.put(
"dst_switch", dstSwitch);
101 parameters.put(
"dst_port", request.getDestination().getPortNumber());
103 StatementResult queryResults = session.run(q, parameters);
104 List<LinkPropsData> results = queryResults.list()
106 .map(record -> record.get(
"props"))
109 .collect(Collectors.toList());
111 logger.debug(
"Found {} link props in the database", results.size());
117 declarer.declare(
new Fields(
"response",
"correlationId"));
static IslInfoData toIslInfoData(Relationship relationship)
void declareOutputFields(OutputFieldsDeclarer declarer)
static LinkPropsData toLinkPropsData(Node node)
void unhandledInput(Tuple input)
LinkOperationsBolt(Auth neoAuth)