package com.jashmore.sqs.resolver.batching;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.resolver.MessageResolver;
import com.jashmore.sqs.util.collections.QueueUtils;
import com.jashmore.sqs.util.thread.ThreadUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;

/* loaded from: input_file:com/jashmore/sqs/resolver/batching/BatchingMessageResolver.class */
public class BatchingMessageResolver implements MessageResolver {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchingMessageResolver.class);
    private final QueueProperties queueProperties;
    private final SqsAsyncClient sqsAsyncClient;
    private final BatchingMessageResolverProperties properties;
    private final BlockingQueue<MessageResolutionBean> messagesToBeResolved;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/jashmore/sqs/resolver/batching/BatchingMessageResolver$MessageResolutionBean.class */
    public static final class MessageResolutionBean {
        private final Message message;
        private final CompletableFuture<Object> completableFuture;

        @Generated
        public Message getMessage() {
            return this.message;
        }

        @Generated
        public CompletableFuture<Object> getCompletableFuture() {
            return this.completableFuture;
        }

        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof MessageResolutionBean)) {
                return false;
            }
            MessageResolutionBean messageResolutionBean = (MessageResolutionBean) obj;
            Message message = getMessage();
            Message message2 = messageResolutionBean.getMessage();
            if (message == null) {
                if (message2 != null) {
                    return false;
                }
            } else if (!message.equals(message2)) {
                return false;
            }
            CompletableFuture<Object> completableFuture = getCompletableFuture();
            CompletableFuture<Object> completableFuture2 = messageResolutionBean.getCompletableFuture();
            return completableFuture == null ? completableFuture2 == null : completableFuture.equals(completableFuture2);
        }

        @Generated
        public int hashCode() {
            Message message = getMessage();
            int hashCode = (1 * 59) + (message == null ? 43 : message.hashCode());
            CompletableFuture<Object> completableFuture = getCompletableFuture();
            return (hashCode * 59) + (completableFuture == null ? 43 : completableFuture.hashCode());
        }

        @Generated
        public String toString() {
            return "BatchingMessageResolver.MessageResolutionBean(message=" + getMessage() + ", completableFuture=" + getCompletableFuture() + ")";
        }

        @Generated
        @ConstructorProperties({"message", "completableFuture"})
        public MessageResolutionBean(Message message, CompletableFuture<Object> completableFuture) {
            this.message = message;
            this.completableFuture = completableFuture;
        }
    }

    public BatchingMessageResolver(QueueProperties queueProperties, SqsAsyncClient sqsAsyncClient) {
        this(queueProperties, sqsAsyncClient, StaticBatchingMessageResolverProperties.builder().bufferingSizeLimit(1).bufferingTime(Duration.ofHours(1L)).build());
    }

    public BatchingMessageResolver(QueueProperties queueProperties, SqsAsyncClient sqsAsyncClient, BatchingMessageResolverProperties batchingMessageResolverProperties) {
        this.queueProperties = queueProperties;
        this.sqsAsyncClient = sqsAsyncClient;
        this.properties = batchingMessageResolverProperties;
        this.messagesToBeResolved = new LinkedBlockingQueue();
    }

    public CompletableFuture<?> resolveMessage(Message message) {
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        this.messagesToBeResolved.add(new MessageResolutionBean(message, completableFuture));
        return completableFuture;
    }

    public void run() {
        log.info("Started MessageResolver background thread");
        boolean z = true;
        ExecutorService buildExecutorServiceForSendingBatchDeletion = buildExecutorServiceForSendingBatchDeletion();
        ArrayList arrayList = new ArrayList();
        while (!Thread.currentThread().isInterrupted() && z) {
            LinkedList linkedList = new LinkedList();
            try {
                int batchSize = getBatchSize();
                Duration bufferingTime = this.properties.getBufferingTime();
                log.trace("Waiting {}ms for {} messages to be submitted for deletion", Long.valueOf(bufferingTime.toMillis()), Integer.valueOf(batchSize));
                QueueUtils.drain(this.messagesToBeResolved, linkedList, batchSize, bufferingTime);
            } catch (InterruptedException e) {
                log.info("Shutting down MessageResolver");
                z = false;
            }
            if (!linkedList.isEmpty()) {
                log.debug("Sending batch deletion for {} messages", Integer.valueOf(linkedList.size()));
                CompletableFuture<?> submitMessageDeletionBatch = submitMessageDeletionBatch(linkedList, buildExecutorServiceForSendingBatchDeletion);
                arrayList.add(submitMessageDeletionBatch);
                submitMessageDeletionBatch.whenComplete((obj, th) -> {
                    arrayList.remove(submitMessageDeletionBatch);
                });
            }
        }
        try {
            log.debug("Waiting for {} batches to complete", Integer.valueOf(arrayList.size()));
            CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).get();
            buildExecutorServiceForSendingBatchDeletion.shutdownNow();
            log.info("MessageResolver has been successfully stopped");
        } catch (InterruptedException e2) {
            log.warn("Thread interrupted while waiting for message batches to be completed");
            Thread.currentThread().interrupt();
        } catch (ExecutionException e3) {
            log.error("Error waiting for all message batches to be published", e3.getCause());
        }
    }

    private ExecutorService buildExecutorServiceForSendingBatchDeletion() {
        return Executors.newCachedThreadPool(ThreadUtils.multiNamedThreadFactory(Thread.currentThread().getName() + "-batch-delete"));
    }

    private int getBatchSize() {
        int bufferingSizeLimit = this.properties.getBufferingSizeLimit();
        if (bufferingSizeLimit < 1) {
            return 1;
        }
        return Math.min(bufferingSizeLimit, 10);
    }

    private CompletableFuture<?> submitMessageDeletionBatch(List<MessageResolutionBean> list, ExecutorService executorService) {
        Map map = (Map) list.stream().map(messageResolutionBean -> {
            return new AbstractMap.SimpleImmutableEntry(messageResolutionBean.getMessage().messageId(), messageResolutionBean.getCompletableFuture());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
            return buildBatchDeleteMessageRequest(list);
        });
        SqsAsyncClient sqsAsyncClient = this.sqsAsyncClient;
        sqsAsyncClient.getClass();
        return supplyAsync.thenComposeAsync(sqsAsyncClient::deleteMessageBatch, (Executor) executorService).whenComplete((deleteMessageBatchResponse, th) -> {
            if (th != null) {
                log.error("Error deleting messages", th);
                map.values().forEach(completableFuture -> {
                    completableFuture.completeExceptionally(th);
                });
                return;
            }
            log.debug("{} messages successfully deleted, {} failed", Integer.valueOf(deleteMessageBatchResponse.successful().size()), Integer.valueOf(deleteMessageBatchResponse.failed().size()));
            deleteMessageBatchResponse.successful().stream().map(deleteMessageBatchResultEntry -> {
                return (CompletableFuture) map.remove(deleteMessageBatchResultEntry.id());
            }).forEach(completableFuture2 -> {
                completableFuture2.complete("completed");
            });
            deleteMessageBatchResponse.failed().forEach(batchResultErrorEntry -> {
                ((CompletableFuture) map.remove(batchResultErrorEntry.id())).completeExceptionally(new RuntimeException(batchResultErrorEntry.message()));
            });
            if (map.isEmpty()) {
                return;
            }
            log.error("{} messages were not handled in the deletion. This could be a bug in the AWS SDK", Integer.valueOf(map.size()));
            map.values().forEach(completableFuture3 -> {
                completableFuture3.completeExceptionally(new RuntimeException("Message not handled by batch delete. This should not happen"));
            });
        });
    }

    private DeleteMessageBatchRequest buildBatchDeleteMessageRequest(List<MessageResolutionBean> list) {
        return (DeleteMessageBatchRequest) DeleteMessageBatchRequest.builder().queueUrl(this.queueProperties.getQueueUrl()).entries((Collection) list.stream().map((v0) -> {
            return v0.getMessage();
        }).map(message -> {
            return (DeleteMessageBatchRequestEntry) DeleteMessageBatchRequestEntry.builder().id(message.messageId()).receiptHandle(message.receiptHandle()).build();
        }).collect(Collectors.toSet())).build();
    }
}
