package com.jashmore.sqs.retriever.prefetch;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.retriever.MessageRetriever;
import com.jashmore.sqs.util.Preconditions;
import com.jashmore.sqs.util.collections.CollectionUtils;
import com.jashmore.sqs.util.properties.PropertyUtils;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkInterruptedException;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetriever.class */
/**
 * {@link MessageRetriever} that requests messages from an SQS queue in the background and buffers them.
 *
 * <p>Futures handed out by {@link #retrieveMessage()} and messages received by {@link #run()} are both pushed
 * into an internal {@code PrefetchingMessageFutureConsumerQueue}. That queue's implementation is not part of
 * this file; it is presumably responsible for pairing each waiting future with a prefetched message.
 */
public class PrefetchingMessageRetriever implements MessageRetriever {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PrefetchingMessageRetriever.class);

    /** Upper bound on messages requested in a single receive call (SQS allows at most 10 per request). */
    private static final int MAX_MESSAGES_PER_RECEIVE_REQUEST = 10;

    /** Wait time applied to every receive call (SQS allows at most 20 seconds). */
    private static final int RECEIVE_WAIT_TIME_IN_SECONDS = 20;

    private final SqsAsyncClient sqsAsyncClient;
    private final QueueProperties queueProperties;
    private final PrefetchingMessageRetrieverProperties properties;
    private final PrefetchingMessageFutureConsumerQueue pairConsumerQueue;
    private final int maxPrefetchedMessages;

    /**
     * Creates the retriever and its internal queue.
     *
     * <p>All arguments are checked for {@code null}, and the prefetch limits are validated: the maximum must be
     * at least the desired minimum, and the desired minimum must be greater than zero. Violations are reported
     * through {@link Preconditions}.
     *
     * @param sqsAsyncClient client used to receive messages
     * @param queueProperties supplies the URL of the queue to receive from
     * @param prefetchingMessageRetrieverProperties prefetch limits, visibility timeout and error backoff time
     */
    public PrefetchingMessageRetriever(SqsAsyncClient sqsAsyncClient, QueueProperties queueProperties, PrefetchingMessageRetrieverProperties prefetchingMessageRetrieverProperties) {
        Preconditions.checkNotNull(sqsAsyncClient, "sqsAsyncClient");
        Preconditions.checkNotNull(queueProperties, "queueProperties");
        Preconditions.checkNotNull(prefetchingMessageRetrieverProperties, "properties");
        this.sqsAsyncClient = sqsAsyncClient;
        this.queueProperties = queueProperties;
        this.properties = prefetchingMessageRetrieverProperties;
        this.maxPrefetchedMessages = prefetchingMessageRetrieverProperties.getMaxPrefetchedMessages();
        final int desiredMinPrefetchedMessages = prefetchingMessageRetrieverProperties.getDesiredMinPrefetchedMessages();
        Preconditions.checkArgument(this.maxPrefetchedMessages >= desiredMinPrefetchedMessages, "maxPrefetchedMessages should be greater than or equal to desiredMinPrefetchedMessages");
        Preconditions.checkArgument(desiredMinPrefetchedMessages > 0, "desiredMinPrefetchedMessages must be greater than zero");
        this.pairConsumerQueue = new PrefetchingMessageFutureConsumerQueue(desiredMinPrefetchedMessages);
    }

    /**
     * Registers a request for a single message.
     *
     * @return a new future that has been pushed onto the internal queue; futures still waiting in that queue
     *     when {@link #run()} exits are cancelled
     */
    public CompletableFuture<Message> retrieveMessage() {
        final CompletableFuture<Message> messageFuture = new CompletableFuture<>();
        pairConsumerQueue.pushCompletableFuture(messageFuture);
        return messageFuture;
    }

    /**
     * Receives messages from the queue and pushes them into the internal queue until the thread is interrupted.
     *
     * <p>Each iteration blocks until the internal queue reports a free slot, performs one receive call and
     * pushes every received message. Failures other than interruption are logged and followed by a backoff
     * sleep before the next attempt.
     *
     * <p>On exit the internal queue is drained and every future still waiting for a message is cancelled.
     *
     * @return the messages that were still available in the internal queue, combined with any received
     *     messages that could not be pushed because the thread was interrupted
     */
    public List<Message> run() {
        log.info("Started MessageRetriever");
        final List<Message> messagesNotPublished = new LinkedList<>();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                pairConsumerQueue.blockUntilFreeSlotForMessage();
                final List<Message> messages = CompletableFuture
                    .supplyAsync(this::buildReceiveMessageRequest)
                    .thenCompose(sqsAsyncClient::receiveMessage)
                    .thenApply(response -> response.messages())
                    .get();
                log.debug("Received {} messages", messages.size());

                final ListIterator<Message> messageIterator = messages.listIterator();
                while (messageIterator.hasNext()) {
                    final Message message = messageIterator.next();
                    try {
                        pairConsumerQueue.pushMessage(message);
                    } catch (final InterruptedException interruptedException) {
                        log.debug("Thread interrupted while adding messages into internal queue. Exiting...");
                        // Keep the message that failed to publish plus everything after it so the caller gets
                        // them back; consuming the iterator also terminates this inner loop.
                        messagesNotPublished.add(message);
                        messageIterator.forEachRemaining(messagesNotPublished::add);
                        // Restore the flag so the outer loop condition observes the interruption and stops.
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (final InterruptedException interruptedException) {
                log.debug("Thread interrupted while requesting messages. Exiting...");
                // The exception cleared the interrupt flag, so the loop condition would not stop on its own.
                // The flag is left cleared, matching the SDK-interruption exit below.
                break;
            } catch (final ExecutionException | RuntimeException exception) {
                if (isCausedBySdkInterruption(exception)) {
                    log.debug("Thread interrupted receiving messages");
                    break;
                }
                log.error("Exception thrown when retrieving messages", exception);
                performBackoff();
            }
        }

        final QueueDrain drain = pairConsumerQueue.drain();
        drain.getFuturesWaitingForMessages().forEach(future -> future.cancel(true));
        return CollectionUtils.immutableListFrom(drain.getMessagesAvailableForProcessing(), messagesNotPublished);
    }

    /**
     * Checks whether a failed receive call was caused by the SDK reporting an interruption.
     *
     * <p>NOTE(review): the SDK appears to surface an interruption as an {@link SdkClientException} whose cause
     * is an {@link SdkInterruptedException}; the thread's interrupt flag may not be set in that case, which is
     * why this is checked explicitly rather than relying on the loop condition.
     *
     * @param exception the exception caught around the receive call
     * @return {@code true} only for an {@link ExecutionException} wrapping that SDK exception chain
     */
    private static boolean isCausedBySdkInterruption(final Exception exception) {
        if (!(exception instanceof ExecutionException)) {
            return false;
        }
        final Throwable cause = exception.getCause();
        return cause instanceof SdkClientException && cause.getCause() instanceof SdkInterruptedException;
    }

    /**
     * Builds the receive request for the next call.
     *
     * <p>The number of messages requested is the remaining prefetch capacity (the configured maximum minus the
     * number of messages currently batched in the internal queue), capped at
     * {@link #MAX_MESSAGES_PER_RECEIVE_REQUEST}. A visibility timeout is only set when the configured value is
     * non-null and at least one second.
     *
     * @return the request to pass to the SQS client
     */
    private ReceiveMessageRequest buildReceiveMessageRequest() {
        final int numberOfMessagesToObtain = Math.min(
            MAX_MESSAGES_PER_RECEIVE_REQUEST,
            maxPrefetchedMessages - pairConsumerQueue.getNumberOfBatchedMessages()
        );
        log.debug("Retrieving {} messages asynchronously", numberOfMessagesToObtain);

        final ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest
            .builder()
            .queueUrl(queueProperties.getQueueUrl())
            .attributeNames(QueueAttributeName.ALL)
            .messageAttributeNames(QueueAttributeName.ALL.toString())
            .waitTimeSeconds(RECEIVE_WAIT_TIME_IN_SECONDS)
            .maxNumberOfMessages(numberOfMessagesToObtain);

        final Duration visibilityTimeout = properties.getMessageVisibilityTimeout();
        if (visibilityTimeout != null && visibilityTimeout.getSeconds() > 0) {
            requestBuilder.visibilityTimeout((int) visibilityTimeout.getSeconds());
        }
        return requestBuilder.build();
    }

    /**
     * Sleeps for the configured error backoff time, falling back to
     * {@code PrefetchingMessageRetrieverConstants.DEFAULT_ERROR_BACKOFF_TIMEOUT} as decided by
     * {@link PropertyUtils#safelyGetPositiveOrZeroDuration}.
     *
     * <p>If the sleep is interrupted the interrupt flag is restored so the retrieval loop can stop.
     */
    private void performBackoff() {
        try {
            final Duration backoffDuration = PropertyUtils.safelyGetPositiveOrZeroDuration(
                "errorBackoffTime",
                properties::getErrorBackoffTime,
                PrefetchingMessageRetrieverConstants.DEFAULT_ERROR_BACKOFF_TIMEOUT
            );
            final long backoffInMs = backoffDuration.toMillis();
            log.debug("Backing off for {}ms", backoffInMs);
            Thread.sleep(backoffInMs);
        } catch (final InterruptedException interruptedException) {
            log.debug("Thread interrupted during backoff period");
            Thread.currentThread().interrupt();
        }
    }
}
