package com.jashmore.sqs.broker.concurrent;

import com.jashmore.sqs.broker.MessageBroker;
import com.jashmore.sqs.util.ResizableSemaphore;
import com.jashmore.sqs.util.properties.PropertyUtils;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;

/* loaded from: input_file:com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.class */
/**
 * {@link MessageBroker} that limits how many messages are in flight at once using a
 * {@link ResizableSemaphore}.
 *
 * <p>The permitted number of in-flight messages is re-read from the supplied
 * {@link ConcurrentMessageBrokerProperties} on every iteration of the processing loop and the
 * semaphore is resized whenever that value differs from the semaphore's current maximum.
 */
public class ConcurrentMessageBroker implements MessageBroker {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConcurrentMessageBroker.class);
    private final ConcurrentMessageBrokerProperties properties;
    // Starts with zero permits; the first loop iteration resizes it to the configured concurrency level.
    private final ResizableSemaphore concurrentMessagesBeingProcessedSemaphore = new ResizableSemaphore(0);

    /**
     * Creates a broker driven by the given properties.
     *
     * @param concurrentMessageBrokerProperties source of the concurrency level, the permit polling
     *                                          rate and the error back-off time
     */
    public ConcurrentMessageBroker(ConcurrentMessageBrokerProperties concurrentMessageBrokerProperties) {
        this.properties = concurrentMessageBrokerProperties;
    }

    /**
     * Repeatedly requests messages and hands them to the processor, holding one semaphore permit
     * for each message from the moment it is requested until its processing future completes.
     *
     * <p>The loop runs until the calling thread is interrupted or {@code booleanSupplier} returns
     * {@code false}. A {@link RuntimeException} raised while arranging the processing of a message
     * is logged and followed by a sleep for the configured error back-off time; the loop then
     * carries on.
     *
     * @param executorService executor that the processing stage is composed on
     * @param booleanSupplier polled at the top of every iteration; {@code false} ends the loop
     * @param supplier        provides a future that resolves to the next message
     * @param function        processes a message, returning a future that completes when it is done
     * @throws InterruptedException if the thread is interrupted while waiting for a permit or while
     *                              backing off after an error
     */
    public void processMessages(ExecutorService executorService, BooleanSupplier booleanSupplier, Supplier<CompletableFuture<Message>> supplier, Function<Message, CompletableFuture<?>> function) throws InterruptedException {
        log.debug("Beginning processing of messages");
        while (!Thread.currentThread().isInterrupted() && booleanSupplier.getAsBoolean()) {
            try {
                updateConcurrencyLevelIfChanged(this.concurrentMessagesBeingProcessedSemaphore);

                final boolean obtainedPermit = this.concurrentMessagesBeingProcessedSemaphore.tryAcquire(getPermitWaitTime().toMillis(), TimeUnit.MILLISECONDS);
                if (!obtainedPermit) {
                    // No free slot within the polling window: loop again so that the stop condition
                    // and the concurrency level are both re-evaluated.
                    continue;
                }

                try {
                    final CompletableFuture<Message> messageFuture = supplier.get();
                    // The bound method reference null-checks "function" when it is evaluated.
                    messageFuture
                        .thenComposeAsync(function::apply, executorService)
                        .whenComplete((ignoredResult, throwable) -> {
                            if (throwable != null) {
                                // Prefer the underlying cause; fall back to the throwable itself when it
                                // carries no cause so the failure is never logged as null.
                                final Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
                                if (!(cause instanceof CancellationException)) {
                                    log.error("Error processing message", cause);
                                }
                            }
                            this.concurrentMessagesBeingProcessedSemaphore.release();
                        });
                } catch (RuntimeException runtimeException) {
                    // The completion callback was never registered, so hand the permit back here.
                    this.concurrentMessagesBeingProcessedSemaphore.release();
                    throw runtimeException;
                }
            } catch (RuntimeException runtimeException) {
                final long errorBackoffTimeInMs = getErrorBackoffTimeInMs();
                log.error("Error thrown while organising threads to process messages. Backing off for {}ms", errorBackoffTimeInMs, runtimeException);
                Thread.sleep(errorBackoffTimeInMs);
            }
        }
        log.debug("Ending processing of messages");
    }

    /**
     * Resolves how long to wait for a permit before looping again.
     *
     * @return the configured concurrency polling rate, or
     *         {@link ConcurrentMessageBrokerConstants#DEFAULT_CONCURRENCY_POLLING} when the configured
     *         value is {@code null} or negative
     */
    private Duration getPermitWaitTime() {
        final Duration concurrencyPollingRate = this.properties.getConcurrencyPollingRate();
        if (concurrencyPollingRate == null || concurrencyPollingRate.isNegative()) {
            return ConcurrentMessageBrokerConstants.DEFAULT_CONCURRENCY_POLLING;
        }
        return concurrencyPollingRate;
    }

    /**
     * Resolves the error back-off time in milliseconds via {@link PropertyUtils}, with
     * {@link ConcurrentMessageBrokerConstants#DEFAULT_BACKOFF_TIME} supplied as the default.
     *
     * @return the number of milliseconds to sleep after an error in the processing loop
     */
    private long getErrorBackoffTimeInMs() {
        return PropertyUtils
            .safelyGetPositiveOrZeroDuration("errorBackoffTime", this.properties::getErrorBackoffTime, ConcurrentMessageBrokerConstants.DEFAULT_BACKOFF_TIME)
            .toMillis();
    }

    /**
     * Resizes the semaphore when the configured concurrency level differs from its current maximum.
     *
     * @param resizableSemaphore the semaphore guarding the number of in-flight messages
     */
    private void updateConcurrencyLevelIfChanged(ResizableSemaphore resizableSemaphore) {
        final int newConcurrencyLevel = getConcurrencyLevel();
        if (resizableSemaphore.getMaximumPermits() != newConcurrencyLevel) {
            log.info("Changing concurrency from {} to {}", resizableSemaphore.getMaximumPermits(), newConcurrencyLevel);
            resizableSemaphore.changePermitSize(newConcurrencyLevel);
        }
    }

    /**
     * Resolves the configured concurrency level via {@link PropertyUtils}, with {@code 0} supplied
     * as the default.
     *
     * @return the concurrency level to apply to the semaphore
     */
    private int getConcurrencyLevel() {
        return PropertyUtils.safelyGetPositiveOrZeroIntegerValue("concurrencyLevel", this.properties::getConcurrencyLevel, 0);
    }
}
