package com.jashmore.sqs.retriever.prefetch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.retriever.AsyncMessageRetriever;
import com.jashmore.sqs.util.properties.PropertyUtils;
import com.jashmore.sqs.util.retriever.RetrieverUtils;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkInterruptedException;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/* loaded from: input_file:com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetriever.class */
/**
 * Message retriever that requests messages from an SQS queue in a loop and buffers them in an
 * internal {@link BlockingQueue} until a consumer takes them via {@link #retrieveMessage()}.
 *
 * <p>The {@link #run()} method is the fetch loop; it blocks and is expected to be executed on its
 * own thread. It stops when that thread is interrupted.
 */
public class PrefetchingMessageRetriever implements AsyncMessageRetriever {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PrefetchingMessageRetriever.class);

    /** Upper bound on the number of messages requested in a single receive call. */
    private static final int MAX_MESSAGES_PER_RECEIVE_REQUEST = 10;

    /** Lower bound on the number of messages requested in a single receive call. */
    private static final int MIN_MESSAGES_PER_RECEIVE_REQUEST = 1;

    private final SqsAsyncClient sqsAsyncClient;
    private final QueueProperties queueProperties;
    private final PrefetchingMessageRetrieverProperties properties;
    private final BlockingQueue<Message> internalMessageQueue;
    private final int maxPrefetchedMessages;

    /**
     * Creates a retriever whose internal buffer is sized from the supplied properties.
     *
     * @param sqsAsyncClient client used to receive messages, must not be null
     * @param queueProperties properties of the queue to read from, must not be null
     * @param prefetchingMessageRetrieverProperties prefetching configuration, must not be null
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if the desired minimum is not positive or exceeds the maximum
     */
    public PrefetchingMessageRetriever(SqsAsyncClient sqsAsyncClient, QueueProperties queueProperties, PrefetchingMessageRetrieverProperties prefetchingMessageRetrieverProperties) {
        Preconditions.checkNotNull(sqsAsyncClient, "sqsAsyncClient");
        Preconditions.checkNotNull(queueProperties, "queueProperties");
        Preconditions.checkNotNull(prefetchingMessageRetrieverProperties, "properties");
        this.sqsAsyncClient = sqsAsyncClient;
        this.queueProperties = queueProperties;
        this.properties = prefetchingMessageRetrieverProperties;
        this.maxPrefetchedMessages = prefetchingMessageRetrieverProperties.getMaxPrefetchedMessages();
        int desiredMinPrefetchedMessages = prefetchingMessageRetrieverProperties.getDesiredMinPrefetchedMessages();
        Preconditions.checkArgument(this.maxPrefetchedMessages >= desiredMinPrefetchedMessages, "maxPrefetchedMessages should be greater than or equal to desiredMinPrefetchedMessages");
        Preconditions.checkArgument(desiredMinPrefetchedMessages > 0, "desiredMinPrefetchedMessages must be greater than zero");
        if (desiredMinPrefetchedMessages == 1) {
            // A SynchronousQueue holds no elements: a message is only handed over when a consumer is waiting.
            this.internalMessageQueue = new SynchronousQueue<>();
        } else {
            this.internalMessageQueue = new LinkedBlockingQueue<>(desiredMinPrefetchedMessages);
        }
    }

    /**
     * Test-only constructor that injects the internal queue and maximum directly.
     * No argument validation is performed here.
     */
    @VisibleForTesting
    PrefetchingMessageRetriever(SqsAsyncClient sqsAsyncClient, QueueProperties queueProperties, PrefetchingMessageRetrieverProperties prefetchingMessageRetrieverProperties, BlockingQueue<Message> blockingQueue, int i) {
        this.sqsAsyncClient = sqsAsyncClient;
        this.queueProperties = queueProperties;
        this.properties = prefetchingMessageRetrieverProperties;
        this.internalMessageQueue = blockingQueue;
        this.maxPrefetchedMessages = i;
    }

    /**
     * Takes the next buffered message, blocking until one is available.
     *
     * @return the next message from the internal queue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Message retrieveMessage() throws InterruptedException {
        return this.internalMessageQueue.take();
    }

    /**
     * Fetch loop: repeatedly receives a batch of messages and places each one on the internal
     * queue, blocking while the queue has no room.
     *
     * <p>The loop exits when the thread is interrupted (the interrupt status is restored before
     * returning) or when the client reports an SDK-level interruption. Any other failure is logged
     * and followed by a backoff sleep before the next attempt.
     */
    public void run() {
        while (true) {
            try {
                ReceiveMessageResponse response = this.sqsAsyncClient.receiveMessage(buildReceiveMessageRequest()).get();
                int received = response.messages().size();
                int buffered = this.internalMessageQueue.size();
                log.trace("Retrieved {} new messages for {} existing messages. Total: {}", received, buffered, buffered + received);
                for (Message message : response.messages()) {
                    // Blocks until the queue accepts the message.
                    this.internalMessageQueue.put(message);
                }
            } catch (InterruptedException interruptedException) {
                log.debug("Thread interrupted while placing messages onto queue. Exiting...");
                // Restore the flag so whoever owns this thread can still observe the interruption.
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException executionException) {
                if (isSdkInterruption(executionException)) {
                    log.debug("Thread interrupted receiving messages");
                    return;
                }
                if (!logAndBackoff(executionException)) {
                    return;
                }
            } catch (RuntimeException runtimeException) {
                if (!logAndBackoff(runtimeException)) {
                    return;
                }
            }
        }
    }

    /**
     * Returns whether the failed receive was caused by the SDK reporting an interruption, i.e. the
     * cause is an {@link SdkClientException} that itself wraps an {@link SdkInterruptedException}.
     */
    private static boolean isSdkInterruption(ExecutionException executionException) {
        Throwable cause = executionException.getCause();
        return cause instanceof SdkClientException && cause.getCause() instanceof SdkInterruptedException;
    }

    /**
     * Logs the failure and sleeps for the configured backoff period.
     *
     * @return {@code true} if the loop should continue, {@code false} if the thread was interrupted
     *     while sleeping (the interrupt status is restored in that case)
     */
    private boolean logAndBackoff(Exception exception) {
        log.error("Exception thrown when retrieving messages", exception);
        try {
            Thread.sleep(getBackoffTimeInMs());
            return true;
        } catch (InterruptedException interruptedException) {
            log.debug("Thread interrupted during error backoff. Exiting...");
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Builds the receive request for the next batch.
     *
     * <p>The batch size is the remaining room below {@code maxPrefetchedMessages}, capped at
     * {@value #MAX_MESSAGES_PER_RECEIVE_REQUEST}. It is also clamped to at least
     * {@value #MIN_MESSAGES_PER_RECEIVE_REQUEST}: when the queue is already at (or above) the
     * maximum the subtraction yields zero or a negative number, which is not a valid batch size.
     * In that edge case one extra message may be held by the fetch loop until the queue has room.
     */
    private ReceiveMessageRequest buildReceiveMessageRequest() {
        int remainingCapacity = this.maxPrefetchedMessages - this.internalMessageQueue.size();
        int numberOfMessagesToRequest = Math.max(
                MIN_MESSAGES_PER_RECEIVE_REQUEST,
                Math.min(MAX_MESSAGES_PER_RECEIVE_REQUEST, remainingCapacity));
        log.debug("Retrieving {} messages asynchronously", numberOfMessagesToRequest);

        ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
                .queueUrl(this.queueProperties.getQueueUrl())
                .attributeNames(QueueAttributeName.ALL)
                .messageAttributeNames(QueueAttributeName.ALL.toString())
                .waitTimeSeconds(RetrieverUtils.safelyGetWaitTimeInSeconds(this.properties::getMessageWaitTimeInSeconds))
                .maxNumberOfMessages(numberOfMessagesToRequest);

        Integer visibilityTimeoutForMessagesInSeconds = this.properties.getVisibilityTimeoutForMessagesInSeconds();
        if (visibilityTimeoutForMessagesInSeconds != null) {
            // Only negative values are rejected here; zero is passed through to the request.
            if (visibilityTimeoutForMessagesInSeconds < 0) {
                log.warn("Non-positive visibilityTimeoutInSeconds provided: {}", visibilityTimeoutForMessagesInSeconds);
            } else {
                requestBuilder.visibilityTimeout(visibilityTimeoutForMessagesInSeconds);
            }
        }
        return requestBuilder.build();
    }

    /**
     * Resolves how long to sleep after a failed receive, in milliseconds.
     *
     * <p>Delegates to {@code PropertyUtils.safelyGetPositiveOrZeroIntegerValue} with 10000 as the
     * fallback argument.
     */
    @VisibleForTesting
    int getBackoffTimeInMs() {
        return PropertyUtils.safelyGetPositiveOrZeroIntegerValue("errorBackoffTimeInMilliseconds", this.properties::getErrorBackoffTimeInMilliseconds, 10000);
    }
}
