Open Kilda Java Documentation
FlowCache.java
Go to the documentation of this file.
1 /* Copyright 2017 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.pce.cache;
17 
28 import org.openkilda.pce.Utils;
29 
30 import com.google.common.annotations.VisibleForTesting;
31 import com.google.common.base.MoreObjects;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.LinkedList;
38 import java.util.Map;
39 import java.util.Objects;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.function.Supplier;
43 import java.util.stream.Collectors;
44 import java.util.stream.Stream;
45 
46 public class FlowCache extends Cache {
// Class-wide SLF4J logger.
50  private static final Logger logger = LoggerFactory.getLogger(FlowCache.class);
51 
// Resource bookkeeping used by this cache: flow allocation/deallocation, cookies,
// transit VLAN ids and meter ids are all requested from it.
// Left non-private so tests can reach it, as the @VisibleForTesting marker indicates.
55  @VisibleForTesting
56  final ResourceCache resourceCache = new ResourceCache();
57 
// Cached flow pairs keyed by flow id. Within a pair the left element is the forward
// flow and the right element is the reverse flow.
61  private final Map<String, ImmutablePair<Flow, Flow>> flowPool = new ConcurrentHashMap<>();
62 
68  public void load(Set<ImmutablePair<Flow, Flow>> flows) {
69  logger.debug("Flows: {}", flows);
70  flows.forEach(this::putFlow);
71  }
72 
76  public void clear() {
77  flowPool.clear();
78  resourceCache.clear();
79  }
80 
// NOTE(review): the method header (listing line 87) is missing from this extract; the
// member index at the end of the page gives it as putFlow(ImmutablePair<Flow, Flow> flow).
// Stores the pair under the flow id of its left element and returns the result of
// Map.put, i.e. the pair previously stored under that id, or null if there was none.
88  return flowPool.put(flow.getLeft().getFlowId(), flow);
89  }
90 
97  public ImmutablePair<Flow, Flow> removeFlow(String flowId) {
98  return flowPool.remove(flowId);
99  }
100 
// NOTE(review): the method header (listing line 106) is missing from this extract; the
// member index at the end of the page gives it as void pushFlow(ImmutablePair<Flow, Flow> flow).
// Registers the pair with the resource cache first, then stores it in the pool.
107  resourceCache.allocateFlow(flow);
108  putFlow(flow);
109  }
110 
117  public boolean cacheContainsFlow(String flowId) {
118  logger.debug("Is flow {} in cache", flowId);
119 
120  return flowPool.containsKey(flowId);
121  }
122 
129  public Set<ImmutablePair<Flow, Flow>> getActiveFlowsWithAffectedPath(SwitchId switchId) {
130  return flowPool.values().stream().filter(flow ->
131  flow.getLeft().getFlowPath().getPath().stream()
132  .anyMatch(node -> node.getSwitchId().equals(switchId))
133  || flow.getRight().getFlowPath().getPath().stream()
134  .anyMatch(node -> node.getSwitchId().equals(switchId))
135  || isOneSwitchFlow(flow) && flow.getLeft().getSourceSwitch().equals(switchId))
136  .filter(flow -> flow.getLeft().getState().isActiveOrCached())
137  .collect(Collectors.toSet());
138  }
139 
146  public Set<ImmutablePair<Flow, Flow>> getActiveFlowsWithAffectedPath(IslInfoData islData) {
147  return flowPool.values().stream()
148  .filter(flow -> flow.getLeft().getFlowPath().getPath().contains(islData.getPath().get(0))
149  || flow.getRight().getFlowPath().getPath().contains(islData.getPath().get(0)))
150  .filter(flow -> flow.getLeft().getState().isActiveOrCached())
151  .collect(Collectors.toSet());
152  }
153 
160  public Set<ImmutablePair<Flow, Flow>> getActiveFlowsWithAffectedPath(PortInfoData portData) {
161  PathNode node = new PathNode(portData.getSwitchId(), portData.getPortNo(), 0);
162  return flowPool.values().stream()
163  .filter(flow -> flow.getLeft().getFlowPath().getPath().contains(node)
164  || flow.getRight().getFlowPath().getPath().contains(node))
165  .filter(flow -> flow.getLeft().getState().isActiveOrCached())
166  .collect(Collectors.toSet());
167  }
168 
175  public Set<ImmutablePair<Flow, Flow>> getFlowsWithAffectedPath(SwitchId switchId) {
176  return flowPool.values().stream().filter(flow ->
177  flow.getLeft().getFlowPath().getPath().stream()
178  .anyMatch(node -> node.getSwitchId().equals(switchId))
179  || flow.getRight().getFlowPath().getPath().stream()
180  .anyMatch(node -> node.getSwitchId().equals(switchId))
181  || isOneSwitchFlow(flow) && flow.getLeft().getSourceSwitch().equals(switchId))
182  .collect(Collectors.toSet());
183  }
184 
191  public Set<ImmutablePair<Flow, Flow>> getFlowsWithAffectedPath(IslInfoData islData) {
192  return flowPool.values().stream()
193  .filter(flow -> flow.getLeft().getFlowPath().getPath().contains(islData.getPath().get(0))
194  || flow.getRight().getFlowPath().getPath().contains(islData.getPath().get(0)))
195  .collect(Collectors.toSet());
196  }
197 
204  public Set<ImmutablePair<Flow, Flow>> getFlowsWithAffectedPath(PortInfoData portData) {
205  PathNode node = new PathNode(portData.getSwitchId(), portData.getPortNo(), 0);
206  return flowPool.values().stream().filter(flow ->
207  flow.getLeft().getFlowPath().getPath().contains(node)
208  || flow.getRight().getFlowPath().getPath().contains(node))
209  .collect(Collectors.toSet());
210  }
211 
218  public Map<String, String> getFlowsWithAffectedEndpoint(SwitchId switchId) {
219  Map<String, String> response = new HashMap<>();
220 
221  for (ImmutablePair<Flow, Flow> flow : flowPool.values()) {
222  SwitchId endpoint = getFlowLinkedEndpoint(flow, switchId);
223  if (endpoint != null) {
224  response.put(flow.getLeft().getFlowId(), endpoint.toString());
225  }
226  }
227 
228  return response;
229  }
230 
// NOTE(review): the method header (listing line 237) is missing from this extract; the
// member index at the end of the page gives it as
// ImmutablePair<PathInfoData, PathInfoData> getFlowPath(String flowId).
// Returns the flow paths of the cached pair as (left path, right path). getFlow throws
// a NOT_FOUND CacheException when the id is unknown, so this does too.
// NOTE(review): getFlow(flowId) is invoked twice, so the pool is looked up twice and the
// debug line in getFlow is logged twice; if the pool changes between the two calls the
// halves may come from different pairs. A single lookup held in a local would avoid that.
238  return new ImmutablePair<>(getFlow(flowId).left.getFlowPath(), getFlow(flowId).right.getFlowPath());
239  }
240 
247  public ImmutablePair<Flow, Flow> getFlow(String flowId) {
248  logger.debug("Get {} flow", flowId);
249 
250  ImmutablePair<Flow, Flow> flow = flowPool.get(flowId);
251  if (flow == null) {
252  // TODO: Is this really an exception? Should we just return null or empty?
253  // Feels like the caller should address this, and anticipate empty.
254  throw new CacheException(ErrorType.NOT_FOUND, "Can not get flow",
255  String.format("Flow %s not found", flowId));
256  }
257 
258  return flow;
259  }
260 
// NOTE(review): the method header (listing line 268) is missing from this extract; the
// member index at the end of the page gives it as
// createFlow(Flow flow, ImmutablePair<PathInfoData, PathInfoData> path).
// Builds a new forward/reverse pair for the requested flow, registers it with the
// resource cache, stores it in the pool and returns it.
269  String flowId = flow.getFlowId();
270  logger.debug("Create {} flow with {} parameters", flowId, flow);
271 
// Refuse to overwrite: an id that is already pooled is reported as ALREADY_EXISTS.
// NOTE(review): this get-then-put sequence is not atomic; if createFlow can be called
// concurrently for the same id, both callers may pass this check - confirm the
// threading model of the callers.
272  ImmutablePair<Flow, Flow> oldFlow = flowPool.get(flowId);
273  if (oldFlow != null) {
274  throw new CacheException(ErrorType.ALREADY_EXISTS, "Can not create flow",
275  String.format("Flow %s already exists", flowId));
276  }
277 
278  ImmutablePair<Flow, Flow> newFlow = buildFlow(flow, path);
279  resourceCache.allocateFlow(newFlow);
280  flowPool.put(flowId, newFlow);
281 
282  return newFlow;
283  }
284 
291  public ImmutablePair<Flow, Flow> deleteFlow(String flowId) {
292  logger.debug("Delete {} flow", flowId);
293 
294  ImmutablePair<Flow, Flow> flow = flowPool.remove(flowId);
295  if (flow == null) {
296  throw new CacheException(ErrorType.NOT_FOUND, "Can not delete flow",
297  String.format("Flow %s not found", flowId));
298  }
299 
300  resourceCache.deallocateFlow(flow);
301 
302  return flow;
303  }
304 
// NOTE(review): the method header (listing line 312) is missing from this extract; the
// member index at the end of the page gives it as
// updateFlow(Flow flow, ImmutablePair<PathInfoData, PathInfoData> path).
// Replaces the pooled pair for the flow id with a freshly built one and returns the
// new pair.
313  String flowId = flow.getFlowId();
314  logger.debug("Update {} flow with {} parameters", flowId, flow);
315 
// The old pair is taken out of the pool up front, so the id is absent from the pool
// until one of the put calls below runs.
316  ImmutablePair<Flow, Flow> oldFlow = flowPool.remove(flowId);
317  if (oldFlow == null) {
318  throw new CacheException(ErrorType.NOT_FOUND, "Can not update flow",
319  String.format("Flow %s not found", flowId));
320  }
321 
// NOTE(review): listing line 322 is missing from this extract; presumably it declares
// the newFlow variable assigned below.
323  try {
324  newFlow = buildFlow(flow, path);
325  resourceCache.deallocateFlow(oldFlow);
326 
327  resourceCache.allocateFlow(newFlow);
328  flowPool.put(flowId, newFlow);
329  } catch (Throwable e) {
// Roll back the pool entry only, then rethrow.
// NOTE(review): if the failure happens after deallocateFlow(oldFlow), the old pair goes
// back into the pool but nothing in this block re-allocates its resources.
330  flowPool.put(flowId, oldFlow);
331  throw e;
332  }
333 
334  return newFlow;
335  }
336 
342  public Set<ImmutablePair<Flow, Flow>> dumpFlows() {
343  logger.debug("Get all flows");
344  return new HashSet<>(flowPool.values());
345  }
346 
354  public Set<PathNode> getPathIntersection(PathInfoData firstPath, PathInfoData secondPath) {
355  logger.debug("Get single path intersection between {} and {}", firstPath, secondPath);
356  Set<PathNode> intersection = new HashSet<>(firstPath.getPath());
357  intersection.retainAll(secondPath.getPath());
358  return intersection;
359  }
360 
// NOTE(review): the method header (listing lines 368-370) is missing from this extract;
// the member index at the end of the page gives it as
// getPathIntersection(ImmutablePair<PathInfoData, PathInfoData> firstPath,
//                     ImmutablePair<PathInfoData, PathInfoData> secondPath).
// Intersects two path pairs direction by direction and returns the result as a pair
// (left-with-left intersection, right-with-right intersection).
371  logger.debug("Get path intersection between {} and {}", firstPath, secondPath);
372 
// Left elements are compared with each other, and so are the right elements.
373  Set<PathNode> forwardIntersection = getPathIntersection(firstPath.left, secondPath.left);
374  Set<PathNode> reverseIntersection = getPathIntersection(firstPath.right, secondPath.right);
375 
376  ImmutablePair<Set<PathNode>, Set<PathNode>> intersection =
377  new ImmutablePair<>(forwardIntersection, reverseIntersection);
378 
379  logger.debug("Path intersection is {}", intersection);
380 
381  return intersection;
382  }
383 
// Builds the forward/reverse flow pair for a flow request, drawing a cookie, transit
// VLAN ids and meter ids from the resource cache. Returns the pair as (forward, reverse).
// NOTE(review): the second line of the method header (listing line 388) is missing from
// this extract, so the remaining parameter list is not visible here; the body reads a
// `path` value through getLeft() and getRight().
387  private ImmutablePair<Flow, Flow> buildFlow(final Flow flow,
// Both directions share one timestamp and one allocated cookie value.
389  String timestamp = Utils.getIsoTimestamp();
390  int cookie = resourceCache.allocateCookie();
391 
392  /*
393  * If either side is a SingleSwitchFlow .. don't allocate a vlan.
394  */
395  int forwardVlan;
396  int reverseVlan;
397  if (!flow.isOneSwitchFlow()) {
// Each direction gets its own transit VLAN id.
398  forwardVlan = resourceCache.allocateVlanId();
399  reverseVlan = resourceCache.allocateVlanId();
400  } else {
401  forwardVlan = reverseVlan = 0;
402  }
403 
// Forward flow: endpoints are copied from the request as given; the cookie is OR-ed
// with the forward mask.
404  Flow.FlowBuilder forwardBuilder = Flow.builder()
405  .flowId(flow.getFlowId())
406  .cookie(cookie | ResourceCache.FORWARD_FLOW_COOKIE_MASK)
407  .description(flow.getDescription())
408  .lastUpdated(timestamp)
409  .sourceSwitch(flow.getSourceSwitch())
410  .destinationSwitch(flow.getDestinationSwitch())
411  .sourcePort(flow.getSourcePort())
412  .destinationPort(flow.getDestinationPort())
413  .sourceVlan(flow.getSourceVlan())
414  .destinationVlan(flow.getDestinationVlan())
415  .transitVlan(forwardVlan)
416  .flowPath(path.getLeft())
417  .state(FlowState.ALLOCATED);
// The meter id, when one is needed, is requested for the forward flow's source switch.
418  setBandwidthAndMeter(forwardBuilder, flow.getBandwidth(), flow.isIgnoreBandwidth(),
419  () -> resourceCache.allocateMeterId(flow.getSourceSwitch()));
420  Flow forward = forwardBuilder.build();
421 
// Reverse flow: source and destination switch/port/VLAN are swapped relative to the
// request; the cookie is OR-ed with the reverse mask.
422  Flow.FlowBuilder reverseBuilder = Flow.builder()
423  .flowId(flow.getFlowId())
424  .cookie(cookie | ResourceCache.REVERSE_FLOW_COOKIE_MASK)
425  .description(flow.getDescription())
426  .lastUpdated(timestamp)
427  .sourceSwitch(flow.getDestinationSwitch())
428  .destinationSwitch(flow.getSourceSwitch())
429  .sourcePort(flow.getDestinationPort())
430  .destinationPort(flow.getSourcePort())
431  .sourceVlan(flow.getDestinationVlan())
432  .destinationVlan(flow.getSourceVlan())
433  .transitVlan(reverseVlan)
434  .flowPath(path.getRight())
435  .state(FlowState.ALLOCATED);
// The meter id, when one is needed, is requested for the request's destination switch,
// which is the reverse flow's source.
436  setBandwidthAndMeter(reverseBuilder, flow.getBandwidth(), flow.isIgnoreBandwidth(),
437  () -> resourceCache.allocateMeterId(flow.getDestinationSwitch()));
438  Flow reverse = reverseBuilder.build();
439 
440  return new ImmutablePair<>(forward, reverse);
441  }
442 
443  private void setBandwidthAndMeter(Flow.FlowBuilder builder, long bandwidth, boolean isIgnoreBandwidth,
444  Supplier<Integer> meterIdSupplier) {
445  builder.bandwidth(bandwidth);
446 
447  if (bandwidth > 0L) {
448  builder.ignoreBandwidth(isIgnoreBandwidth);
449  builder.meterId(meterIdSupplier.get());
450  } else {
451  // When the flow is unmetered.
452  builder.ignoreBandwidth(true);
453  builder.meterId(0);
454  }
455  }
456 
// NOTE(review): the method header (listing line 468) is missing from this extract; the
// member index at the end of the page gives it as
// boolean isOneSwitchFlow(ImmutablePair<Flow, Flow> flow).
// True only when, in both the left and the right flow of the pair, the source switch
// equals the destination switch.
469  return flow.getLeft().getSourceSwitch().equals(flow.getLeft().getDestinationSwitch())
470  && flow.getRight().getSourceSwitch().equals(flow.getRight().getDestinationSwitch());
471  }
472 
480  private SwitchId getFlowLinkedEndpoint(ImmutablePair<Flow, Flow> flow, SwitchId switchId) {
481  Flow forward = flow.getLeft();
482  Flow reverse = flow.getRight();
483  SwitchId linkedSwitch = null;
484 
485  if (forward.getSourceSwitch().equals(switchId) && reverse.getDestinationSwitch().equals(switchId)) {
486  linkedSwitch = forward.getDestinationSwitch();
487  } else if (forward.getDestinationSwitch().equals(switchId) && reverse.getSourceSwitch().equals(switchId)) {
488  linkedSwitch = forward.getSourceSwitch();
489  }
490  return linkedSwitch;
491  }
492 
500  public Set<Flow> getFlowsForEndpoint(SwitchId switchId, int port) {
501  return flowPool.values().stream()
502  .flatMap(pair -> Stream.of(pair.getLeft(), pair.getRight()))
503  .filter(flow -> flow.getSourceSwitch().equals(switchId) && flow.getSourcePort() == port
504  || flow.getDestinationSwitch().equals(switchId) && flow.getDestinationPort() == port)
505  .collect(Collectors.toSet());
506  }
507 
520  public Set<Flow> getFlowsForEndpoint(SwitchId switchId, int port, int vlan) {
521  return flowPool.values().stream()
522  .flatMap(pair -> Stream.of(pair.getLeft(), pair.getRight()))
523  .filter(flow -> flow.getSourceSwitch().equals(switchId) && flow.getSourcePort() == port
524  && (flow.getSourceVlan() == vlan || flow.getSourceVlan() == 0)
525  || flow.getDestinationSwitch().equals(switchId) && flow.getDestinationPort() == port
526  && (flow.getDestinationVlan() == vlan || flow.getDestinationVlan() == 0))
527  .collect(Collectors.toSet());
528  }
529 
530 
534  public Set<ImmutablePair<Flow, Flow>> getIngressAndEgressFlows(SwitchId switchId) {
535  return flowPool.values().stream()
536  .filter(flowPair -> Objects.nonNull(getFlowLinkedEndpoint(flowPair, switchId)))
537  .collect(Collectors.toSet());
538  }
539 
540 
541  public Set<Integer> getAllocatedVlans() {
542  return resourceCache.getAllVlanIds();
543  }
544 
545  public Set<Integer> getAllocatedCookies() {
546  return resourceCache.getAllCookies();
547  }
548 
549  public Map<SwitchId, Set<Integer>> getAllocatedMeters() {
550  return resourceCache.getAllMeterIds();
551  }
552 
556  @Override
557  public String toString() {
558  return MoreObjects.toStringHelper(this)
559  .add("resources", resourceCache)
560  .add("flows", flowPool)
561  .toString();
562  }
563 }
void pushFlow(ImmutablePair< Flow, Flow > flow)
Definition: FlowCache.java:106
Set< ImmutablePair< Flow, Flow > > getActiveFlowsWithAffectedPath(IslInfoData islData)
Definition: FlowCache.java:146
ImmutablePair< PathInfoData, PathInfoData > getFlowPath(String flowId)
Definition: FlowCache.java:237
void load(Set< ImmutablePair< Flow, Flow >> flows)
Definition: FlowCache.java:68
ImmutablePair< Flow, Flow > removeFlow(String flowId)
Definition: FlowCache.java:97
Set< ImmutablePair< Flow, Flow > > getIngressAndEgressFlows(SwitchId switchId)
Definition: FlowCache.java:534
Map< String, String > getFlowsWithAffectedEndpoint(SwitchId switchId)
Definition: FlowCache.java:218
ImmutablePair< Flow, Flow > getFlow(String flowId)
Definition: FlowCache.java:247
Set< Flow > getFlowsForEndpoint(SwitchId switchId, int port)
Definition: FlowCache.java:500
Set< PathNode > getPathIntersection(PathInfoData firstPath, PathInfoData secondPath)
Definition: FlowCache.java:354
static String getIsoTimestamp()
Definition: Utils.java:34
Map< SwitchId, Set< Integer > > getAllocatedMeters()
Definition: FlowCache.java:549
Set< Integer > getAllocatedVlans()
Definition: FlowCache.java:541
Set< ImmutablePair< Flow, Flow > > dumpFlows()
Definition: FlowCache.java:342
ImmutablePair< Flow, Flow > updateFlow(Flow flow, ImmutablePair< PathInfoData, PathInfoData > path)
Definition: FlowCache.java:312
ImmutablePair< Flow, Flow > putFlow(ImmutablePair< Flow, Flow > flow)
Definition: FlowCache.java:87
Set< Flow > getFlowsForEndpoint(SwitchId switchId, int port, int vlan)
Definition: FlowCache.java:520
Set< ImmutablePair< Flow, Flow > > getActiveFlowsWithAffectedPath(PortInfoData portData)
Definition: FlowCache.java:160
Set< ImmutablePair< Flow, Flow > > getFlowsWithAffectedPath(IslInfoData islData)
Definition: FlowCache.java:191
ImmutablePair< Flow, Flow > deleteFlow(String flowId)
Definition: FlowCache.java:291
Set< ImmutablePair< Flow, Flow > > getActiveFlowsWithAffectedPath(SwitchId switchId)
Definition: FlowCache.java:129
Set< ImmutablePair< Flow, Flow > > getFlowsWithAffectedPath(SwitchId switchId)
Definition: FlowCache.java:175
ImmutablePair< Flow, Flow > createFlow(Flow flow, ImmutablePair< PathInfoData, PathInfoData > path)
Definition: FlowCache.java:268
Set< Integer > getAllocatedCookies()
Definition: FlowCache.java:545
boolean isOneSwitchFlow(ImmutablePair< Flow, Flow > flow)
Definition: FlowCache.java:468
Set< ImmutablePair< Flow, Flow > > getFlowsWithAffectedPath(PortInfoData portData)
Definition: FlowCache.java:204
boolean cacheContainsFlow(String flowId)
Definition: FlowCache.java:117
ImmutablePair< Set< PathNode >, Set< PathNode > > getPathIntersection(ImmutablePair< PathInfoData, PathInfoData > firstPath, ImmutablePair< PathInfoData, PathInfoData > secondPath)
Definition: FlowCache.java:368
boolean equals(Object object)
Definition: Flow.java:351