package com.jashmore.sqs.retriever.batching;

import com.google.common.annotations.VisibleForTesting;
import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.retriever.AsyncMessageRetriever;
import com.jashmore.sqs.util.properties.PropertyUtils;
import com.jashmore.sqs.util.retriever.RetrieverUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkInterruptedException;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/* loaded from: input_file:com/jashmore/sqs/retriever/batching/BatchingMessageRetriever.class */
public class BatchingMessageRetriever implements AsyncMessageRetriever {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchingMessageRetriever.class);
    private final QueueProperties queueProperties;
    private final SqsAsyncClient sqsAsyncClient;
    private final BatchingMessageRetrieverProperties properties;
    private final AtomicInteger numberWaitingForMessages;
    private final BlockingQueue<Message> messagesDownloaded;
    private final Object shouldObtainMessagesLock;

    public BatchingMessageRetriever(QueueProperties queueProperties, SqsAsyncClient sqsAsyncClient, BatchingMessageRetrieverProperties batchingMessageRetrieverProperties) {
        this.queueProperties = queueProperties;
        this.sqsAsyncClient = sqsAsyncClient;
        this.properties = batchingMessageRetrieverProperties;
        this.numberWaitingForMessages = new AtomicInteger();
        this.messagesDownloaded = new LinkedBlockingQueue();
        this.shouldObtainMessagesLock = new Object();
    }

    @VisibleForTesting
    BatchingMessageRetriever(QueueProperties queueProperties, SqsAsyncClient sqsAsyncClient, BatchingMessageRetrieverProperties batchingMessageRetrieverProperties, AtomicInteger atomicInteger, BlockingQueue<Message> blockingQueue, Object obj) {
        this.queueProperties = queueProperties;
        this.sqsAsyncClient = sqsAsyncClient;
        this.properties = batchingMessageRetrieverProperties;
        this.numberWaitingForMessages = atomicInteger;
        this.messagesDownloaded = blockingQueue;
        this.shouldObtainMessagesLock = obj;
    }

    public Message retrieveMessage() throws InterruptedException {
        try {
            incrementWaitingCountAndNotify();
            log.trace("Waiting for message");
            return this.messagesDownloaded.take();
        } finally {
            this.numberWaitingForMessages.decrementAndGet();
        }
    }

    private void incrementWaitingCountAndNotify() {
        synchronized (this.shouldObtainMessagesLock) {
            int incrementAndGet = this.numberWaitingForMessages.incrementAndGet();
            int numberOfThreadsWaitingTrigger = getNumberOfThreadsWaitingTrigger();
            if (incrementAndGet >= numberOfThreadsWaitingTrigger) {
                log.debug("Maximum number of threads({}) waiting has arrived requesting any sleeping threads to wake up to process", Integer.valueOf(numberOfThreadsWaitingTrigger));
                this.shouldObtainMessagesLock.notifyAll();
            }
        }
    }

    public void run() {
        int min;
        log.debug("Started background thread");
        while (true) {
            try {
                synchronized (this.shouldObtainMessagesLock) {
                    if (this.numberWaitingForMessages.get() - this.messagesDownloaded.size() < getNumberOfThreadsWaitingTrigger()) {
                        try {
                            waitForEnoughThreadsToRequestMessages(getPollingPeriodInMs());
                        } catch (InterruptedException e) {
                            log.debug("Thread interrupted while waiting for messages");
                            return;
                        }
                    }
                    min = Math.min(this.numberWaitingForMessages.get() - this.messagesDownloaded.size(), 10);
                }
                if (min <= 0) {
                    log.debug("Requesting 0 messages");
                } else {
                    log.debug("Requesting {} messages", Integer.valueOf(min));
                    try {
                        try {
                            Iterator it = ((ReceiveMessageResponse) this.sqsAsyncClient.receiveMessage(buildReceiveMessageRequest(min)).get()).messages().iterator();
                            while (it.hasNext()) {
                                this.messagesDownloaded.put((Message) it.next());
                            }
                        } catch (InterruptedException e2) {
                            log.debug("Thread interrupted while placing messages on internal queue");
                            return;
                        }
                    } catch (InterruptedException e3) {
                        log.debug("Thread interrupted while obtaining messages from SQS");
                        return;
                    }
                }
            } catch (RuntimeException | ExecutionException e4) {
                if (e4 instanceof ExecutionException) {
                    Throwable cause = e4.getCause();
                    if ((cause instanceof SdkClientException) && (cause.getCause() instanceof SdkInterruptedException)) {
                        log.debug("Thread interrupted while receiving messages");
                        return;
                    }
                }
                log.error("Exception thrown when retrieving messages", e4);
                try {
                    long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
                    log.error("Error thrown while organising threads to process messages. Backing off for {}ms", Long.valueOf(errorBackoffTimeInMilliseconds), e4);
                    backoff(errorBackoffTimeInMilliseconds);
                } catch (InterruptedException e5) {
                    log.debug("Thread interrupted during backoff period");
                    return;
                }
            }
        }
    }

    @VisibleForTesting
    @SuppressFBWarnings({"WA_NOT_IN_LOOP"})
    void waitForEnoughThreadsToRequestMessages(long j) throws InterruptedException {
        this.shouldObtainMessagesLock.wait(j);
    }

    private long getErrorBackoffTimeInMilliseconds() {
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        return PropertyUtils.safelyGetPositiveOrZeroLongValue("errorBackoffTimeInMilliseconds", batchingMessageRetrieverProperties::getErrorBackoffTimeInMilliseconds, 10000L);
    }

    private int getNumberOfThreadsWaitingTrigger() {
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        return PropertyUtils.safelyGetIntegerValue("numberOfThreadsWaitingTrigger", batchingMessageRetrieverProperties::getNumberOfThreadsWaitingTrigger, 1);
    }

    @VisibleForTesting
    void backoff(long j) throws InterruptedException {
        Thread.sleep(j);
    }

    private ReceiveMessageRequest buildReceiveMessageRequest(int i) {
        ReceiveMessageRequest.Builder maxNumberOfMessages = ReceiveMessageRequest.builder().queueUrl(this.queueProperties.getQueueUrl()).attributeNames(new QueueAttributeName[]{QueueAttributeName.ALL}).messageAttributeNames(new String[]{QueueAttributeName.ALL.toString()}).maxNumberOfMessages(Integer.valueOf(i));
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        ReceiveMessageRequest.Builder waitTimeSeconds = maxNumberOfMessages.waitTimeSeconds(Integer.valueOf(RetrieverUtils.safelyGetWaitTimeInSeconds(batchingMessageRetrieverProperties::getMessageWaitTimeInSeconds)));
        Integer visibilityTimeoutInSeconds = this.properties.getVisibilityTimeoutInSeconds();
        if (visibilityTimeoutInSeconds != null) {
            if (visibilityTimeoutInSeconds.intValue() < 0) {
                log.warn("Non-positive visibilityTimeoutInSeconds provided: {}", visibilityTimeoutInSeconds);
            } else {
                waitTimeSeconds.visibilityTimeout(visibilityTimeoutInSeconds);
            }
        }
        return (ReceiveMessageRequest) waitTimeSeconds.build();
    }

    private long getPollingPeriodInMs() {
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        return PropertyUtils.safelyGetLongValue("messageRetrievalPollingPeriodInMs", batchingMessageRetrieverProperties::getMessageRetrievalPollingPeriodInMs, 0L);
    }
}
