16 package org.openkilda.wfm.topology.flow.model;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.UUID;
31 import java.util.stream.Collectors;
// Wall-clock timestamp in milliseconds, taken from System.currentTimeMillis() when the record is constructed;
// the outdated check compares it against the timeout limit.
36 private final long createTime;
// Correlation id passed to the constructor and returned unchanged by the getter.
37 private final String correlationId;
// Requests still waiting for a response, keyed by packet id; an entry is removed when its response is consumed.
40 private final HashMap<UUID, PendingRecord> pendingRequests =
new HashMap<>();
// Constructor body; the signature line is not part of this excerpt.
// Remember the creation moment so the outdated check can compare it with the timeout limit.
43 this.createTime = System.currentTimeMillis();
44 this.correlationId = correlationId;
// Copy the flow id from biFlow into the response that is being assembled.
47 this.response.flowId(biFlow.getFlowId());
// Look up and remove in a single step, so a given packet id can match a pending request only once.
57 PendingRecord pending = pendingRequests.remove(payload.
getPacketId());
// No entry under this packet id: the payload does not answer a request tracked here.
// The handling lines of this branch are outside this excerpt.
58 if (pending == null) {
// Store the payload in the response slot selected by the direction of the matched request.
62 saveUniResponse(pending, payload);
// Return the object built from the accumulated response.
// NOTE(review): presumably the body of produce() — the enclosing method header is not visible here; confirm.
67 return response.build();
// Every request that is still pending gets errorResponse stored in place of a real response.
// NOTE(review): errorResponse is created on a line missing from this excerpt, presumably from the errorCode argument — confirm.
75 for (UUID packetId : pendingRequests.keySet()) {
76 PendingRecord pending = pendingRequests.get(packetId);
79 saveUniResponse(pending, errorResponse);
// After the error fill nothing is left waiting, so drop all entries.
81 pendingRequests.clear();
// True when no request is waiting for a response (the pending map is empty).
85 return pendingRequests.size() == 0;
// Threshold instant: currentTime minus the configured verification request timeout (milliseconds).
92 long outdatedLimit = currentTime - constants.getVerificationRequestTimeoutMillis();
// A negative limit means currentTime is smaller than the timeout itself; the branch body is outside this excerpt.
93 if (outdatedLimit < 0) {
// The record is outdated when it was created strictly before the threshold.
97 return createTime < outdatedLimit;
// Collect the request held by every still-pending record into a newly created list.
104 return pendingRequests.values().stream()
105 .map(pending -> pending.request)
106 .collect(Collectors.toList());
// Return the correlation id stored at construction.
110 return correlationId;
// Pair the request with its direction and register the pair under the request's packet id,
// which is the key later used to match an incoming response.
// NOTE(review): the enclosing method header is not visible in this excerpt.
115 PendingRecord pending =
new PendingRecord(direction, payload);
116 pendingRequests.put(payload.getPacketId(), pending);
/**
 * Stores a unidirectional verification response in the slot of the aggregated response
 * that corresponds to the direction of the pending request.
 *
 * @param pending record of the request being answered; its direction selects the slot
 * @param payload response to store
 * @throws IllegalArgumentException for a direction value not handled by the switch
 */
119 private void saveUniResponse(PendingRecord pending, UniFlowVerificationResponse payload) {
120 switch (pending.direction) {
// NOTE(review): the case labels sit on lines missing from this excerpt; presumably one branch
// per direction, writing to the forward and the reverse slot respectively — confirm.
122 response.forward(payload);
125 response.reverse(payload);
// Fail loudly instead of silently dropping a response with an unexpected direction.
128 throw new IllegalArgumentException(
129 String.format(
"Unhandled flow direction value: %s", pending.direction));
/**
 * Pairs a unidirectional verification request with the flow direction it belongs to.
 * Both fields are final and assigned once in the constructor.
 */
133 private static class PendingRecord {
// Direction of the request; selects the response slot when the answer is stored.
134 final FlowDirection direction;
// The request itself, returned to callers that list the pending requests.
135 final UniFlowVerificationRequest request;
// Plain field assignment, no validation.
137 PendingRecord(FlowDirection direction, UniFlowVerificationRequest request) {
138 this.direction = direction;
139 this.request = request;
boolean isOutdated(long currentTime)
static Constants instance
List< UniFlowVerificationRequest > getPendingRequests()
void fillPendingWithError(FlowVerificationErrorCode errorCode)
VerificationWaitRecord(FlowVerificationRequest request, BidirectionalFlow biFlow, String correlationId)
String getCorrelationId()
boolean consumeResponse(UniFlowVerificationResponse payload)
FlowVerificationResponse produce()