package com.jashmore.sqs.retriever.batching;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.retriever.MessageRetriever;
import com.jashmore.sqs.util.collections.QueueUtils;
import com.jashmore.sqs.util.properties.PropertyUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkInterruptedException;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:com/jashmore/sqs/retriever/batching/BatchingMessageRetriever.class */
public class BatchingMessageRetriever implements MessageRetriever {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchingMessageRetriever.class);
    private final QueueProperties queueProperties;
    private final SqsAsyncClient sqsAsyncClient;
    private final BatchingMessageRetrieverProperties properties;
    private final LinkedBlockingDeque<CompletableFuture<Message>> futuresWaitingForMessages = new LinkedBlockingDeque<>();

    public BatchingMessageRetriever(QueueProperties queueProperties, SqsAsyncClient sqsAsyncClient, BatchingMessageRetrieverProperties batchingMessageRetrieverProperties) {
        this.queueProperties = queueProperties;
        this.sqsAsyncClient = sqsAsyncClient;
        this.properties = batchingMessageRetrieverProperties;
    }

    public CompletableFuture<Message> retrieveMessage() {
        CompletableFuture<Message> completableFuture = new CompletableFuture<>();
        this.futuresWaitingForMessages.add(completableFuture);
        return completableFuture;
    }

    public List<Message> run() {
        log.info("Started MessageRetriever");
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            try {
                Queue<CompletableFuture<Message>> obtainRequestForMessagesBatch = obtainRequestForMessagesBatch();
                log.debug("Requesting {} messages", Integer.valueOf(obtainRequestForMessagesBatch.size()));
                if (!obtainRequestForMessagesBatch.isEmpty()) {
                    try {
                        obtainRequestForMessagesBatch.getClass();
                        CompletableFuture thenApply = CompletableFuture.supplyAsync(obtainRequestForMessagesBatch::size).thenApply((v1) -> {
                            return buildReceiveMessageRequest(v1);
                        });
                        SqsAsyncClient sqsAsyncClient = this.sqsAsyncClient;
                        sqsAsyncClient.getClass();
                        List<Message> list = (List) thenApply.thenComposeAsync(sqsAsyncClient::receiveMessage).thenApply((v0) -> {
                            return v0.messages();
                        }).get();
                        log.debug("Downloaded {} messages", Integer.valueOf(list.size()));
                        if (list.size() > obtainRequestForMessagesBatch.size()) {
                            log.error("More messages were downloaded than requested, this shouldn't happen");
                        }
                        for (Message message : list) {
                            CompletableFuture<Message> poll = obtainRequestForMessagesBatch.poll();
                            if (poll != null) {
                                poll.complete(message);
                            }
                        }
                        this.futuresWaitingForMessages.addAll(obtainRequestForMessagesBatch);
                    } catch (InterruptedException e) {
                        log.debug("Thread interrupted while waiting for batch of messages");
                    } catch (RuntimeException | ExecutionException e2) {
                        if (e2 instanceof ExecutionException) {
                            Throwable cause = e2.getCause();
                            if ((cause instanceof SdkClientException) && (cause.getCause() instanceof SdkInterruptedException)) {
                                log.debug("Thread interrupted while receiving messages");
                                break;
                            }
                        }
                        log.error("Error request messages", e2);
                        this.futuresWaitingForMessages.addAll(obtainRequestForMessagesBatch);
                        performBackoff();
                    }
                }
            } catch (InterruptedException e3) {
                log.debug("Thread interrupted waiting for batch");
            }
        }
        this.futuresWaitingForMessages.forEach(completableFuture -> {
            completableFuture.cancel(true);
        });
        log.info("MessageRetriever has been successfully stopped");
        return Collections.emptyList();
    }

    private Queue<CompletableFuture<Message>> obtainRequestForMessagesBatch() throws InterruptedException {
        LinkedList linkedList = new LinkedList();
        int i = getbatchSize();
        long maxBatchingPeriodInMs = getMaxBatchingPeriodInMs();
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = Integer.valueOf(i);
            objArr[1] = maxBatchingPeriodInMs == Long.MAX_VALUE ? "until batch size reached" : "within " + maxBatchingPeriodInMs + "ms";
            objArr[2] = Integer.valueOf(this.futuresWaitingForMessages.size());
            logger.debug("Waiting for {} requests for messages {}. Total currently waiting: {}", objArr);
        }
        QueueUtils.drain(this.futuresWaitingForMessages, linkedList, i, Duration.ofMillis(maxBatchingPeriodInMs));
        return linkedList;
    }

    private void performBackoff() {
        try {
            long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
            log.debug("Backing off for {}ms", Long.valueOf(errorBackoffTimeInMilliseconds));
            Thread.sleep(errorBackoffTimeInMilliseconds);
        } catch (InterruptedException e) {
            log.debug("Thread interrupted during backoff period");
            Thread.currentThread().interrupt();
        }
    }

    private long getErrorBackoffTimeInMilliseconds() {
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        return PropertyUtils.safelyGetPositiveOrZeroLongValue("errorBackoffTimeInMilliseconds", batchingMessageRetrieverProperties::getErrorBackoffTimeInMilliseconds, 10000L);
    }

    private int getbatchSize() {
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        int safelyGetIntegerValue = PropertyUtils.safelyGetIntegerValue("batchSize", batchingMessageRetrieverProperties::getBatchSize, 1);
        if (safelyGetIntegerValue < 0) {
            return 0;
        }
        return Math.min(safelyGetIntegerValue, 10);
    }

    private ReceiveMessageRequest buildReceiveMessageRequest(int i) {
        ReceiveMessageRequest.Builder waitTimeSeconds = ReceiveMessageRequest.builder().queueUrl(this.queueProperties.getQueueUrl()).attributeNames(new QueueAttributeName[]{QueueAttributeName.ALL}).messageAttributeNames(new String[]{QueueAttributeName.ALL.toString()}).maxNumberOfMessages(Integer.valueOf(i)).waitTimeSeconds(20);
        try {
            Integer messageVisibilityTimeoutInSeconds = this.properties.getMessageVisibilityTimeoutInSeconds();
            if (messageVisibilityTimeoutInSeconds != null) {
                if (messageVisibilityTimeoutInSeconds.intValue() <= 0) {
                    log.warn("Non-positive visibilityTimeoutInSeconds provided: {}", messageVisibilityTimeoutInSeconds);
                } else {
                    waitTimeSeconds.visibilityTimeout(messageVisibilityTimeoutInSeconds);
                }
            }
        } catch (RuntimeException e) {
            log.error("Error getting visibility timeout, none will be supplied in request", e);
        }
        return (ReceiveMessageRequest) waitTimeSeconds.build();
    }

    private long getMaxBatchingPeriodInMs() {
        BatchingMessageRetrieverProperties batchingMessageRetrieverProperties = this.properties;
        batchingMessageRetrieverProperties.getClass();
        return PropertyUtils.safelyGetLongValue("batchingPeriodInMs", batchingMessageRetrieverProperties::getBatchingPeriodInMs, Long.MAX_VALUE);
    }
}
