Open Kilda Java Documentation
VerificationWaitRecord.java
Go to the documentation of this file.
1 /* Copyright 2018 Telstra Open Source
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 package org.openkilda.wfm.topology.flow.model;
17 
27 
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.UUID;
31 import java.util.stream.Collectors;
32 
33 public class VerificationWaitRecord {
34  private final Constants constants = Constants.instance;
35 
36  private final long createTime;
37  private final String correlationId;
38  private final FlowVerificationResponse.FlowVerificationResponseBuilder response;
39 
40  private final HashMap<UUID, PendingRecord> pendingRequests = new HashMap<>();
41 
42  public VerificationWaitRecord(FlowVerificationRequest request, BidirectionalFlow biFlow, String correlationId) {
43  this.createTime = System.currentTimeMillis();
44  this.correlationId = correlationId;
45 
46  this.response = FlowVerificationResponse.builder();
47  this.response.flowId(biFlow.getFlowId());
48 
49  addPending(request, biFlow.getForward(), FlowDirection.FORWARD);
50  addPending(request, biFlow.getReverse(), FlowDirection.REVERSE);
51  }
52 
56  public boolean consumeResponse(UniFlowVerificationResponse payload) {
57  PendingRecord pending = pendingRequests.remove(payload.getPacketId());
58  if (pending == null) {
59  return false;
60  }
61 
62  saveUniResponse(pending, payload);
63  return true;
64  }
65 
67  return response.build();
68  }
69 
74  UniFlowVerificationResponse errorResponse;
75  for (UUID packetId : pendingRequests.keySet()) {
76  PendingRecord pending = pendingRequests.get(packetId);
77 
78  errorResponse = new UniFlowVerificationResponse(pending.request, errorCode);
79  saveUniResponse(pending, errorResponse);
80  }
81  pendingRequests.clear();
82  }
83 
84  public boolean isFilled() {
85  return pendingRequests.size() == 0;
86  }
87 
91  public boolean isOutdated(long currentTime) {
92  long outdatedLimit = currentTime - constants.getVerificationRequestTimeoutMillis();
93  if (outdatedLimit < 0) {
94  outdatedLimit = 0;
95  }
96 
97  return createTime < outdatedLimit;
98  }
99 
103  public List<UniFlowVerificationRequest> getPendingRequests() {
104  return pendingRequests.values().stream()
105  .map(pending -> pending.request)
106  .collect(Collectors.toList());
107  }
108 
109  public String getCorrelationId() {
110  return correlationId;
111  }
112 
113  private void addPending(FlowVerificationRequest request, Flow flow, FlowDirection direction) {
114  UniFlowVerificationRequest payload = new UniFlowVerificationRequest(request, flow, direction);
115  PendingRecord pending = new PendingRecord(direction, payload);
116  pendingRequests.put(payload.getPacketId(), pending);
117  }
118 
119  private void saveUniResponse(PendingRecord pending, UniFlowVerificationResponse payload) {
120  switch (pending.direction) {
121  case FORWARD:
122  response.forward(payload);
123  break;
124  case REVERSE:
125  response.reverse(payload);
126  break;
127  default:
128  throw new IllegalArgumentException(
129  String.format("Unhandled flow direction value: %s", pending.direction));
130  }
131  }
132 
133  private static class PendingRecord {
134  final FlowDirection direction;
135  final UniFlowVerificationRequest request;
136 
137  PendingRecord(FlowDirection direction, UniFlowVerificationRequest request) {
138  this.direction = direction;
139  this.request = request;
140  }
141  }
142 }
void fillPendingWithError(FlowVerificationErrorCode errorCode)
VerificationWaitRecord(FlowVerificationRequest request, BidirectionalFlow biFlow, String correlationId)
boolean consumeResponse(UniFlowVerificationResponse payload)