Open Kilda Java Documentation
NeoOperationsBolt.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.nbworker.bolts;
17 
22 
23 import org.apache.storm.task.OutputCollector;
24 import org.apache.storm.task.TopologyContext;
25 import org.apache.storm.tuple.Tuple;
26 import org.apache.storm.tuple.Values;
27 import org.neo4j.driver.v1.AccessMode;
28 import org.neo4j.driver.v1.Driver;
29 import org.neo4j.driver.v1.Session;
30 import org.slf4j.Logger;
31 
32 import java.util.List;
33 import java.util.Map;
34 
35 public abstract class NeoOperationsBolt extends AbstractBolt {
36 
37  private Driver driver;
38  private final Auth neoAuth;
39 
40  NeoOperationsBolt(Auth neoAuth) {
41  this.neoAuth = neoAuth;
42  }
43 
44  @Override
45  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
46  this.driver = neoAuth.getDriver();
47  super.prepare(stormConf, context, collector);
48  }
49 
50  protected void handleInput(Tuple input) {
51  BaseRequest request = (BaseRequest) input.getValueByField("request");
52  final String correlationId = input.getStringByField("correlationId");
53  getLogger().debug("Received operation request");
54 
55  try (Session session = driver.session(request.isReadRequest() ? AccessMode.READ : AccessMode.WRITE)) {
56  List<? extends InfoData> result = processRequest(input, request, session);
57  getOutput().emit(input, new Values(result, correlationId));
58  }
59  }
60 
61  abstract List<? extends InfoData> processRequest(Tuple tuple, BaseRequest request, Session session);
62 
63  abstract Logger getLogger();
64 
65  @Override
66  public void cleanup() {
67  driver.close();
68  }
69 }
void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
list result
Definition: plan-d.py:72