package com.jashmore.sqs.broker.concurrent;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jashmore.sqs.broker.MessageBroker;
import com.jashmore.sqs.processor.MessageProcessor;
import com.jashmore.sqs.retriever.MessageRetriever;
import com.jashmore.sqs.util.ResizableSemaphore;
import com.jashmore.sqs.util.properties.PropertyUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.class */
/**
 * {@link MessageBroker} that repeatedly retrieves messages from a {@link MessageRetriever} and hands each one to a
 * {@link MessageProcessor}, using background thread pools for both steps.
 *
 * <p>Before every retrieval a permit is taken from a {@link ResizableSemaphore} whose size is kept in step with
 * {@link ConcurrentMessageBrokerProperties#getConcurrencyLevel()}; the permit is released once the retrieval and
 * processing of that message has completed (successfully or not). The concurrency level is re-read on every
 * iteration of the {@link #run()} loop.
 */
public class ConcurrentMessageBroker implements MessageBroker {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConcurrentMessageBroker.class);

    /**
     * Value handed to {@link PropertyUtils} together with the configured error backoff supplier (presumably the
     * fallback used when the property is missing or invalid; confirm against {@code PropertyUtils}).
     */
    private static final long DEFAULT_ERROR_BACKOFF_TIME_IN_MS = 10000L;

    /**
     * Value handed to {@link PropertyUtils} together with the configured permit polling rate supplier (presumably
     * the fallback used when the property is missing or invalid; confirm against {@code PropertyUtils}).
     */
    private static final long DEFAULT_PERMIT_POLLING_RATE_IN_MS = 10000L;

    private final MessageRetriever messageRetriever;
    private final MessageProcessor messageProcessor;
    private final ConcurrentMessageBrokerProperties properties;

    /**
     * Marker exception thrown by the retrieval task when it is interrupted while waiting for a message. Failures
     * caused by it are expected during shutdown and are therefore not logged as processing errors.
     */
    public static class BrokerStoppedWhileRetrievingMessageException extends RuntimeException {
        private BrokerStoppedWhileRetrievingMessageException() {
        }
    }

    /**
     * Creates a broker; no work is started until {@link #run()} is called.
     *
     * @param messageRetriever                  source that messages are retrieved from
     * @param messageProcessor                  processor that each retrieved message is passed to
     * @param concurrentMessageBrokerProperties supplies the concurrency level, thread name format, permit polling
     *                                          rate and error backoff time read while running
     */
    public ConcurrentMessageBroker(MessageRetriever messageRetriever, MessageProcessor messageProcessor, ConcurrentMessageBrokerProperties concurrentMessageBrokerProperties) {
        this.messageRetriever = messageRetriever;
        this.messageProcessor = messageProcessor;
        this.properties = concurrentMessageBrokerProperties;
    }

    /**
     * Runs the broker loop on the calling thread until that thread is interrupted, then shuts down the background
     * thread pools.
     *
     * <p>Each iteration resizes the semaphore if the configured concurrency level changed, waits (up to the
     * configured polling rate) for a permit and, if one was obtained, asynchronously retrieves a message and
     * processes it. Any unexpected error in the loop is logged and followed by a backoff sleep.
     *
     * <p>When this method returns the calling thread's interrupt flag is set.
     */
    @SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
    public void run() {
        final ExecutorService messageRetrievalExecutor = Executors.newCachedThreadPool();
        final ExecutorService messageProcessingExecutor = buildMessageProcessingExecutorService();
        // Starts with no permits; sized to the configured concurrency level on the first loop iteration.
        final ResizableSemaphore concurrencyPermits = new ResizableSemaphore(0);

        while (!Thread.currentThread().isInterrupted()) {
            try {
                updateConcurrencyLevelIfChanged(concurrencyPermits);
                final boolean permitObtained = concurrencyPermits.tryAcquire(getNumberOfMillisecondsToObtainPermit(), TimeUnit.MILLISECONDS);
                if (permitObtained) {
                    CompletableFuture
                        .supplyAsync(() -> {
                            try {
                                return this.messageRetriever.retrieveMessage();
                            } catch (InterruptedException interruptedException) {
                                log.trace("Thread interrupted waiting for a message");
                                throw new BrokerStoppedWhileRetrievingMessageException();
                            }
                        }, messageRetrievalExecutor)
                        .thenAcceptAsync(this.messageProcessor::processMessage, messageProcessingExecutor)
                        .whenComplete((ignoredResult, throwable) -> {
                            if (throwable != null) {
                                // Failures normally arrive wrapped; fall back to the throwable itself so that a
                                // missing cause never results in an error being logged without a stack trace.
                                final Throwable failure = throwable.getCause() != null ? throwable.getCause() : throwable;
                                if (!(failure instanceof BrokerStoppedWhileRetrievingMessageException)) {
                                    log.error("Error processing message", failure);
                                }
                            }
                            // Always hand the permit back so another message can be retrieved.
                            concurrencyPermits.release();
                        });
                }
            } catch (InterruptedException interruptedException) {
                log.debug("Interrupted exception caught while adding more listeners, shutting down!");
                // Re-set the flag so the loop condition observes the interruption and exits.
                Thread.currentThread().interrupt();
            } catch (Throwable throwable) {
                try {
                    final long backoffTimeInMs = getErrorBackoffTimeInMilliseconds();
                    log.error("Error thrown while organising threads to process messages. Backing off for {}ms", backoffTimeInMs, throwable);
                    Thread.sleep(backoffTimeInMs);
                } catch (InterruptedException interruptedException) {
                    log.debug("Thread interrupted during backoff period");
                    Thread.currentThread().interrupt();
                }
            }
        }

        log.debug("Shutting down message controller");
        try {
            shutdownConcurrentThreads(messageRetrievalExecutor, messageProcessingExecutor);
            log.debug("Message controller shut down");
        } catch (RuntimeException runtimeException) {
            log.error("Exception thrown while waiting for broker to shutdown", runtimeException);
        }
    }

    /**
     * Resizes the semaphore when the configured concurrency level differs from its current maximum.
     *
     * @param resizableSemaphore semaphore limiting the number of messages being handled
     * @throws IllegalArgumentException if the configured concurrency level is negative
     */
    private void updateConcurrencyLevelIfChanged(ResizableSemaphore resizableSemaphore) {
        final int configuredConcurrencyLevel = this.properties.getConcurrencyLevel().intValue();
        Preconditions.checkArgument(configuredConcurrencyLevel >= 0, "concurrencyLevel should be non-negative");

        if (resizableSemaphore.getMaximumPermits() == configuredConcurrencyLevel) {
            return;
        }
        log.debug("Changing concurrency from {} to {}", resizableSemaphore.getMaximumPermits(), configuredConcurrencyLevel);
        resizableSemaphore.changePermitSize(configuredConcurrencyLevel);
    }

    /**
     * Stops both executors and waits for their threads to finish.
     *
     * <p>The first executor is stopped with {@link ExecutorService#shutdownNow()} (interrupting its threads) while
     * the second is stopped with {@link ExecutorService#shutdown()} so that already submitted work can complete.
     * The method then blocks until both have terminated, or until the calling thread is interrupted while waiting.
     *
     * <p>This method is reached with the calling thread's interrupt flag already set (that is what ends the loop in
     * {@link #run()}). A pending interrupt would make {@code awaitTermination} throw immediately, so the flag is
     * cleared for the duration of the wait and restored before returning.
     *
     * @param executorService  executor whose threads are interrupted
     * @param executorService2 executor whose submitted tasks are allowed to finish
     */
    private void shutdownConcurrentThreads(ExecutorService executorService, ExecutorService executorService2) {
        // Clears the pending interrupt (if any) and remembers it so it can be restored on the way out.
        boolean restoreInterruptFlag = Thread.interrupted();
        try {
            executorService.shutdownNow();
            executorService2.shutdown();
            while (!(executorService.isTerminated() && executorService2.isTerminated())) {
                log.debug("Waiting for all threads to finish...");
                try {
                    executorService.awaitTermination(1L, TimeUnit.MINUTES);
                    executorService2.awaitTermination(1L, TimeUnit.MINUTES);
                } catch (InterruptedException interruptedException) {
                    // A fresh interrupt during shutdown means the caller wants out now: stop waiting.
                    restoreInterruptFlag = true;
                    log.warn("Interrupted while waiting for all messages to shutdown, some threads may still be running");
                    return;
                }
            }
        } finally {
            if (restoreInterruptFlag) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the cached thread pool used for processing messages, naming its threads with the configured thread
     * name format when one is provided.
     *
     * @return the configured executor, or a plain cached thread pool if building the configured one failed
     */
    private ExecutorService buildMessageProcessingExecutorService() {
        try {
            final ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
            final String threadNameFormat = this.properties.getThreadNameFormat();
            if (threadNameFormat != null) {
                threadFactoryBuilder.setNameFormat(threadNameFormat);
            }
            return Executors.newCachedThreadPool(threadFactoryBuilder.build());
        } catch (Throwable throwable) {
            // Include the throwable so the reason for falling back is not lost.
            log.error("Error thrown building message processing executor service, returning default", throwable);
            return Executors.newCachedThreadPool();
        }
    }

    /**
     * @return the number of milliseconds to sleep after an unexpected error in the broker loop, as resolved by
     *     {@link PropertyUtils} from the configured properties
     */
    private long getErrorBackoffTimeInMilliseconds() {
        return PropertyUtils.safelyGetPositiveOrZeroLongValue(
            "errorBackoffTimeInMilliseconds",
            this.properties::getErrorBackoffTimeInMilliseconds,
            DEFAULT_ERROR_BACKOFF_TIME_IN_MS
        );
    }

    /**
     * @return the maximum number of milliseconds to wait for a permit before the loop re-checks the concurrency
     *     level, as resolved by {@link PropertyUtils} from the configured properties
     */
    private long getNumberOfMillisecondsToObtainPermit() {
        return PropertyUtils.safelyGetPositiveLongValue(
            "preferredConcurrencyPollingRateInMilliseconds",
            this.properties::getPreferredConcurrencyPollingRateInMilliseconds,
            DEFAULT_PERMIT_POLLING_RATE_IN_MS
        );
    }
}
