package org.apache.nifi.cluster.coordination.http.replication;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.apache.nifi.web.security.http.SecurityCookieName;
import org.apache.nifi.web.security.http.SecurityHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.class */
public class ThreadPoolRequestReplicator implements RequestReplicator {
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
    private final int maxConcurrentRequests;
    private final HttpResponseMapper responseMapper;
    private final EventReporter eventReporter;
    private final RequestCompletionCallback callback;
    private final ClusterCoordinator clusterCoordinator;
    private final NiFiProperties nifiProperties;
    private ThreadPoolExecutor executorService;
    private ScheduledExecutorService maintenanceExecutor;
    private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap();
    private final ConcurrentMap<NodeIdentifier, AtomicInteger> sequentialLongRequestCounts = new ConcurrentHashMap();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();
    private HttpReplicationClient httpClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator$NodeHttpRequest.class */
    public class NodeHttpRequest implements Runnable {
        private final NodeIdentifier nodeId;
        private final String method;
        private final URI uri;
        private final NodeRequestCompletionCallback callback;
        private final StandardAsyncClusterResponse clusterResponse;
        private final long creationNanos;
        private final PreparedRequest request;

        private NodeHttpRequest(PreparedRequest preparedRequest, NodeIdentifier nodeIdentifier, URI uri, NodeRequestCompletionCallback nodeRequestCompletionCallback, StandardAsyncClusterResponse standardAsyncClusterResponse) {
            this.creationNanos = System.nanoTime();
            this.request = preparedRequest;
            this.nodeId = nodeIdentifier;
            this.method = preparedRequest.getMethod();
            this.uri = uri;
            this.callback = nodeRequestCompletionCallback;
            this.clusterResponse = standardAsyncClusterResponse;
        }

        @Override // java.lang.Runnable
        public void run() {
            NodeResponse nodeResponse;
            this.clusterResponse.addTiming("Wait for HTTP Request Replication to be triggered", this.nodeId.toString(), System.nanoTime() - this.creationNanos);
            try {
                String str = this.request.getHeaders().get("x-nifi-request-id");
                ThreadPoolRequestReplicator.logger.debug("Replicating request {} {} to {}", new Object[]{this.method, this.uri.getPath(), this.nodeId});
                nodeResponse = ThreadPoolRequestReplicator.this.replicateRequest(this.request, this.nodeId, this.uri, str, this.clusterResponse);
            } catch (Throwable th) {
                nodeResponse = new NodeResponse(this.nodeId, this.method, this.uri, th);
                ThreadPoolRequestReplicator.logger.warn("Failed to replicate request {} {} to {} due to {}", new Object[]{this.method, this.uri.getPath(), this.nodeId, th.toString()});
                ThreadPoolRequestReplicator.logger.warn("", th);
            }
            if (this.callback != null) {
                ThreadPoolRequestReplicator.logger.debug("Request {} {} completed for {}", new Object[]{this.method, this.uri.getPath(), this.nodeId});
                this.callback.onCompletion(nodeResponse);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator$NodeRequestCompletionCallback.class */
    public interface NodeRequestCompletionCallback {
        void onCompletion(NodeResponse nodeResponse);
    }

    public ThreadPoolRequestReplicator(int i, int i2, HttpReplicationClient httpReplicationClient, ClusterCoordinator clusterCoordinator, RequestCompletionCallback requestCompletionCallback, EventReporter eventReporter, NiFiProperties niFiProperties) {
        if (i < 2) {
            throw new IllegalArgumentException("Max Pool Size must be >= 2");
        }
        if (httpReplicationClient == null) {
            throw new IllegalArgumentException("Client may not be null.");
        }
        this.clusterCoordinator = clusterCoordinator;
        this.maxConcurrentRequests = i2;
        this.responseMapper = new StandardHttpResponseMapper(niFiProperties);
        this.eventReporter = eventReporter;
        this.callback = requestCompletionCallback;
        this.nifiProperties = niFiProperties;
        this.httpClient = httpReplicationClient;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.executorService = new ThreadPoolExecutor(i, i, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue(), runnable -> {
            Thread newThread = Executors.defaultThreadFactory().newThread(runnable);
            newThread.setDaemon(true);
            newThread.setName("Replicate Request Thread-" + atomicInteger.incrementAndGet());
            return newThread;
        });
        this.executorService.allowCoreThreadTimeOut(true);
        this.maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable2) {
                Thread newThread = Executors.defaultThreadFactory().newThread(runnable2);
                newThread.setDaemon(true);
                newThread.setName(ThreadPoolRequestReplicator.class.getSimpleName() + " Maintenance Thread");
                return newThread;
            }
        });
        this.maintenanceExecutor.scheduleWithFixedDelay(() -> {
            purgeExpiredRequests();
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public void shutdown() {
        this.executorService.shutdown();
        this.maintenanceExecutor.shutdown();
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse replicate(String str, URI uri, Object obj, Map<String, String> map) {
        return replicate(NiFiUserUtils.getNiFiUser(), str, uri, obj, map);
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse replicate(NiFiUser niFiUser, String str, URI uri, Object obj, Map<String, String> map) {
        Map connectionStates = this.clusterCoordinator.getConnectionStates();
        if (isMutableRequest(str, uri.getPath())) {
            List list = (List) connectionStates.get(NodeConnectionState.OFFLOADED);
            if (list != null && !list.isEmpty()) {
                if (list.size() == 1) {
                    throw new OffloadedNodeMutableRequestException("Node " + list.iterator().next() + " is currently offloaded");
                }
                throw new OffloadedNodeMutableRequestException(list.size() + " Nodes are currently offloaded");
            }
            List list2 = (List) connectionStates.get(NodeConnectionState.OFFLOADING);
            if (list2 != null && !list2.isEmpty()) {
                if (list2.size() == 1) {
                    throw new OffloadedNodeMutableRequestException("Node " + list2.iterator().next() + " is currently offloading");
                }
                throw new OffloadedNodeMutableRequestException(list2.size() + " Nodes are currently offloading");
            }
            List list3 = (List) connectionStates.get(NodeConnectionState.DISCONNECTED);
            if (list3 != null && !list3.isEmpty()) {
                if (list3.size() == 1) {
                    throw new DisconnectedNodeMutableRequestException("Node " + list3.iterator().next() + " is currently disconnected");
                }
                throw new DisconnectedNodeMutableRequestException(list3.size() + " Nodes are currently disconnected");
            }
            List list4 = (List) connectionStates.get(NodeConnectionState.DISCONNECTING);
            if (list4 != null && !list4.isEmpty()) {
                if (list4.size() == 1) {
                    throw new DisconnectedNodeMutableRequestException("Node " + list4.iterator().next() + " is currently disconnecting");
                }
                throw new DisconnectedNodeMutableRequestException(list4.size() + " Nodes are currently disconnecting");
            }
            List list5 = (List) connectionStates.get(NodeConnectionState.CONNECTING);
            if (list5 != null && !list5.isEmpty()) {
                if (list5.size() == 1) {
                    throw new ConnectingNodeMutableRequestException("Node " + list5.iterator().next() + " is currently connecting");
                }
                throw new ConnectingNodeMutableRequestException(list5.size() + " Nodes are currently connecting");
            }
        }
        List list6 = (List) connectionStates.get(NodeConnectionState.CONNECTED);
        if (list6 == null || list6.isEmpty()) {
            throw new NoConnectedNodesException();
        }
        return replicate(new HashSet(list6), niFiUser, str, uri, obj, map, true, true);
    }

    void updateRequestHeaders(Map<String, String> map, NiFiUser niFiUser) {
        if (niFiUser == null) {
            throw new AccessDeniedException("Unknown user");
        }
        map.put("X-ProxiedEntitiesChain", ProxiedEntitiesUtils.buildProxiedEntitiesChainString(niFiUser));
        map.put("X-ProxiedEntityGroups", ProxiedEntitiesUtils.buildProxiedEntityGroupsString(niFiUser.getIdentityProviderGroups()));
        map.remove(SecurityHeader.AUTHORIZATION.getHeader());
        removeCookie(map, this.nifiProperties.getKnoxCookieName());
        removeCookie(map, SecurityCookieName.AUTHORIZATION_BEARER.getName());
        removeCookie(map, SecurityCookieName.REQUEST_TOKEN.getName());
        map.remove("Host");
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse replicate(Set<NodeIdentifier> set, String str, URI uri, Object obj, Map<String, String> map, boolean z, boolean z2) {
        return replicate(set, NiFiUserUtils.getNiFiUser(), str, uri, obj, map, z, z2);
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse replicate(Set<NodeIdentifier> set, NiFiUser niFiUser, String str, URI uri, Object obj, Map<String, String> map, boolean z, boolean z2) {
        AsyncClusterResponse replicate;
        HashMap hashMap = new HashMap(map);
        hashMap.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, ComponentIdGenerator.generateId().toString());
        if (z) {
            hashMap.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
        }
        updateRequestHeaders(hashMap, niFiUser);
        if (!z) {
            return replicate(set, str, uri, obj, hashMap, z2, null, !z2, true, null);
        }
        Lock lock = isMutableRequest(str, uri.getPath()) ? this.writeLock : this.readLock;
        logger.debug("Obtaining lock {} in order to replicate request {} {}", new Object[]{lock, str, uri});
        lock.lock();
        try {
            logger.debug("Lock {} obtained in order to replicate request {} {}", new Object[]{lock, str, uri});
            Object obj2 = new Object();
            synchronized (obj2) {
                replicate = replicate(set, str, uri, obj, hashMap, z2, null, !z2, true, obj2);
                try {
                    obj2.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            lock.unlock();
            logger.debug("Unlocked {} after replication completed for {} {}", new Object[]{lock, str, uri});
            return replicate;
        } catch (Throwable th) {
            lock.unlock();
            logger.debug("Unlocked {} after replication completed for {} {}", new Object[]{lock, str, uri});
            throw th;
        }
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse forwardToCoordinator(NodeIdentifier nodeIdentifier, String str, URI uri, Object obj, Map<String, String> map) {
        return forwardToCoordinator(nodeIdentifier, NiFiUserUtils.getNiFiUser(), str, uri, obj, map);
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse forwardToCoordinator(NodeIdentifier nodeIdentifier, NiFiUser niFiUser, String str, URI uri, Object obj, Map<String, String> map) {
        HashMap hashMap = new HashMap(map);
        updateRequestHeaders(hashMap, niFiUser);
        return replicate(Collections.singleton(nodeIdentifier), str, uri, obj, hashMap, false, null, false, false, null);
    }

    AsyncClusterResponse replicate(Set<NodeIdentifier> set, String str, URI uri, Object obj, Map<String, String> map, boolean z, StandardAsyncClusterResponse standardAsyncClusterResponse, boolean z2, boolean z3, Object obj2) {
        try {
            Objects.requireNonNull(set);
            Objects.requireNonNull(str);
            Objects.requireNonNull(uri);
            Objects.requireNonNull(obj);
            Objects.requireNonNull(map);
            if (set.isEmpty()) {
                throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
            }
            for (NodeIdentifier nodeIdentifier : set) {
                NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(nodeIdentifier);
                if (connectionStatus == null) {
                    throw new UnknownNodeException("Node " + nodeIdentifier + " does not exist in this cluster");
                }
                if (connectionStatus.getState() != NodeConnectionState.CONNECTED) {
                    throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeIdentifier + " because the node is not connected");
                }
            }
            logger.debug("Replicating request {} {} with entity {} to {}; response is {}", new Object[]{str, uri, obj, set, standardAsyncClusterResponse});
            HashMap hashMap = new HashMap(map);
            String computeIfAbsent = hashMap.computeIfAbsent(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER, str2 -> {
                return UUID.randomUUID().toString();
            });
            long j = -1;
            if (z) {
                long nanoTime = System.nanoTime();
                verifyClusterState(str, uri.getPath());
                j = System.nanoTime() - nanoTime;
            }
            int size = this.responseMap.size();
            if (size >= this.maxConcurrentRequests) {
                size = purgeExpiredRequests();
            }
            if (size >= this.maxConcurrentRequests) {
                logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", new Object[]{str, uri.getPath(), Integer.valueOf(size), (Map) this.responseMap.values().stream().collect(Collectors.groupingBy((v0) -> {
                    return v0.getURIPath();
                }, Collectors.counting()))});
                throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + size + " outstanding requests");
            }
            if (standardAsyncClusterResponse == null) {
                standardAsyncClusterResponse = new StandardAsyncClusterResponse(computeIfAbsent, uri, str, set, this.responseMapper, asyncClusterResponse -> {
                    try {
                        onCompletedResponse(computeIfAbsent);
                        if (obj2 != null) {
                            synchronized (obj2) {
                                obj2.notify();
                            }
                            logger.debug("Notified monitor {} because request {} {} has completed", new Object[]{obj2, str, uri});
                        }
                    } catch (Throwable th) {
                        if (obj2 != null) {
                            synchronized (obj2) {
                                obj2.notify();
                                logger.debug("Notified monitor {} because request {} {} has completed", new Object[]{obj2, str, uri});
                            }
                        }
                        throw th;
                    }
                }, () -> {
                    onResponseConsumed(computeIfAbsent);
                }, z3);
                this.responseMap.put(computeIfAbsent, standardAsyncClusterResponse);
            }
            if (j > -1) {
                standardAsyncClusterResponse.addTiming("Verify Cluster State", "All Nodes", j);
            }
            logger.debug("For Request ID {}, response object is {}", computeIfAbsent, standardAsyncClusterResponse);
            boolean isMutableRequest = isMutableRequest(str, uri.getPath());
            if (isMutableRequest && z) {
                logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", computeIfAbsent);
                performVerification(set, str, uri, obj, hashMap, standardAsyncClusterResponse, z3, obj2);
                return standardAsyncClusterResponse;
            }
            if (isMutableRequest) {
                standardAsyncClusterResponse.setPhase(StandardAsyncClusterResponse.COMMIT_PHASE);
            }
            StandardAsyncClusterResponse standardAsyncClusterResponse2 = standardAsyncClusterResponse;
            NodeRequestCompletionCallback nodeRequestCompletionCallback = nodeResponse -> {
                logger.debug("Received response from {} for {} {}", new Object[]{nodeResponse.getNodeId(), str, uri.getPath()});
                standardAsyncClusterResponse2.add(nodeResponse);
            };
            if (isMutableRequest && z2) {
                hashMap.put(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true");
            }
            PreparedRequest prepareRequest = this.httpClient.prepareRequest(str, hashMap, obj);
            submitAsyncRequest(set, uri.getScheme(), uri.getPath(), nodeIdentifier2 -> {
                return new NodeHttpRequest(prepareRequest, nodeIdentifier2, createURI(uri, nodeIdentifier2), nodeRequestCompletionCallback, standardAsyncClusterResponse2);
            }, hashMap);
            return standardAsyncClusterResponse;
        } catch (Throwable th) {
            if (obj2 != null) {
                synchronized (obj2) {
                    obj2.notify();
                    logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", new Object[]{obj2, str, uri, th});
                }
            }
            if (standardAsyncClusterResponse != null) {
                standardAsyncClusterResponse.setFailure(th instanceof RuntimeException ? (RuntimeException) th : new RuntimeException("Failed to submit Replication Request to background thread", th), new NodeIdentifier());
            }
            throw th;
        }
    }

    private void performVerification(final Set<NodeIdentifier> set, final String str, final URI uri, final Object obj, final Map<String, String> map, final StandardAsyncClusterResponse standardAsyncClusterResponse, final boolean z, final Object obj2) {
        logger.debug("Verifying that mutable request {} {} can be made", str, uri.getPath());
        HashMap hashMap = new HashMap(map);
        hashMap.put(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER, RequestReplicator.NODE_CONTINUE);
        final long nanoTime = System.nanoTime();
        final int size = set.size();
        NodeRequestCompletionCallback nodeRequestCompletionCallback = new NodeRequestCompletionCallback() { // from class: org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator.2
            final Set<NodeResponse> nodeResponses = Collections.synchronizedSet(new HashSet());

            @Override // org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator.NodeRequestCompletionCallback
            public void onCompletion(NodeResponse nodeResponse) {
                boolean z2;
                String str2;
                synchronized (this.nodeResponses) {
                    this.nodeResponses.add(nodeResponse);
                    z2 = this.nodeResponses.size() == size;
                }
                try {
                    long nanoTime2 = System.nanoTime() - nanoTime;
                    standardAsyncClusterResponse.addTiming("Completed Verification", nodeResponse.getNodeId().toString(), nanoTime2);
                    if (z2) {
                        standardAsyncClusterResponse.addTiming("Verification Completed", "All Nodes", nanoTime2);
                        final long count = this.nodeResponses.stream().filter(nodeResponse2 -> {
                            return nodeResponse2.getStatus() != 202;
                        }).count();
                        if (count == 0) {
                            ThreadPoolRequestReplicator.logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", new Object[]{Integer.valueOf(size), str, uri.getPath()});
                            ThreadPoolRequestReplicator.this.replicate(set, str, uri, obj, map, false, standardAsyncClusterResponse, true, z, obj2);
                            return;
                        }
                        try {
                            final HashMap hashMap2 = new HashMap(map);
                            hashMap2.put(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
                            Thread thread = new Thread(new Runnable() { // from class: org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator.2.1
                                @Override // java.lang.Runnable
                                public void run() {
                                    ThreadPoolRequestReplicator.logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", new Object[]{Long.valueOf(count), str, uri.getPath()});
                                    PreparedRequest prepareRequest = ThreadPoolRequestReplicator.this.httpClient.prepareRequest(str, hashMap2, obj);
                                    URI uri2 = uri;
                                    StandardAsyncClusterResponse standardAsyncClusterResponse2 = standardAsyncClusterResponse;
                                    ThreadPoolRequestReplicator.this.submitAsyncRequest(set, uri.getScheme(), uri.getPath(), nodeIdentifier -> {
                                        return new NodeHttpRequest(prepareRequest, nodeIdentifier, ThreadPoolRequestReplicator.this.createURI(uri2, nodeIdentifier), null, standardAsyncClusterResponse2);
                                    }, hashMap2);
                                }
                            });
                            thread.setName("Cancel Flow Locks");
                            thread.start();
                            for (NodeResponse nodeResponse3 : this.nodeResponses) {
                                if (nodeResponse3.getStatus() != 202) {
                                    Response clientResponse = nodeResponse3.getClientResponse();
                                    if (clientResponse == null) {
                                        str2 = "Node " + nodeResponse3.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + nodeResponse3.getStatus();
                                        ThreadPoolRequestReplicator.logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", new Object[]{Integer.valueOf(nodeResponse3.getStatus()), nodeResponse3.getNodeId(), str, uri.getPath()});
                                    } else {
                                        String str3 = (String) clientResponse.readEntity(String.class);
                                        str2 = "Node " + nodeResponse3.getNodeId() + " is unable to fulfill this request due to: " + str3;
                                        ThreadPoolRequestReplicator.logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}", new Object[]{Integer.valueOf(nodeResponse3.getStatus()), nodeResponse3.getNodeId(), str, uri.getPath(), str3});
                                    }
                                    standardAsyncClusterResponse.setFailure(nodeResponse3.getStatus() == Response.Status.FORBIDDEN.getStatusCode() ? nodeResponse3.hasThrowable() ? new AccessDeniedException(str2, nodeResponse3.getThrowable()) : new AccessDeniedException(str2) : nodeResponse3.hasThrowable() ? new IllegalClusterStateException(str2, nodeResponse3.getThrowable()) : new IllegalClusterStateException(str2), nodeResponse3.getNodeId());
                                }
                            }
                            if (obj2 != null) {
                                synchronized (obj2) {
                                    obj2.notify();
                                }
                                ThreadPoolRequestReplicator.logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", new Object[]{obj2, str, uri});
                            }
                        } catch (Throwable th) {
                            if (obj2 != null) {
                                synchronized (obj2) {
                                    obj2.notify();
                                    ThreadPoolRequestReplicator.logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", new Object[]{obj2, str, uri});
                                }
                            }
                            throw th;
                        }
                    }
                } catch (Exception e) {
                    standardAsyncClusterResponse.add(new NodeResponse(nodeResponse.getNodeId(), str, uri, e));
                    for (NodeResponse nodeResponse4 : this.nodeResponses) {
                        if (!nodeResponse4.getNodeId().equals(nodeResponse.getNodeId())) {
                            standardAsyncClusterResponse.add(nodeResponse4);
                        }
                    }
                }
            }
        };
        PreparedRequest prepareRequest = this.httpClient.prepareRequest(str, hashMap, obj);
        submitAsyncRequest(set, uri.getScheme(), uri.getPath(), nodeIdentifier -> {
            return new NodeHttpRequest(prepareRequest, nodeIdentifier, createURI(uri, nodeIdentifier), nodeRequestCompletionCallback, standardAsyncClusterResponse);
        }, hashMap);
    }

    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestReplicator
    public AsyncClusterResponse getClusterResponse(String str) {
        StandardAsyncClusterResponse standardAsyncClusterResponse = this.responseMap.get(str);
        if (standardAsyncClusterResponse == null) {
            return null;
        }
        return standardAsyncClusterResponse;
    }

    protected NodeResponse replicateRequest(PreparedRequest preparedRequest, NodeIdentifier nodeIdentifier, URI uri, String str, StandardAsyncClusterResponse standardAsyncClusterResponse) throws IOException {
        int length;
        long nanoTime = System.nanoTime();
        logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", new Object[]{preparedRequest.getMethod(), uri, str, preparedRequest.getHeaders()});
        Response replicate = this.httpClient.replicate(preparedRequest, uri.toString());
        standardAsyncClusterResponse.addTiming("Perform HTTP Request", nodeIdentifier.toString(), System.nanoTime() - nanoTime);
        NodeResponse nodeResponse = new NodeResponse(nodeIdentifier, preparedRequest.getMethod(), uri, replicate, System.nanoTime() - nanoTime, str);
        if (nodeResponse.is2xx() && (length = nodeResponse.getClientResponse().getLength()) > 0 && standardAsyncClusterResponse.requestBuffer(length)) {
            nodeResponse.bufferResponse();
        }
        return nodeResponse;
    }

    private boolean isMutableRequest(String str, String str2) {
        String upperCase = str.toUpperCase();
        boolean z = -1;
        switch (upperCase.hashCode()) {
            case -531492226:
                if (upperCase.equals("OPTIONS")) {
                    z = 2;
                    break;
                }
                break;
            case 70454:
                if (upperCase.equals("GET")) {
                    z = false;
                    break;
                }
                break;
            case 2213344:
                if (upperCase.equals("HEAD")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
            case true:
            case true:
                return false;
            default:
                return true;
        }
    }

    private void verifyClusterState(String str, String str2) {
        if ("DELETE".equals(str) || "POST".equals(str) || "PUT".equals(str)) {
            Map connectionStates = this.clusterCoordinator.getConnectionStates();
            if (connectionStates.containsKey(NodeConnectionState.DISCONNECTED) || connectionStates.containsKey(NodeConnectionState.DISCONNECTING)) {
                throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + str + " " + str2 + "] while a node is disconnected from the cluster");
            }
            if (connectionStates.containsKey(NodeConnectionState.CONNECTING)) {
                throw new ConnectingNodeMutableRequestException("Received a mutable request [" + str + " " + str2 + "] while a node is trying to connect to the cluster");
            }
        }
    }

    private void onResponseConsumed(String str) {
        this.responseMap.remove(str);
    }

    private void onCompletedResponse(String str) {
        StandardAsyncClusterResponse standardAsyncClusterResponse = this.responseMap.get(str);
        if (standardAsyncClusterResponse != null && this.callback != null) {
            try {
                this.callback.afterRequest(standardAsyncClusterResponse.getURIPath(), standardAsyncClusterResponse.getMethod(), standardAsyncClusterResponse.getCompletedNodeResponses());
            } catch (Exception e) {
                logger.warn("Completed request {} {} but failed to properly handle the Request Completion Callback due to {}", new Object[]{standardAsyncClusterResponse.getMethod(), standardAsyncClusterResponse.getURIPath(), e.toString()});
                logger.warn("", e);
            }
        }
        if (standardAsyncClusterResponse != null && logger.isDebugEnabled()) {
            logTimingInfo(standardAsyncClusterResponse);
        }
        Set<NodeIdentifier> findLongResponseTimes = ResponseUtils.findLongResponseTimes(standardAsyncClusterResponse, 1.5d);
        for (NodeIdentifier nodeIdentifier : standardAsyncClusterResponse.getNodesInvolved()) {
            AtomicInteger computeIfAbsent = this.sequentialLongRequestCounts.computeIfAbsent(nodeIdentifier, nodeIdentifier2 -> {
                return new AtomicInteger(0);
            });
            if (!findLongResponseTimes.contains(nodeIdentifier)) {
                computeIfAbsent.set(0);
            } else if (computeIfAbsent.incrementAndGet() >= 3) {
                String str2 = "Response time from " + nodeIdentifier + " was slow for each of the last 3 requests made. To see more information about timing, enable DEBUG logging for " + logger.getName();
                logger.warn(str2);
                if (this.eventReporter != null) {
                    this.eventReporter.reportEvent(Severity.WARNING, "Node Response Time", str2);
                }
                computeIfAbsent.set(0);
            }
        }
    }

    private void logTimingInfo(AsyncClusterResponse asyncClusterResponse) {
        LongSummaryStatistics longSummaryStatistics = (LongSummaryStatistics) asyncClusterResponse.getNodesInvolved().stream().map(nodeIdentifier -> {
            return Long.valueOf(asyncClusterResponse.getNodeResponse(nodeIdentifier).getRequestDuration(TimeUnit.MILLISECONDS));
        }).collect(Collectors.summarizingLong((v0) -> {
            return v0.longValue();
        }));
        StringBuilder sb = new StringBuilder();
        sb.append("Node Responses for ").append(asyncClusterResponse.getMethod()).append(" ").append(asyncClusterResponse.getURIPath()).append(" (Request ID ").append(asyncClusterResponse.getRequestIdentifier()).append("):\n");
        for (NodeIdentifier nodeIdentifier2 : asyncClusterResponse.getNodesInvolved()) {
            sb.append(nodeIdentifier2).append(": ").append(asyncClusterResponse.getNodeResponse(nodeIdentifier2).getRequestDuration(TimeUnit.MILLISECONDS)).append(" millis\n");
        }
        logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms", new Object[]{asyncClusterResponse.getMethod(), asyncClusterResponse.getURIPath(), asyncClusterResponse.getRequestIdentifier(), Long.valueOf(longSummaryStatistics.getMin()), Long.valueOf(longSummaryStatistics.getMax()), Double.valueOf(longSummaryStatistics.getAverage())});
        logger.debug(sb.toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submitAsyncRequest(Set<NodeIdentifier> set, String str, String str2, Function<NodeIdentifier, NodeHttpRequest> function, Map<String, String> map) {
        if (set.isEmpty()) {
            return;
        }
        Iterator<NodeIdentifier> it = set.iterator();
        while (it.hasNext()) {
            this.executorService.submit(function.apply(it.next()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public URI createURI(URI uri, NodeIdentifier nodeIdentifier) {
        return createURI(uri.getScheme(), nodeIdentifier.getApiAddress(), nodeIdentifier.getApiPort(), uri.getPath(), uri.getQuery());
    }

    private URI createURI(String str, String str2, int i, String str3, String str4) {
        try {
            return new URI(str, null, str2, i, str3, str4, null);
        } catch (URISyntaxException e) {
            throw new UriConstructionException(e);
        }
    }

    private synchronized int purgeExpiredRequests() {
        ((Set) this.responseMap.entrySet().stream().filter(entry -> {
            return ((StandardAsyncClusterResponse) entry.getValue()).isOlderThan(30L, TimeUnit.SECONDS);
        }).filter(entry2 -> {
            return ((StandardAsyncClusterResponse) entry2.getValue()).isComplete();
        }).map(entry3 -> {
            return (String) entry3.getKey();
        }).collect(Collectors.toSet())).forEach(str -> {
            onResponseConsumed(str);
        });
        return this.responseMap.size();
    }

    private void removeCookie(Map<String, String> map, String str) {
        if (map.containsKey("Cookie") && StringUtils.isNotBlank(str)) {
            Set set = (Set) Stream.of((Object[]) map.get("Cookie").split(";")).map((v0) -> {
                return v0.trim();
            }).filter(str2 -> {
                return !str2.startsWith(new StringBuilder().append(str).append("=").toString());
            }).collect(Collectors.toSet());
            if (set.isEmpty()) {
                map.remove("Cookie");
            } else {
                map.put("Cookie", StringUtils.join(set, "; "));
            }
        }
    }
}
