15 package org.openkilda.pce.janitor;
17 import org.apache.commons.cli.*;
18 import org.neo4j.driver.v1.*;
19 import sun.misc.BASE64Encoder;
20 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
22 import javax.ws.rs.client.*;
23 import javax.ws.rs.core.MediaType;
24 import javax.ws.rs.core.Response;
26 import java.util.concurrent.TimeUnit;
// Cypher text that looks for `cookie` values shared by more than one `flow`
// relationship between `switch` nodes:
//   1. group every flow relationship by its `cookie` and count the group,
//   2. keep only cookies whose count is greater than 1,
//   3. re-match the relationships carrying those cookies and return one row per
//      relationship as (affected_cookie, affected_flow_id), ordered by cookie.
38 "MATCH (:switch) -[rel:flow]-> (:switch)" +
39 " WITH rel.cookie as affected_cookie, COUNT(rel.cookie) as cookie_num" +
40 " WHERE cookie_num > 1" +
41 " MATCH (:switch) -[rel2:flow]-> (:switch)" +
42 " WHERE rel2.cookie = affected_cookie" +
43 " RETURN affected_cookie, rel2.flowid as affected_flow_id" +
44 " ORDER BY affected_cookie ";
// Cypher text that looks for `transit_vlan` values shared by more than one
// `flow` relationship. It has the same two-stage shape as the cookie query, and
// the aliases are still called `affected_cookie` / `cookie_num` even though they
// hold a transit VLAN value and its occurrence count here.
// The second stage additionally requires transit_vlan > 0.
// NOTE(review): `rel` is not listed in the WITH clause, so the
// `rel.transit_vlan` reference in RETURN looks like an out-of-scope variable;
// `rel2.transit_vlan` (or `affected_cookie`) was probably intended. Confirm
// against a running Neo4j before relying on this query.
50 "MATCH (:switch) -[rel:flow]-> (:switch)" +
51 " WITH rel.transit_vlan as affected_cookie, COUNT(rel.transit_vlan) as cookie_num" +
52 " WHERE cookie_num > 1" +
53 " MATCH (:switch) -[rel2:flow]-> (:switch)" +
54 " WHERE rel2.transit_vlan = affected_cookie and rel2.transit_vlan > 0" +
55 " RETURN affected_cookie, rel.transit_vlan AS transit_vlan, rel2.flowid as affected_flow_id" +
56 " ORDER BY affected_cookie";
// Cypher text that looks for `flowid` values carried by more than two `flow`
// relationships, then returns one row per matching relationship as
// (affected_flow_id, affected_flow_cookie), ordered by flow id.
// NOTE(review): the threshold is "> 2" rather than "> 1"; presumably two
// relationships per flow id (one per direction) is the normal case - confirm.
62 "MATCH (:switch) -[rel:flow]-> (:switch)"+
63 " WITH rel.flowid as affected_flow_id, COUNT(rel.flowid) as flow_num"+
64 " WHERE flow_num > 2"+
65 " MATCH (:switch) -[rel2:flow]-> (:switch)"+
66 " WHERE rel2.flowid = affected_flow_id"+
67 " RETURN affected_flow_id, rel2.cookie as affected_flow_cookie" +
68 " ORDER BY affected_flow_id";
// Start of a Cypher template that selects one `flow` relationship by flow id
// and cookie. `%s` / `%d` are format placeholders (presumably filled with
// String.format - confirm at the call site). The expression continues on lines
// that are not visible in this excerpt.
// NOTE(review): `%s` is not wrapped in quotes in the visible text, so the
// caller must supply an already-quoted value; query parameters would be safer.
74 "MATCH (:switch) -[rel:flow]-> (:switch)"+
75 " WHERE rel.flowid = %s AND rel.cookie = %d"+
// Plain holder for the settings read from the command line. Only part of the
// class is visible in this excerpt (other code also reads `nb_url` and
// `action`, whose declarations are not shown here).
78 public static final class Config {
// Neo4j connection: URL handed to GraphDatabase.driver, plus the user name and
// password used for basic authentication.
79 public String neo_url;
80 public String neo_user;
81 public String neo_pswd;
// Northbound API credentials; joined as "user:password" for the HTTP Basic
// Authorization header.
83 public String nb_user;
84 public String nb_pswd;
// Placeholder body: the enclosing method (header not visible here) is not
// implemented and always throws.
// NOTE(review): this is sun.reflect.generics.reflectiveObjects.NotImplementedException,
// a JDK-internal type; java.lang.UnsupportedOperationException is the standard
// choice for "not implemented".
94 throw new NotImplementedException();
// Second placeholder body, in a different method: also always throws.
102 throw new NotImplementedException();
// Build the HTTP Basic credential "user:password" from the northbound settings
// and Base64-encode it once, outside the loop.
// NOTE(review): sun.misc.BASE64Encoder is a JDK-internal class that is not
// available on newer JDKs and may insert line breaks into long output;
// java.util.Base64 is the supported replacement. getBytes() also uses the
// platform default charset.
107 String authString =
config.nb_user +
":" +
config.nb_pswd;
108 String authStringEnc =
new BASE64Encoder().encode(authString.getBytes());
// One JAX-RS client is shared by every request below.
110 Client client = ClientBuilder.newClient();
// Handle each flow id in turn.
112 for (String flowid : flowsToUpdate){
// Pause one second per flow. Any exception raised by the sleep, including
// interruption, is discarded by the empty catch below.
117 TimeUnit.SECONDS.sleep(1);
118 }
catch (Exception e) {}
120 System.out.println(
"RUNNING: flowid = " + flowid);
// Target <nb_url>/api/v1/flows/<flowid>, requesting JSON and sending the Basic
// Authorization header. The same builder is reused for the GET and the PUT.
122 WebTarget webTarget = client.target(
config.nb_url +
"/api/v1/flows").path(flowid);
123 Invocation.Builder invocationBuilder =
124 webTarget.request(MediaType.APPLICATION_JSON)
125 .header(
"Authorization",
"Basic " + authStringEnc);
// Step 1: GET the resource. Any status other than 200 aborts the whole run
// with a RuntimeException, so later flow ids are not processed.
127 Response response = invocationBuilder.get(Response.class);
129 if (response.getStatus() != 200) {
130 throw new RuntimeException(
"Failed : HTTP error code : " 131 + response.getStatus());
134 String output = response.readEntity(String.class);
// Step 2: when splitting the body on the text "PUSHED FLOW" yields exactly two
// pieces, rebuild it with the marker extended to
// "PUSHED FLOW. FIX cookie dupe."; otherwise the body is sent back unchanged.
139 String[] split = output.split(
"PUSHED FLOW");
140 if (split.length == 2)
141 output = split[0] +
"PUSHED FLOW. FIX cookie dupe." + split[1];
// Step 3: PUT the (possibly modified) body back to the same target. Unlike the
// GET, a non-200 status here is only reported on stdout in the visible lines.
// NOTE(review): neither Response is closed in the lines shown here.
145 response = invocationBuilder.put(Entity.entity(output,MediaType.APPLICATION_JSON));
147 if (response.getStatus() != 200) {
148 System.out.println(
"FAILURE: flowid = " + flowid +
"; response = " + response.getStatus());
// Command-line definition. Every option except "v" is required, and every
// option except "v" takes an argument.
159 Options options =
new Options();
// Neo4j connection options: -url, -u/--user, -p/--password.
160 options.addOption(Option.builder(
"url").required(
true).hasArg()
161 .desc(
"The URL of the Neo4J DB - i.e. bolt://neo..:7474").build());
162 options.addOption(Option.builder(
"u").required(
true).hasArg().longOpt(
"user")
163 .desc(
"The Neo4J username - e.g. neo4j").build());
164 options.addOption(Option.builder(
"p").required(
true).hasArg().longOpt(
"password")
165 .desc(
"The Neo4J password - e.g. neo4j").build());
// Northbound API options: -nburl, -nbu, -nbp.
// NOTE(review): the three descriptions below still say "Neo4J" although the
// values are stored in the nb_* settings; they look copied from the block above.
// NOTE(review): "nbu" and "nbp" reuse the long names "user" and "password"
// already given to "u" and "p", which looks like a long-option collision -
// confirm how Commons CLI resolves --user / --password.
167 options.addOption(Option.builder(
"nburl").required(
true).hasArg()
168 .desc(
"The URL of the Neo4J DB - i.e. http://northboud..:8080").build());
169 options.addOption(Option.builder(
"nbu").required(
true).hasArg().longOpt(
"user")
170 .desc(
"The Neo4J username - e.g. kilda").build());
171 options.addOption(Option.builder(
"nbp").required(
true).hasArg().longOpt(
"password")
172 .desc(
"The Neo4J password - e.g. kilda").build());
// Behaviour selection: -a/--action names the operation to run; -v/--verbose is
// an optional flag without an argument.
174 options.addOption(Option.builder(
"a").required(
true).hasArg().longOpt(
"action")
175 .desc(
"The action to take - e.g. CountDuplicateCookies").build());
176 options.addOption(Option.builder(
"v").required(
false).longOpt(
"verbose")
177 .desc(
"Where appropriate, return a verbose response").build());
// Parser state. `commandLine` is assigned on a line that is not visible in this
// excerpt; `driver` starts as null and is created further down.
179 CommandLine commandLine;
180 CommandLineParser
parser =
new DefaultParser();
181 Driver driver = null;
// Copy each option value into the config object, looked up by short option name.
186 config.neo_url = commandLine.getOptionValue(
"url");
187 config.neo_user = commandLine.getOptionValue(
"u");
188 config.neo_pswd = commandLine.getOptionValue(
"p");
189 config.nb_url = commandLine.getOptionValue(
"nburl");
190 config.nb_user = commandLine.getOptionValue(
"nbu");
191 config.nb_pswd = commandLine.getOptionValue(
"nbp");
192 config.action = commandLine.getOptionValue(
"a");
// Open the Neo4j driver with basic authentication from the values just read.
// NOTE(review): no driver.close() is visible in this excerpt - confirm that it
// is released elsewhere.
194 driver = GraphDatabase.driver(
config.neo_url, AuthTokens.basic(
config.neo_user,
config.neo_pswd));
// Action "DeDupeFlows": collect the cookies recorded for each affected flow id,
// then fetch every such flow from the northbound API and print it.
196 if (
config.action.equals(
"DeDupeFlows")) {
// `result` is assigned on a line that is not visible in this excerpt
// (presumably by running a query on this session - confirm).
// NOTE(review): no session.close() is visible in this excerpt.
198 Session session = driver.session();
// flow id -> list of cookies seen for that flow id, in result order.
200 Map<String,List<Long>> flowsToUpdate =
new HashMap<>();
201 for (Record record :
result.list()) {
202 String flowid = record.get(
"affected_flow_id").asString();
203 List<Long> priors = flowsToUpdate.computeIfAbsent(flowid, empty ->
new ArrayList<>());
204 priors.add(record.get(
"affected_flow_cookie").asLong());
// Report how many distinct flow ids were collected, then the full map.
207 System.out.println(
"flowsToUpdate.size() = " + flowsToUpdate.size());
208 System.out.println(
"flowsToUpdate = " + flowsToUpdate);
210 System.out.println(
"Will De-Dupe the Flows");
// HTTP Basic credential for the northbound API, encoded once.
// NOTE(review): sun.misc.BASE64Encoder is JDK-internal; java.util.Base64 is the
// supported replacement.
212 String authString =
config.nb_user +
":" +
config.nb_pswd;
213 String authStringEnc =
new BASE64Encoder().encode(authString.getBytes());
214 Client client = ClientBuilder.newClient();
// Only the flow ids (map keys) are used by the loop lines visible here.
216 for (String flowid : flowsToUpdate.keySet()){
// One-second pause per flow; any exception from the sleep is discarded by the
// empty catch below.
221 TimeUnit.SECONDS.sleep(1);
222 }
catch (Exception e) {}
224 System.out.println(
"RUNNING: flowid = " + flowid);
// NOTE(review): the base path here is nb_url + "flows", whereas the similar
// request at listing line 122 uses nb_url + "/api/v1/flows" - confirm which
// form matches the expected -nburl value.
226 WebTarget webTarget = client.target(
config.nb_url +
"flows").path(flowid);
227 Invocation.Builder invocationBuilder =
228 webTarget.request(MediaType.APPLICATION_JSON)
229 .header(
"Authorization",
"Basic " + authStringEnc);
// GET the resource; any status other than 200 aborts with a RuntimeException.
231 Response response = invocationBuilder.get(Response.class);
233 if (response.getStatus() != 200) {
234 throw new RuntimeException(
"Failed : HTTP error code : " 235 + response.getStatus());
// The body is only printed in the lines visible here.
238 String output = response.readEntity(String.class);
239 System.out.println(
"output = " + output);
// Second action branch (its condition is not visible in this excerpt): gather
// the `affected_flow_id` column of every result row into a list and print it.
// `result` is assigned on a line that is not shown here.
// NOTE(review): no session.close() is visible in this excerpt.
246 Session session = driver.session();
// Unlike the map used by the DeDupeFlows branch, this list keeps one entry per
// result row, so a flow id can appear more than once.
248 List<String> flowsToUpdate =
new ArrayList<>();
249 for (Record record :
result.list()) {
250 flowsToUpdate.add(record.get(
"affected_flow_id").asString());
253 System.out.println(
"flowsToUpdate.size() = " + flowsToUpdate.size());
254 System.out.println(
"flowsToUpdate = " + flowsToUpdate);
263 }
catch (ParseException exception) {
// Command-line parsing failed: print the parser's message on one line, then
// the generated usage text for the "FlowJanitor" command.
264 System.out.print(
"Parse error: ");
265 System.out.println(exception.getMessage());
267 HelpFormatter formatter =
new HelpFormatter();
268 formatter.printHelp(
"FlowJanitor", options );
List< String > FlowsWithDuplicateCookies(boolean verbose)
static final void updateFlows(FlowJanitor.Config config, List< String > flowsToUpdate)
static final String DUPLICATE_VLAN_QUERY
static void main(String[] args)
static final String DUPLICATE_COOKIES_QUERY
int CountDuplicateCookies()
static final String DELETE_DUPLICATE_FLOW
static final String DUPLICATE_FLOWS_QUERY