package com.jashmore.sqs.container;

import com.jashmore.sqs.broker.MessageBroker;
import com.jashmore.sqs.processor.MessageProcessor;
import com.jashmore.sqs.resolver.MessageResolver;
import com.jashmore.sqs.retriever.MessageRetriever;
import com.jashmore.sqs.util.Preconditions;
import com.jashmore.sqs.util.properties.PropertyUtils;
import com.jashmore.sqs.util.thread.ThreadUtils;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.utils.StringUtils;

/* loaded from: input_file:com/jashmore/sqs/container/CoreMessageListenerContainer.class */
/**
 * {@link MessageListenerContainer} that wires a {@link MessageRetriever}, {@link MessageBroker},
 * {@link MessageProcessor} and {@link MessageResolver} together and runs them on dedicated threads.
 *
 * <p>Fresh component instances are obtained from the configured suppliers every time the container
 * is run. The lifecycle methods ({@link #start()}, {@link #stop()}, {@link #stop(Duration)}) are
 * {@code synchronized} on this instance.
 */
public class CoreMessageListenerContainer implements MessageListenerContainer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CoreMessageListenerContainer.class);

    /** Default passed to {@link PropertyUtils} for every component shutdown timeout, in seconds. */
    private static final int DEFAULT_SHUTDOWN_TIMEOUT_IN_SECONDS = 60;

    private final String identifier;
    private final Supplier<MessageBroker> messageBrokerSupplier;
    private final Supplier<MessageRetriever> messageRetrieverSupplier;
    private final Supplier<MessageProcessor> messageProcessorSupplier;
    private final Supplier<MessageResolver> messageResolverSupplier;
    private final CoreMessageListenerContainerProperties properties;

    /** Executor running {@link #runContainer()}; {@code null} whenever the container is not started. */
    private ExecutorService executorService;

    /** Future of the current {@link #runContainer()} execution; {@code null} whenever the container is not started. */
    private CompletableFuture<?> containerFuture;

    /**
     * A {@link Runnable}-like action that is allowed to block and therefore may be interrupted.
     */
    @FunctionalInterface
    public interface BlockingRunnable {
        /**
         * Runs the action.
         *
         * @throws InterruptedException if the thread is interrupted while the action is blocked
         */
        void run() throws InterruptedException;
    }

    /**
     * Creates a container using the default {@link StaticCoreMessageListenerContainerProperties}.
     *
     * @param identifier               non-blank identifier of this container, also used to name its threads
     * @param messageBrokerSupplier    supplies the broker used for a run of the container
     * @param messageRetrieverSupplier supplies the retriever used for a run of the container
     * @param messageProcessorSupplier supplies the processor used for a run of the container
     * @param messageResolverSupplier  supplies the resolver used for a run of the container
     */
    public CoreMessageListenerContainer(
        String identifier,
        Supplier<MessageBroker> messageBrokerSupplier,
        Supplier<MessageRetriever> messageRetrieverSupplier,
        Supplier<MessageProcessor> messageProcessorSupplier,
        Supplier<MessageResolver> messageResolverSupplier
    ) {
        this(
            identifier,
            messageBrokerSupplier,
            messageRetrieverSupplier,
            messageProcessorSupplier,
            messageResolverSupplier,
            StaticCoreMessageListenerContainerProperties.builder().build()
        );
    }

    /**
     * Creates a container with explicit properties.
     *
     * @param identifier               non-blank identifier of this container, also used to name its threads
     * @param messageBrokerSupplier    supplies the broker used for a run of the container
     * @param messageRetrieverSupplier supplies the retriever used for a run of the container
     * @param messageProcessorSupplier supplies the processor used for a run of the container
     * @param messageResolverSupplier  supplies the resolver used for a run of the container
     * @param properties               shutdown timeouts and shutdown behaviour flags for this container
     */
    public CoreMessageListenerContainer(
        String identifier,
        Supplier<MessageBroker> messageBrokerSupplier,
        Supplier<MessageRetriever> messageRetrieverSupplier,
        Supplier<MessageProcessor> messageProcessorSupplier,
        Supplier<MessageResolver> messageResolverSupplier,
        CoreMessageListenerContainerProperties properties
    ) {
        Preconditions.checkArgument(StringUtils.isNotBlank(identifier), "identifier should not be empty");
        this.identifier = identifier;
        this.messageBrokerSupplier = messageBrokerSupplier;
        this.messageRetrieverSupplier = messageRetrieverSupplier;
        this.messageProcessorSupplier = messageProcessorSupplier;
        this.messageResolverSupplier = messageResolverSupplier;
        this.properties = properties;
    }

    /**
     * @return the identifier supplied at construction
     */
    public String getIdentifier() {
        return this.identifier;
    }

    /**
     * Starts the container on its own single-threaded executor.
     *
     * <p>If the container is already started no new work is scheduled and the existing future is
     * returned.
     *
     * @return the future tracking the container's run
     */
    public synchronized CompletableFuture<?> start() {
        if (this.executorService != null) {
            log.info("Container '{}' has already been started. No action taken", this.identifier);
        } else {
            log.info("Container '{}' is being started", this.identifier);
            this.executorService =
                Executors.newSingleThreadExecutor(ThreadUtils.singleNamedThreadFactory(this.identifier + "-message-container"));
            this.containerFuture = CompletableFuture.runAsync(this::runContainer, this.executorService);
        }
        return this.containerFuture;
    }

    /**
     * Stops the container, waiting up to one minute for it to terminate.
     */
    public synchronized void stop() {
        stop(Duration.of(1L, ChronoUnit.MINUTES));
    }

    /**
     * Stops the container by interrupting its thread and waiting for it to terminate.
     *
     * <p>Only the whole-second part of {@code duration} is used as the wait time. Whatever the
     * outcome (clean stop, timeout, interruption or unexpected failure) the container is marked as
     * not started afterwards so that {@link #start()} can be called again.
     *
     * @param duration the maximum time to wait for the container thread to finish
     */
    public synchronized void stop(Duration duration) {
        long timeoutInSeconds = duration.getSeconds();
        log.info("Container '{}' is being stopped and will wait {} seconds", this.identifier, timeoutInSeconds);
        try {
            if (this.executorService != null) {
                this.executorService.shutdownNow();
                if (!this.executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
                    log.error("Container '{}' did not shutdown in timeout", this.identifier);
                }
            }
        } catch (InterruptedException e) {
            log.warn("Thread interrupted waiting for container to stop. All threads may not be successfully completed");
            Thread.currentThread().interrupt();
        } finally {
            this.executorService = null;
            this.containerFuture = null;
        }
    }

    /**
     * Runs the container on the calling thread: starts all components, processes messages until
     * the thread is interrupted and then shuts the components down in order (retriever, message
     * processing threads, resolver, broker).
     *
     * <p>Messages handed back by the retriever when it stops are processed before shutdown when
     * the properties ask for it.
     */
    void runContainer() {
        try {
            MessageRetriever messageRetriever = this.messageRetrieverSupplier.get();
            MessageResolver messageResolver = this.messageResolverSupplier.get();
            MessageBroker messageBroker = this.messageBrokerSupplier.get();
            MessageProcessor messageProcessor = this.messageProcessorSupplier.get();

            ExecutorService brokerExecutor =
                Executors.newSingleThreadExecutor(ThreadUtils.singleNamedThreadFactory(this.identifier + "-message-broker"));
            BlockingRunnable stopMessageResolver = startupMessageResolver(messageResolver);
            ExecutorService messageProcessingExecutor = buildMessageProcessingExecutorService();

            // Filled on the retriever thread once the retriever's run() returns; only read below
            // after the retriever's executor has been shut down and awaited.
            Queue<Message> extraMessages = new LinkedList<>();
            BlockingRunnable stopMessageRetriever = startupMessageRetriever(messageRetriever, extraMessages::addAll);

            log.info("Container '{}' is beginning to process messages", this.identifier);
            processMessagesFromRetriever(
                messageBroker,
                messageRetriever,
                messageProcessor,
                messageResolver,
                brokerExecutor,
                messageProcessingExecutor
            );

            log.info("Container '{}' is being shutdown", this.identifier);
            log.debug("Container '{}' is shutting down MessageRetriever", this.identifier);
            stopMessageRetriever.run();
            log.debug("Container '{}' has stopped the MessageRetriever", this.identifier);

            if (!extraMessages.isEmpty() && shouldProcessAnyExtraRetrievedMessagesOnShutdown()) {
                log.info("Container '{}' is processing {} extra messages before shutdown", this.identifier, extraMessages.size());
                processExtraMessages(
                    messageBroker,
                    messageProcessor,
                    messageResolver,
                    brokerExecutor,
                    messageProcessingExecutor,
                    extraMessages
                );
            }

            log.debug("Container '{}' is shutting down MessageProcessor threads", this.identifier);
            shutdownMessageProcessingThreads(messageProcessingExecutor);
            log.debug("Container '{}' has shutdown the MessageProcessor threads", this.identifier);

            log.debug("Container '{}' is shutting down MessageResolver", this.identifier);
            stopMessageResolver.run();
            log.debug("Container '{}' has shutdown the MessageResolver", this.identifier);

            log.debug("Container '{}' is shutting down MessageBroker", this.identifier);
            shutdownMessageBroker(brokerExecutor);
            log.debug("Container '{}' has shutdown the MessageBroker", this.identifier);

            log.info("Container '{}' has stopped", this.identifier);
        } catch (InterruptedException e) {
            // NOTE(review): any shutdown steps after the interrupted one are skipped, so their
            // executors are left running - confirm this best-effort behaviour is intended.
            log.error("Container '{}' was interrupted during the shutdown process.", this.identifier);
            // Restore the flag so whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            log.error("Unexpected error trying to start/stop the container", e);
        }
    }

    /**
     * Has the broker pull messages from the retriever and process them until this thread is interrupted.
     *
     * @param messageBroker             broker coordinating the processing
     * @param messageRetriever          source of messages
     * @param messageProcessor          processes each message
     * @param messageResolver           resolves a message when the processor invokes the supplied callback
     * @param brokerExecutor            executor the broker itself runs on
     * @param messageProcessingExecutor executor handed to the broker for processing messages
     * @throws InterruptedException if interrupted again while waiting for the broker to stop
     */
    private void processMessagesFromRetriever(
        MessageBroker messageBroker,
        MessageRetriever messageRetriever,
        MessageProcessor messageProcessor,
        MessageResolver messageResolver,
        ExecutorService brokerExecutor,
        ExecutorService messageProcessingExecutor
    ) throws InterruptedException {
        try {
            runUntilInterruption(
                brokerExecutor,
                () ->
                    messageBroker.processMessages(
                        messageProcessingExecutor,
                        messageRetriever::retrieveMessage,
                        message -> messageProcessor.processMessage(message, () -> messageResolver.resolveMessage(message))
                    )
            );
        } catch (ExecutionException e) {
            log.error("Error processing messages", e.getCause());
        }
    }

    /**
     * Has the broker process the messages in {@code queue} while the queue is non-empty or until
     * this thread is interrupted.
     *
     * @param messageBroker             broker coordinating the processing
     * @param messageProcessor          processes each message
     * @param messageResolver           resolves a message when the processor invokes the supplied callback
     * @param brokerExecutor            executor the broker itself runs on
     * @param messageProcessingExecutor executor handed to the broker for processing messages
     * @param queue                     the messages left over from the retriever
     * @throws InterruptedException if interrupted again while waiting for the broker to stop
     */
    private void processExtraMessages(
        MessageBroker messageBroker,
        MessageProcessor messageProcessor,
        MessageResolver messageResolver,
        ExecutorService brokerExecutor,
        ExecutorService messageProcessingExecutor,
        Queue<Message> queue
    ) throws InterruptedException {
        try {
            runUntilInterruption(
                brokerExecutor,
                () ->
                    messageBroker.processMessages(
                        messageProcessingExecutor,
                        () -> !queue.isEmpty(),
                        () -> CompletableFuture.completedFuture(queue.poll()),
                        message -> messageProcessor.processMessage(message, () -> messageResolver.resolveMessage(message))
                    )
            );
        } catch (ExecutionException e) {
            log.error("Exception thrown processing extra messages", e.getCause());
        }
    }

    /**
     * Runs the action on the executor and blocks until it finishes. If the calling thread is
     * interrupted while waiting, the action's thread is interrupted and this method then waits for
     * the action to actually exit.
     *
     * @param executorService  executor to run the action on
     * @param blockingRunnable the action to run
     * @throws InterruptedException if interrupted while waiting for an already-cancelled action to exit
     * @throws ExecutionException   if the action failed with an exception
     */
    private void runUntilInterruption(ExecutorService executorService, BlockingRunnable blockingRunnable)
        throws InterruptedException, ExecutionException {
        // Completed by the task itself on exit: Future#get stops being usable once the task has
        // been cancelled, so a separate signal is needed to wait for the task body to finish.
        CompletableFuture<Boolean> taskExited = new CompletableFuture<>();
        Future<?> task = executorService.submit(() -> {
            try {
                blockingRunnable.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                taskExited.complete(false);
            }
        });
        try {
            task.get();
        } catch (InterruptedException e) {
            // NOTE(review): if the task is cancelled before a worker picks it up it never runs,
            // taskExited is never completed and the wait below only ends on another interrupt.
            task.cancel(true);
        }
        taskExited.get();
    }

    /**
     * Shuts down the message processing executor, interrupting running threads when the
     * properties ask for it, and waits for it to terminate within the configured timeout.
     *
     * @param executorService the message processing executor
     * @throws InterruptedException if interrupted while waiting for termination
     */
    private void shutdownMessageProcessingThreads(ExecutorService executorService) throws InterruptedException {
        if (shouldInterruptMessageProcessingThreadsOnShutdown()) {
            log.debug("Container '{}' is interrupting and then waiting for all message processing threads to finish", this.identifier);
            executorService.shutdownNow();
        } else {
            log.debug("Container '{}' is waiting for all message processing threads to finish", this.identifier);
            executorService.shutdown();
        }
        int timeoutInSeconds = getMessageProcessingShutdownTimeoutInSeconds();
        if (!executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
            log.error("Container '{}' did not shutdown MessageProcessor threads within {} seconds", this.identifier, timeoutInSeconds);
        }
    }

    /**
     * Interrupts the broker's executor and waits for it to terminate within the configured timeout.
     *
     * @param executorService the executor the broker runs on
     * @throws InterruptedException if interrupted while waiting for termination
     */
    private void shutdownMessageBroker(ExecutorService executorService) throws InterruptedException {
        executorService.shutdownNow();
        int timeoutInSeconds = getMessageBrokerShutdownTimeoutInSeconds();
        if (!executorService.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
            log.error("Container '{}' did not shutdown MessageBroker within {} seconds", getIdentifier(), timeoutInSeconds);
        }
    }

    /**
     * Starts the retriever on its own thread.
     *
     * @param messageRetriever the retriever to run
     * @param consumer         receives the value returned by the retriever's {@code run()} once it completes normally
     * @return an action that interrupts the retriever thread and waits for it to terminate
     */
    private BlockingRunnable startupMessageRetriever(MessageRetriever messageRetriever, Consumer<List<Message>> consumer) {
        ExecutorService retrieverExecutor =
            Executors.newSingleThreadExecutor(ThreadUtils.singleNamedThreadFactory(getIdentifier() + "-message-retriever"));
        CompletableFuture.supplyAsync(messageRetriever::run, retrieverExecutor).thenAccept(consumer);
        return () -> {
            retrieverExecutor.shutdownNow();
            int timeoutInSeconds = getMessageRetrieverShutdownTimeoutInSeconds();
            if (!retrieverExecutor.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
                log.error("Container '{}' did not shutdown MessageRetriever within {} seconds", getIdentifier(), timeoutInSeconds);
            }
        };
    }

    /**
     * Starts the resolver on its own thread.
     *
     * @param messageResolver the resolver to run
     * @return an action that interrupts the resolver thread and waits for it to terminate
     */
    private BlockingRunnable startupMessageResolver(MessageResolver messageResolver) {
        ExecutorService resolverExecutor =
            Executors.newSingleThreadExecutor(ThreadUtils.singleNamedThreadFactory(getIdentifier() + "-message-resolver"));
        CompletableFuture.runAsync(messageResolver::run, resolverExecutor);
        return () -> {
            resolverExecutor.shutdownNow();
            // Read the property once so the value that is logged is the value that was waited for.
            int timeoutInSeconds = getMessageResolverShutdownTimeoutInSeconds();
            if (!resolverExecutor.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
                log.error("Container '{}' did not shutdown MessageResolver within {} seconds", getIdentifier(), timeoutInSeconds);
            }
        };
    }

    /**
     * @return a new cached thread pool for message processing threads, named after this container
     */
    private ExecutorService buildMessageProcessingExecutorService() {
        return Executors.newCachedThreadPool(ThreadUtils.multiNamedThreadFactory(getIdentifier() + "-message-processing"));
    }

    /**
     * @return seconds to wait for message processing threads to finish, resolved through
     *     {@link PropertyUtils} with a default of 60
     */
    private int getMessageProcessingShutdownTimeoutInSeconds() {
        return PropertyUtils.safelyGetPositiveOrZeroIntegerValue(
            "messageProcessingShutdownTimeoutInSeconds",
            this.properties::getMessageProcessingShutdownTimeoutInSeconds,
            DEFAULT_SHUTDOWN_TIMEOUT_IN_SECONDS
        );
    }

    /**
     * @return seconds to wait for the broker to finish, resolved through {@link PropertyUtils}
     *     with a default of 60
     */
    private int getMessageBrokerShutdownTimeoutInSeconds() {
        return PropertyUtils.safelyGetPositiveOrZeroIntegerValue(
            "messageBrokerShutdownTimeoutInSeconds",
            this.properties::getMessageBrokerShutdownTimeoutInSeconds,
            DEFAULT_SHUTDOWN_TIMEOUT_IN_SECONDS
        );
    }

    /**
     * @return seconds to wait for the retriever to finish, resolved through {@link PropertyUtils}
     *     with a default of 60
     */
    private int getMessageRetrieverShutdownTimeoutInSeconds() {
        return PropertyUtils.safelyGetPositiveOrZeroIntegerValue(
            "messageRetrieverShutdownTimeoutInSeconds",
            this.properties::getMessageRetrieverShutdownTimeoutInSeconds,
            DEFAULT_SHUTDOWN_TIMEOUT_IN_SECONDS
        );
    }

    /**
     * @return seconds to wait for the resolver to finish, resolved through {@link PropertyUtils}
     *     with a default of 60
     */
    private int getMessageResolverShutdownTimeoutInSeconds() {
        return PropertyUtils.safelyGetPositiveOrZeroIntegerValue(
            "messageResolverShutdownTimeoutInSeconds",
            this.properties::getMessageResolverShutdownTimeoutInSeconds,
            DEFAULT_SHUTDOWN_TIMEOUT_IN_SECONDS
        );
    }

    /**
     * @return whether message processing threads should be interrupted on shutdown, using the
     *     library default when the property is {@code null}
     */
    private boolean shouldInterruptMessageProcessingThreadsOnShutdown() {
        return Optional
            .ofNullable(this.properties.shouldInterruptThreadsProcessingMessagesOnShutdown())
            .orElse(Boolean.valueOf(CoreMessageListenerContainerConstants.DEFAULT_SHOULD_INTERRUPT_MESSAGE_PROCESSING_ON_SHUTDOWN));
    }

    /**
     * @return whether messages handed back by the retriever on shutdown should still be processed,
     *     using the library default when the property is {@code null}
     */
    private boolean shouldProcessAnyExtraRetrievedMessagesOnShutdown() {
        return Optional
            .ofNullable(this.properties.shouldProcessAnyExtraRetrievedMessagesOnShutdown())
            .orElse(Boolean.valueOf(CoreMessageListenerContainerConstants.DEFAULT_SHOULD_PROCESS_EXTRA_MESSAGES_ON_SHUTDOWN));
    }
}
