package com.yahoo.vespa.http.client.core.communication;

import com.yahoo.vespa.feeder.shaded.internal.apache.http.protocol.HttpRequestExecutor;
import com.yahoo.vespa.http.client.FeedConnectException;
import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/yahoo/vespa/http/client/core/communication/IOThread.class */
public class IOThread implements Runnable, AutoCloseable {
    private static final Logger log = Logger.getLogger(IOThread.class.getName());
    // Endpoint this thread feeds to; used in log messages and when building results.
    private final Endpoint endpoint;
    // Creates the initial connection and every replacement made by refreshConnection().
    private final GatewayConnectionFactory connectionFactory;
    // Source of documents waiting to be sent.
    private final DocumentQueue documentQueue;
    // Receives sent-operation registrations, server results and failures.
    private final EndpointResultQueue resultQueue;
    // The I/O thread itself; null when the constructor was asked not to start threads,
    // in which case tick() is expected to be driven by the caller.
    private final Thread thread;
    private final int clusterId;
    // Upper bounds for one chunk; both are randomized per chunk by randomize().
    private final int maxChunkSizeBytes;
    private final int maxInFlightRequests;
    // Maximum time a document may wait in documentQueue before it is failed.
    private final Duration localQueueTimeOut;
    private final GatewayThrottler gatewayThrottler;
    // Age after which the current connection is considered stale and replaced.
    private final Duration connectionTimeToLive;
    // Poll interval in microseconds (passed on with TimeUnit.MICROSECONDS).
    private final long pollIntervalUS;
    private final Clock clock;
    // Polls and eventually closes connections that were replaced while session-synced.
    private final OldConnectionsDrainer oldConnectionsDrainer;
    // Connection used for connect/handshake/feed; reassigned by refreshConnection().
    private volatile GatewayConnection currentConnection;
    // Counted down when run() exits.
    private final CountDownLatch running = new CountDownLatch(1);
    // Counted down by close(); also handed to the drainer as its stop condition.
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private final Random random = new Random();
    // State machine position, advanced by tick() via cycle().
    private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
    // Statistics exposed through getConnectionStats(). The first three counters are
    // only read, never updated, by the code in this class.
    private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
    private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
    private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0);
    private final AtomicInteger executeProblemsCounter = new AtomicInteger(0);
    private final AtomicInteger docsReceivedCounter = new AtomicInteger(0);
    private final AtomicInteger statusReceivedCounter = new AtomicInteger(0);
    private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0);
    private final AtomicInteger successfulHandshakes = new AtomicInteger(0);
    private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/vespa/http/client/core/communication/IOThread$ConnectionState.class */
    /**
     * Position of an {@link IOThread} in its connect / handshake / feed state machine.
     */
    public enum ConnectionState {
        /** No established connection; the next cycle tries to connect. */
        DISCONNECTED,

        /** Connected, but the handshake has not (or no longer) succeeded. */
        CONNECTED,

        /** Handshake succeeded; documents are fed and results are read in this state. */
        SESSION_SYNCED
    }

    /* loaded from: input_file:com/yahoo/vespa/http/client/core/communication/IOThread$ConnectionStats.class */
    /**
     * Immutable snapshot of the counters maintained by an {@link IOThread}.
     * All values are copied at construction time and never change afterwards.
     */
    public static class ConnectionStats {
        public final int wrongSessionDetectedCounter;
        public final int wrongVersionDetectedCounter;
        public final int problemStatusCodeFromServerCounter;
        /** Number of connect/handshake attempts that failed. */
        public final int executeProblemsCounter;
        /** Number of documents taken from the document queue for feeding. */
        public final int docsReceivedCounter;
        /** Number of results received from the server. */
        public final int statusReceivedCounter;
        /** Pending result count observed at the last feed attempt. */
        public final int pendingDocumentStatusCount;
        /** Number of successful handshakes. */
        public final int successfullHandshakes;
        /** Wall-clock duration of the most recent send-and-process round, in milliseconds. */
        public final int lastGatewayProcessTimeMillis;

        /**
         * Creates a snapshot. Arguments are given in field declaration order:
         * {@code i} = wrongSessionDetectedCounter, {@code i2} = wrongVersionDetectedCounter,
         * {@code i3} = problemStatusCodeFromServerCounter, {@code i4} = executeProblemsCounter,
         * {@code i5} = docsReceivedCounter, {@code i6} = statusReceivedCounter,
         * {@code i7} = pendingDocumentStatusCount, {@code i8} = successfullHandshakes,
         * {@code i9} = lastGatewayProcessTimeMillis.
         */
        ConnectionStats(int i, int i2, int i3, int i4, int i5, int i6, int i7, int i8, int i9) {
            wrongSessionDetectedCounter = i;
            wrongVersionDetectedCounter = i2;
            problemStatusCodeFromServerCounter = i3;
            executeProblemsCounter = i4;
            docsReceivedCounter = i5;
            statusReceivedCounter = i6;
            pendingDocumentStatusCount = i7;
            successfullHandshakes = i8;
            lastGatewayProcessTimeMillis = i9;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/vespa/http/client/core/communication/IOThread$OldConnectionsDrainer.class */
    /**
     * Takes care of connections that were replaced because they exceeded their time to live
     * but may still have operations in flight. Each such connection is polled for results
     * with increasing intervals and closed once it has no in-flight operations left, or once
     * its drain period (time to live + local queue timeout) has passed.
     *
     * <p>Connections are kept in a {@link CopyOnWriteArrayList}, so iterating them while
     * another thread calls {@link #add} or while entries are removed is safe.</p>
     */
    public static class OldConnectionsDrainer implements Runnable {
        private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName());
        private final Endpoint endpoint;
        private final int clusterId;
        private final Duration pollInterval;
        private final Duration connectionTimeToLive;
        private final Duration localQueueTimeOut;
        private final AtomicInteger statusReceivedCounter;
        private final EndpointResultQueue resultQueue;
        private final CountDownLatch stopSignal;
        private final Clock clock;
        // Typed explicitly: a raw CopyOnWriteArrayList would be an unchecked conversion.
        private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>();

        /**
         * @param endpoint            endpoint the drained connections belong to
         * @param i                   cluster id passed on when results are delivered
         * @param duration            poll interval; also the sleep time between checks in {@link #run}
         * @param duration2           connection time to live
         * @param duration3           local queue timeout, added to the time to live to get the closing time
         * @param atomicInteger       counter incremented with the number of results received
         * @param endpointResultQueue queue that tracks in-flight operations and receives results
         * @param countDownLatch      stop signal; {@link #run} exits once its count reaches zero
         * @param clock               time source
         */
        OldConnectionsDrainer(Endpoint endpoint, int i, Duration duration, Duration duration2, Duration duration3, AtomicInteger atomicInteger, EndpointResultQueue endpointResultQueue, CountDownLatch countDownLatch, Clock clock) {
            this.endpoint = endpoint;
            this.clusterId = i;
            this.pollInterval = duration;
            this.connectionTimeToLive = duration2;
            this.localQueueTimeOut = duration3;
            this.statusReceivedCounter = atomicInteger;
            this.resultQueue = endpointResultQueue;
            this.stopSignal = countDownLatch;
            this.clock = clock;
        }

        /** Registers a replaced connection for draining and eventual closing. */
        public void add(GatewayConnection gatewayConnection) {
            connections.add(gatewayConnection);
        }

        /**
         * Checks the old connections once per poll interval until the stop signal is given.
         * An interrupt ends the loop (with the interrupt flag restored); any other failure
         * is logged and the loop continues.
         */
        @Override
        public void run() {
            while (stopSignal.getCount() > 0) {
                try {
                    checkOldConnections();
                    Thread.sleep(pollInterval.toMillis());
                } catch (InterruptedException e) {
                    log.log(Level.WARNING, "Close thread was interrupted: " + e.getMessage(), e);
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    log.log(Level.WARNING, "Connection draining failed: " + e.getMessage(), e);
                }
            }
        }

        /**
         * Makes one pass over the old connections: closes those without in-flight operations,
         * polls once and closes those whose drain period is over, and polls the rest when
         * their back-off says it is time.
         */
        public void checkOldConnections() {
            for (GatewayConnection connection : connections) {
                if (!resultQueue.hasInflightOperations(connection)) {
                    log.fine(() -> connection + " no longer has inflight operations");
                    closeConnection(connection);
                } else if (closingTime(connection).isBefore(clock.instant())) {
                    log.fine(() -> connection + " still has inflight operations, but drain period is over");
                    // Last chance to pick up results before the connection goes away.
                    tryPollAndDrainInflightOperations(connection);
                    closeConnection(connection);
                } else if (timeToPoll(connection)) {
                    tryPollAndDrainInflightOperations(connection);
                }
            }
        }

        /** Closes the connection and stops tracking it. */
        private void closeConnection(GatewayConnection connection) {
            log.fine(() -> "Closing " + connection);
            connection.close();
            connections.remove(connection);
        }

        /** Polls the connection and delivers any results; failures are only logged at FINE. */
        private void tryPollAndDrainInflightOperations(GatewayConnection connection) {
            try {
                log.fine(() -> "Polling and draining inflight operations for " + connection);
                IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
            } catch (Exception e) {
                log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage());
            }
        }

        /**
         * Decides whether the connection should be polled now. The first poll happens one
         * poll interval after the connection's time to live expired. Later polls require at
         * least one poll interval since the previous poll, and that the time since expiry is
         * more than twice what it was at the previous poll, i.e. the gaps keep doubling.
         */
        private boolean timeToPoll(GatewayConnection connection) {
            Instant now = clock.instant();
            Instant expiredAt = connection.connectionTime().plus(connectionTimeToLive);
            Instant lastPoll = connection.lastPollTime();
            if (lastPoll == null) {
                return expiredAt.plus(pollInterval).isBefore(now);
            }
            if (lastPoll.plus(pollInterval).isAfter(now)) {
                return false;
            }
            double expiredAtMillis = expiredAt.toEpochMilli();
            double sinceExpiryNow = now.toEpochMilli() - expiredAtMillis;
            double sinceExpiryAtLastPoll = lastPoll.toEpochMilli() - expiredAtMillis;
            return sinceExpiryNow > 2.0d * sinceExpiryAtLastPoll;
        }

        /** The instant after which the connection is closed even if operations are in flight. */
        private Instant closingTime(GatewayConnection connection) {
            return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
        }

        /**
         * Final shutdown: if the result queue still has pending operations, polls every old
         * connection once for results (failures are logged), then closes all old connections.
         */
        public void close() {
            int pendingSize = resultQueue.getPendingSize();
            if (pendingSize > 0) {
                log.info("We have outstanding operations (" + pendingSize + ") , trying to fetch responses.");
                for (GatewayConnection connection : connections) {
                    try {
                        IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
                    } catch (Throwable th) {
                        log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", th);
                    }
                }
            }
            for (GatewayConnection connection : connections) {
                connection.close();
            }
        }

        /** Returns a read-only view of the connections currently being drained. */
        public List<GatewayConnection> connections() {
            return Collections.unmodifiableList(connections);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/vespa/http/client/core/communication/IOThread$ProcessResponse.class */
    /**
     * Summary of one processed server response: how many of the results were
     * transitive errors, and how many results there were in total.
     */
    public static class ProcessResponse {
        private final int transitiveErrorCount;
        private final int processResultsCount;

        /**
         * @param i  number of results of type TRANSITIVE_ERROR
         * @param i2 total number of results in the response
         */
        ProcessResponse(int i, int i2) {
            transitiveErrorCount = i;
            processResultsCount = i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the I/O thread state, an initial connection and the drainer for old
     * connections, and optionally starts both as daemon threads.
     *
     * @param threadGroup              group the started threads are placed in
     * @param endpoint                 endpoint to feed to
     * @param endpointResultQueue      queue tracking sent operations and their results
     * @param gatewayConnectionFactory factory for the initial and all replacement connections
     * @param i                        cluster id
     * @param i2                       maximum chunk size in bytes
     * @param i3                       maximum number of in-flight requests
     * @param duration                 local queue timeout
     * @param documentQueue            queue of documents to send
     * @param j                        value handed to the {@code GatewayThrottler} constructor
     * @param duration2                connection time to live
     * @param z                        whether to start the I/O and drainer threads; when false
     *                                 no thread is created and {@link #tick()} must be called by the owner
     * @param d                        polls per second when idle; clamped to at least 0.1, and the
     *                                 resulting interval to at least 1000 microseconds
     * @param clock                    time source
     */
    public IOThread(ThreadGroup threadGroup, Endpoint endpoint, EndpointResultQueue endpointResultQueue, GatewayConnectionFactory gatewayConnectionFactory, int i, int i2, int i3, Duration duration, DocumentQueue documentQueue, long j, Duration duration2, boolean z, double d, Clock clock) {
        this.endpoint = endpoint;
        this.documentQueue = documentQueue;
        this.connectionFactory = gatewayConnectionFactory;
        this.currentConnection = gatewayConnectionFactory.newConnection();
        this.resultQueue = endpointResultQueue;
        this.clusterId = i;
        this.maxChunkSizeBytes = i2;
        this.maxInFlightRequests = i3;
        this.connectionTimeToLive = duration2;
        this.gatewayThrottler = new GatewayThrottler(j);
        double pollsPerSecond = Math.max(0.1d, d);
        this.pollIntervalUS = Math.max(1000L, (long) (1000000.0d / pollsPerSecond));
        this.clock = clock;
        this.localQueueTimeOut = duration;
        Duration drainerPollInterval = Duration.ofMillis(this.pollIntervalUS / 1000);
        this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint, i, drainerPollInterval, duration2, duration, this.statusReceivedCounter, this.resultQueue, this.stopSignal, clock);
        if (z) {
            this.thread = new Thread(threadGroup, this, "IOThread " + endpoint);
            this.thread.setDaemon(true);
            this.thread.start();
            Thread drainerThread = new Thread(threadGroup, this.oldConnectionsDrainer, "IOThread " + endpoint + " drainer");
            drainerThread.setDaemon(true);
            drainerThread.start();
        } else {
            this.thread = null;
        }
    }

    /** Returns the endpoint this I/O thread feeds to. */
    public Endpoint getEndpoint() {
        return endpoint;
    }

    /**
     * Returns a snapshot of this thread's counters. Each counter is read individually,
     * so the snapshot is not taken atomically across counters.
     */
    public ConnectionStats getConnectionStats() {
        return new ConnectionStats(
                wrongSessionDetectedCounter.get(),
                wrongVersionDetectedCounter.get(),
                problemStatusCodeFromServerCounter.get(),
                executeProblemsCounter.get(),
                docsReceivedCounter.get(),
                statusReceivedCounter.get(),
                pendingDocumentStatusCount.get(),
                successfulHandshakes.get(),
                lastGatewayProcessTimeMillis.get());
    }

    /**
     * Shuts this I/O thread down. The document queue is always closed; the rest runs only
     * on the first call (while the stop signal is still pending): the stop signal is given,
     * old connections are drained and closed, one last attempt is made to fetch outstanding
     * results from the current connection, the current connection is closed, and finally
     * everything still pending or queued is failed.
     */
    @Override
    public void close() {
        documentQueue.close();
        boolean alreadyStopped = stopSignal.getCount() == 0;
        if (alreadyStopped) {
            return;
        }
        stopSignal.countDown();
        log.finer("Closed called.");
        oldConnectionsDrainer.close();

        int outstanding = resultQueue.getPendingSize();
        if (outstanding > 0) {
            log.info("We have outstanding operations (" + outstanding + ") , trying to fetch responses.");
            try {
                InputStream lastResults = currentConnection.drain();
                processResponse(lastResults);
            } catch (Throwable failure) {
                log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", failure);
            }
        }

        try {
            currentConnection.close();
            log.fine("Session to " + endpoint + " closed.");
        } finally {
            // Runs even if closing the connection throws, so no operation is left unanswered.
            drainDocumentQueueWhenFailingPermanently(new Exception("Closed call, did not manage to process everything so failing this document."));
        }
    }

    /**
     * Puts a document on this thread's document queue.
     *
     * @param document the document to enqueue
     * @throws InterruptedException if propagated from the queue's {@code put}
     */
    public void post(Document document) throws InterruptedException {
        // NOTE(review): the meaning of the 'true' flag is defined by DocumentQueue.put — confirm there.
        documentQueue.put(document, true);
    }

    /** Returns a short description naming the endpoint this I/O thread serves. */
    @Override
    public String toString() {
        return "I/O thread (for " + this.endpoint + ")";
    }

    /**
     * Collects the next chunk of documents to send.
     *
     * <p>The first document is fetched with a timed poll when this instance runs its own
     * thread, otherwise with a non-blocking poll. More documents are then added without
     * waiting until the chunk reaches a randomized byte limit, the (estimated) number of
     * in-flight requests reaches a randomized limit, or the queue is empty. Documents that
     * have waited too long in the queue are failed before each poll.</p>
     *
     * @param j        maximum time to wait for the first document
     * @param timeUnit unit of {@code j}
     * @return the documents to send, possibly empty; if interrupted, whatever was collected so far
     */
    List<Document> getNextDocsForFeeding(long j, TimeUnit timeUnit) {
        List<Document> chunk = new ArrayList<>();
        int chunkSizeBytes = 0;
        try {
            drainFirstDocumentsInQueueIfOld();
            Document first = this.thread != null ? this.documentQueue.poll(j, timeUnit) : this.documentQueue.poll();
            if (first != null) {
                chunk.add(first);
                chunkSizeBytes = first.size();
            }
            // Limits are randomized per chunk, see randomize().
            int chunkSizeLimit = randomize(this.maxChunkSizeBytes);
            int inFlightLimit = randomize(this.maxInFlightRequests);
            int inFlight = 1 + this.resultQueue.getPendingSize();
            while (chunkSizeBytes < chunkSizeLimit && inFlight < inFlightLimit) {
                drainFirstDocumentsInQueueIfOld();
                Document next = this.documentQueue.poll();
                if (next == null) {
                    break;
                }
                chunk.add(next);
                chunkSizeBytes += next.size();
                inFlight++;
            }
            if (log.isLoggable(Level.FINEST)) {
                log.finest("Chunk has " + chunk.size() + " docs with a size " + chunkSizeBytes + " bytes");
            }
        } catch (InterruptedException e) {
            log.fine("Got break signal while waiting for new documents to feed");
        }
        // Counted on both paths so the statistic always matches what is handed to the caller.
        this.docsReceivedCounter.addAndGet(chunk.size());
        return chunk;
    }

    /**
     * Scales {@code i} by a random factor in [0.75, 1.0) and truncates to an int.
     *
     * @param i the nominal limit
     * @return the scaled limit, never less than 1
     */
    private int randomize(int i) {
        double factor = 0.75d + (0.25d * random.nextDouble());
        int scaled = (int) (i * factor);
        return Math.max(1, scaled);
    }

    /** Registers every document in {@code list} as sent on the current connection. */
    private void addDocumentsToResultQueue(List<Document> list) {
        for (Document document : list) {
            resultQueue.operationSent(document.getOperationId(), currentConnection);
        }
    }

    /**
     * Fails every document in {@code list} with a transient error built from the given
     * server response exception.
     */
    private void markDocumentAsFailed(List<Document> list, ServerResponseException serverResponseException) {
        for (Document document : list) {
            EndpointResult transientError = EndPointResultFactory.createTransientError(endpoint, document.getOperationId(), serverResponseException);
            resultQueue.failOperation(transientError, clusterId);
        }
    }

    /**
     * Writes the documents to the current connection and returns the response stream.
     * If the write fails, every document in {@code list} is marked as failed before the
     * exception is rethrown unchanged; failures that are not a
     * {@code ServerResponseException} are reported to the documents wrapped in one.
     *
     * @throws IOException             rethrown from the connection
     * @throws ServerResponseException rethrown from the connection
     */
    private InputStream sendAndReceive(List<Document> list) throws IOException, ServerResponseException {
        try {
            return currentConnection.write(list);
        } catch (Exception failure) {
            ServerResponseException reported = failure instanceof ServerResponseException
                    ? (ServerResponseException) failure
                    : new ServerResponseException(Exceptions.toMessageString(failure));
            markDocumentAsFailed(list, reported);
            throw failure;
        }
    }

    /**
     * Parses a server response and delivers its results, using this thread's endpoint,
     * cluster id, status counter and result queue.
     */
    private ProcessResponse processResponse(InputStream inputStream) throws IOException {
        return processResponse(inputStream, endpoint, clusterId, statusReceivedCounter, resultQueue);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses the results in {@code inputStream}, adds their number to the counter, and
     * hands each result to the result queue.
     *
     * @param inputStream         server response to parse
     * @param endpoint            endpoint the response came from
     * @param i                   cluster id passed on with each result
     * @param atomicInteger       counter increased by the number of parsed results
     * @param endpointResultQueue queue receiving the results
     * @return the number of TRANSITIVE_ERROR results and the total number of results
     * @throws IOException if propagated from parsing the response
     */
    public static ProcessResponse processResponse(InputStream inputStream, Endpoint endpoint, int i, AtomicInteger atomicInteger, EndpointResultQueue endpointResultQueue) throws IOException {
        Collection<EndpointResult> results = EndPointResultFactory.createResult(endpoint, inputStream);
        atomicInteger.addAndGet(results.size());
        int transitiveErrors = 0;
        Iterator<EndpointResult> remaining = results.iterator();
        while (remaining.hasNext()) {
            EndpointResult result = remaining.next();
            boolean isTransitiveError = result.getDetail().getResultType() == Result.ResultType.TRANSITIVE_ERROR;
            if (isTransitiveError) {
                transitiveErrors++;
            }
            endpointResultQueue.resultReceived(result, i);
        }
        return new ProcessResponse(transitiveErrors, results.size());
    }

    /**
     * Registers the documents as sent, sends them, processes the response, and records
     * how long the send-and-process round took in {@code lastGatewayProcessTimeMillis}.
     * The duration is not recorded when sending or processing throws.
     */
    private ProcessResponse feedDocumentAndProcessResults(List<Document> list) throws ServerResponseException, IOException {
        addDocumentsToResultQueue(list);
        long startedAtMillis = clock.millis();
        InputStream serverResponse = sendAndReceive(list);
        ProcessResponse outcome = processResponse(serverResponse);
        long elapsedMillis = clock.millis() - startedAtMillis;
        lastGatewayProcessTimeMillis.set((int) elapsedMillis);
        return outcome;
    }

    /**
     * Performs one feed round: picks up the next chunk of documents (none when more results
     * are pending than {@code maxInFlightRequests}), sends it, and processes the results.
     * Does nothing when there is neither anything to send nor anything pending. When over
     * the in-flight limit and the round produced no results, sleeps 300 ms before returning.
     *
     * @param j maximum time, in microseconds, to wait for the first document
     */
    private ProcessResponse pullAndProcessData(long j) throws ServerResponseException, IOException {
        int pending = resultQueue.getPendingSize();
        pendingDocumentStatusCount.set(pending);
        boolean overInFlightLimit = pending > maxInFlightRequests;

        List<Document> toSend;
        if (overInFlightLimit) {
            // Send an empty chunk: this round is only about collecting results.
            toSend = new ArrayList<>();
        } else {
            toSend = getNextDocsForFeeding(j, TimeUnit.MICROSECONDS);
        }

        if (toSend.isEmpty() && pending == 0) {
            log.finest("No document awaiting feeding, not waiting for results.");
            return new ProcessResponse(0, 0);
        }
        log.finest("Awaiting " + pending + " results.");
        ProcessResponse outcome = feedDocumentAndProcessResults(toSend);
        if (overInFlightLimit && outcome.processResultsCount == 0) {
            try {
                Thread.sleep(300L);
            } catch (InterruptedException ignored) {
                // Interruption only cuts the pause short.
            }
        }
        return outcome;
    }

    /**
     * Performs one step of the connection state machine and returns the state to continue from.
     *
     * @param connectionState the state before this step
     * @return the state after this step
     */
    private ConnectionState cycle(ConnectionState connectionState) {
        switch (connectionState) {
            case DISCONNECTED:
                return cycleWhenDisconnected();
            case CONNECTED:
                return cycleWhenConnected(connectionState);
            case SESSION_SYNCED:
                return cycleWhenSessionSynced(connectionState);
            default:
                log.severe("Should never get here.");
                this.currentConnection.close();
                return ConnectionState.DISCONNECTED;
        }
    }

    /** DISCONNECTED step: tries to connect; on failure fails timed-out queued documents and stays disconnected. */
    private ConnectionState cycleWhenDisconnected() {
        try {
            if (this.currentConnection.connect()) {
                return ConnectionState.CONNECTED;
            }
            log.log(Level.WARNING, "Could not connect to endpoint: '" + this.endpoint + "'. Will re-try.");
            drainFirstDocumentsInQueueIfOld();
            return ConnectionState.DISCONNECTED;
        } catch (Throwable failure) {
            drainFirstDocumentsInQueueIfOld();
            log.log(Level.INFO, "Failed connecting to endpoint: '" + this.endpoint + "'. Will re-try connecting.", failure);
            this.executeProblemsCounter.incrementAndGet();
            return ConnectionState.DISCONNECTED;
        }
    }

    /** CONNECTED step: replaces a stale connection, otherwise attempts the handshake. */
    private ConnectionState cycleWhenConnected(ConnectionState stateBefore) {
        try {
            if (isStale(this.currentConnection)) {
                return refreshConnection(stateBefore);
            }
            this.currentConnection.handshake();
            this.successfulHandshakes.getAndIncrement();
            return ConnectionState.SESSION_SYNCED;
        } catch (ServerResponseException rejected) {
            int responseCode = rejected.getResponseCode();
            boolean accessDenied = responseCode == 401 || responseCode == 403;
            if (accessDenied) {
                // Authentication/authorization failures fail everything queued and pending.
                drainDocumentQueueWhenFailingPermanently(new Exception("Denied access by endpoint:" + rejected.getResponseString()));
                log.log(Level.SEVERE, "Failed authentication or authorization with '" + this.endpoint + "': " + Exceptions.toMessageString(rejected));
                return ConnectionState.CONNECTED;
            }
            this.executeProblemsCounter.incrementAndGet();
            log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + this.endpoint + "' failed -- will re-try handshake: " + Exceptions.toMessageString(rejected));
            drainFirstDocumentsInQueueIfOld();
            this.resultQueue.onEndpointError(new FeedProtocolException(rejected.getResponseCode(), rejected.getResponseString(), rejected, this.endpoint));
            return ConnectionState.CONNECTED;
        } catch (Throwable failure) {
            this.executeProblemsCounter.incrementAndGet();
            this.resultQueue.onEndpointError(new FeedConnectException(failure, this.endpoint));
            log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + this.endpoint + "' failed. Will re-try handshake.", failure);
            drainFirstDocumentsInQueueIfOld();
            this.currentConnection.close();
            return ConnectionState.DISCONNECTED;
        }
    }

    /** SESSION_SYNCED step: replaces a stale connection, otherwise feeds one round and reports transitive errors to the throttler. */
    private ConnectionState cycleWhenSessionSynced(ConnectionState stateBefore) {
        try {
            if (isStale(this.currentConnection)) {
                return refreshConnection(stateBefore);
            }
            ProcessResponse outcome = pullAndProcessData(this.pollIntervalUS);
            this.gatewayThrottler.handleCall(outcome.transitiveErrorCount);
            return ConnectionState.SESSION_SYNCED;
        } catch (ServerResponseException rejected) {
            // The connection itself still works; fall back to redoing the handshake.
            log.log(Level.INFO, "Problems while handing data over to endpoint '" + this.endpoint + "'. Will re-try. Endpoint responded with an unexpected HTTP response code.", rejected);
            return ConnectionState.CONNECTED;
        } catch (Throwable failure) {
            log.log(Level.INFO, "Connection level error handing data over to endpoint '" + this.endpoint + "'. Will re-try.", failure);
            this.currentConnection.close();
            return ConnectionState.DISCONNECTED;
        }
    }

    /**
     * Backs off before the next cycle when the connection is not making progress.
     * No sleep happens when the new state is SESSION_SYNCED, or when the step moved from
     * DISCONNECTED to CONNECTED. Otherwise the thread sleeps for a throttler-distributed
     * time, unless it has been stopped and the document queue is already empty.
     *
     * @param connectionState  the state after the latest cycle
     * @param connectionState2 the state before the latest cycle
     */
    private void sleepIfProblemsGettingSyncedConnection(ConnectionState connectionState, ConnectionState connectionState2) {
        // Nominal back-off between retries. Kept as a local value rather than borrowing the
        // unrelated HTTP "100-continue" wait constant, which merely has the same value.
        final int retryBackoffMillis = 3000;

        if (connectionState == ConnectionState.SESSION_SYNCED) {
            return;
        }
        if (connectionState == ConnectionState.CONNECTED && connectionState2 == ConnectionState.DISCONNECTED) {
            return;
        }
        try {
            if (this.stopSignal.getCount() > 0 || !this.documentQueue.isEmpty()) {
                Thread.sleep(this.gatewayThrottler.distribute(retryBackoffMillis));
            }
        } catch (InterruptedException ignored) {
            // Interruption only cuts the back-off short; run() re-checks the stop signal.
        }
    }

    /**
     * Thread main loop: keeps ticking until the stop signal has been given and the
     * document queue is empty, then counts down the {@code running} latch.
     */
    @Override
    public void run() {
        while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
            tick();
        }
        log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size());
        running.countDown();
    }

    /**
     * Advances the connection state machine by one step. Without an own thread the old
     * connections are checked inline; with an own thread the method instead backs off
     * when the step made no progress towards a synced session.
     */
    public void tick() {
        ConnectionState stateBefore = this.connectionState;
        this.connectionState = cycle(this.connectionState);
        if (this.thread == null) {
            this.oldConnectionsDrainer.checkOldConnections();
        } else {
            sleepIfProblemsGettingSyncedConnection(this.connectionState, stateBefore);
        }
    }

    /**
     * Removes documents that have exceeded the local queue timeout from the document
     * queue, failing each with a transient error stating how long it was queued.
     */
    private void drainFirstDocumentsInQueueIfOld() {
        Optional<Document> timedOut = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
        while (timedOut.isPresent()) {
            Document document = timedOut.get();
            String operationId = document.getOperationId();
            long queuedMillis = clock.millis() - document.getQueueInsertTime().toEpochMilli();
            Exception reason = new Exception("Not sending document operation, timed out in queue after " + queuedMillis + " ms.");
            resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint, operationId, reason), clusterId);
            timedOut = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
        }
    }

    /**
     * Fails all pending operations in the result queue, then removes every document from
     * the document queue and fails each of them with an error built from {@code exc}.
     */
    private void drainDocumentQueueWhenFailingPermanently(Exception exc) {
        resultQueue.failPending(exc);
        for (Document unsent : documentQueue.removeAllDocuments()) {
            EndpointResult error = EndPointResultFactory.createError(endpoint, unsent.getOperationId(), exc);
            resultQueue.failOperation(error, clusterId);
        }
    }

    /**
     * Tells whether the connection has outlived its time to live. A connection without a
     * connection time is never stale.
     */
    private boolean isStale(GatewayConnection gatewayConnection) {
        Instant connectedAt = gatewayConnection.connectionTime();
        if (connectedAt == null) {
            return false;
        }
        Instant expiresAt = connectedAt.plus(connectionTimeToLive);
        return expiresAt.isBefore(clock.instant());
    }

    /**
     * Replaces the current connection with a fresh one from the factory. When the old
     * connection was session-synced it is handed to the drainer, which polls it for
     * results and closes it later.
     *
     * @param connectionState the state the old connection was in
     * @return always DISCONNECTED, since the new connection has yet to connect
     */
    private ConnectionState refreshConnection(ConnectionState connectionState) {
        boolean hadSyncedSession = connectionState == ConnectionState.SESSION_SYNCED;
        if (hadSyncedSession) {
            oldConnectionsDrainer.add(currentConnection);
        }
        // NOTE(review): in any other state the old connection is dropped here without being
        // closed or handed to the drainer — confirm it holds no resources in that state.
        currentConnection = connectionFactory.newConnection();
        return ConnectionState.DISCONNECTED;
    }

    /** Returns the connection currently used for connecting, handshaking and feeding. */
    public GatewayConnection currentConnection() {
        return currentConnection;
    }

    /** Returns a read-only view of the replaced connections still held by the drainer. */
    public List<GatewayConnection> oldConnections() {
        return oldConnectionsDrainer.connections();
    }

    /** Returns the result queue this I/O thread reports to. */
    public EndpointResultQueue resultQueue() {
        return resultQueue;
    }
}
