package com.mercateo.sqs.utils.message.handling;

import com.mercateo.sqs.utils.queue.Queue;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/* loaded from: input_file:com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.class */
/**
 * Hands incoming messages to a bounded thread pool for processing and, for each message,
 * schedules a recurring visibility-timeout extender on a separate scheduled executor.
 *
 * <p>The ids of messages currently being handled are tracked in a {@link SetWithUpperBound};
 * a message whose id is already tracked is ignored by {@link #handleMessage(Message)}.
 *
 * @param <I> payload type of the incoming {@link Message}
 * @param <O> second type argument of the worker and the finished-message callback
 *            (presumably the worker's result type)
 */
public class LongRunningMessageHandler<I, O> {
    private static final Logger log = LoggerFactory.getLogger(LongRunningMessageHandler.class);
    private final ThreadPoolTaskExecutor messageProcessingExecutor;
    private final MessageHandlingRunnableFactory messageHandlingRunnableFactory;
    private final VisibilityTimeoutExtenderFactory timeoutExtenderFactory;
    private final MessageWorkerWithHeaders<I, O> worker;
    private final Queue queue;
    private final FinishedMessageCallback<I, O> finishedMessageCallback;
    private final SetWithUpperBound<String> messagesInProcessing;
    private final Duration timeUntilVisibilityTimeoutExtension;
    private final ScheduledExecutorService timeoutExtensionExecutor;
    private final ErrorHandlingStrategy<I> errorHandlingStrategy;
    private final Duration awaitShutDown;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the handler and its internal message processing thread pool.
     *
     * @param scheduledExecutorService executor on which the visibility-timeout extenders are scheduled
     * @param i the processing pool's task queue capacity is set to {@code i - 1}
     * @param i2 core and maximum size of the processing pool, and the upper bound of the
     *           set of messages in processing
     * @param messageHandlingRunnableFactory creates the runnable submitted for each message
     * @param visibilityTimeoutExtenderFactory creates the extender runnable scheduled for each message
     * @param messageWorkerWithHeaders worker passed to the message handling runnable
     * @param queue queue the messages originate from; its name is used in the thread name prefix
     * @param finishedMessageCallback callback passed to the message handling runnable
     * @param duration initial delay and period of the visibility-timeout extension; must be positive
     *                 and at least 5 seconds shorter than the queue's default visibility timeout
     * @param duration2 how long {@link #shutdown()} waits for the processing pool to terminate
     * @param errorHandlingStrategy strategy passed to the message handling runnable
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws IllegalArgumentException if {@code duration} is zero or negative
     * @throws IllegalStateException if {@code duration} is less than 5 seconds below the queue's
     *                               default visibility timeout
     */
    public LongRunningMessageHandler(@NonNull ScheduledExecutorService scheduledExecutorService, int i, int i2, @NonNull MessageHandlingRunnableFactory messageHandlingRunnableFactory, @NonNull VisibilityTimeoutExtenderFactory visibilityTimeoutExtenderFactory, @NonNull MessageWorkerWithHeaders<I, O> messageWorkerWithHeaders, @NonNull Queue queue, @NonNull FinishedMessageCallback<I, O> finishedMessageCallback, @NonNull Duration duration, @NonNull Duration duration2, @NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {
        if (scheduledExecutorService == null) {
            throw new NullPointerException("timeoutExtensionExecutor is marked non-null but is null");
        }
        if (messageHandlingRunnableFactory == null) {
            throw new NullPointerException("messageHandlingRunnableFactory is marked non-null but is null");
        }
        if (visibilityTimeoutExtenderFactory == null) {
            throw new NullPointerException("timeoutExtenderFactory is marked non-null but is null");
        }
        if (messageWorkerWithHeaders == null) {
            throw new NullPointerException("worker is marked non-null but is null");
        }
        if (queue == null) {
            throw new NullPointerException("queue is marked non-null but is null");
        }
        if (finishedMessageCallback == null) {
            throw new NullPointerException("finishedMessageCallback is marked non-null but is null");
        }
        if (duration == null) {
            throw new NullPointerException("timeUntilVisibilityTimeoutExtension is marked non-null but is null");
        }
        if (duration2 == null) {
            throw new NullPointerException("awaitShutDown is marked non-null but is null");
        }
        if (errorHandlingStrategy == null) {
            throw new NullPointerException("errorHandlingStrategy is marked non-null but is null");
        }
        if (duration.isZero() || duration.isNegative()) {
            throw new IllegalArgumentException("the timeout has to be > 0");
        }
        // Reject an unusable extension interval before the thread pool is built, so a failed
        // construction never leaves an initialised executor behind.
        if (queue.getDefaultVisibilityTimeout().minusSeconds(5L).compareTo(duration) < 0) {
            throw new IllegalStateException("The extension interval of " + duration.getSeconds() + " is too close to the VisibilityTimeout of " + queue.getDefaultVisibilityTimeout().getSeconds() + " seconds of the queue, has to be at least 5 seconds less.");
        }

        this.timeoutExtensionExecutor = scheduledExecutorService;
        this.messageHandlingRunnableFactory = messageHandlingRunnableFactory;
        this.timeoutExtenderFactory = visibilityTimeoutExtenderFactory;
        this.worker = messageWorkerWithHeaders;
        this.queue = queue;
        this.finishedMessageCallback = finishedMessageCallback;
        this.timeUntilVisibilityTimeoutExtension = duration;
        this.awaitShutDown = duration2;
        this.errorHandlingStrategy = errorHandlingStrategy;

        this.messageProcessingExecutor = new ThreadPoolTaskExecutor();
        this.messageProcessingExecutor.setMaxPoolSize(i2);
        this.messageProcessingExecutor.setCorePoolSize(i2);
        this.messageProcessingExecutor.setThreadNamePrefix(getClass().getSimpleName() + "-" + queue.getName().getId() + "-");
        this.messageProcessingExecutor.setQueueCapacity(i - 1);
        this.messageProcessingExecutor.afterPropertiesSet();
        this.messagesInProcessing = new SetWithUpperBound<>(i2);
    }

    /**
     * Starts processing of the given message unless a message with the same {@code MessageId}
     * header is already being processed.
     *
     * <p>The message id is added to the set of messages in processing, a visibility-timeout
     * extender is scheduled, and the processing task is submitted to the thread pool. If either
     * scheduling step fails, the id is removed from the set again (and the extender cancelled if
     * it was already scheduled) before the failure is rethrown.
     *
     * @param message the message to process
     * @throws NullPointerException if {@code message} is {@code null}
     * @throws RuntimeException wrapping the original failure if the extender could not be
     *                          scheduled or the processing task could not be submitted
     */
    public void handleMessage(@NonNull Message<I> message) {
        if (message == null) {
            throw new NullPointerException("message is marked non-null but is null");
        }
        String messageId = message.getHeaders().get("MessageId", String.class);
        if (this.messagesInProcessing.contains(messageId)) {
            return;
        }
        this.messagesInProcessing.add(messageId);

        // The two failure modes are handled in separate, sequential try blocks. Nesting them
        // would let the exception rethrown by the submit handler be caught a second time by the
        // extender handler, producing a duplicate (and misleading) log entry and double wrapping.
        ScheduledFuture<?> timeoutExtender;
        try {
            timeoutExtender = scheduleNewVisibilityTimeoutExtender(message);
        } catch (RuntimeException e) {
            this.messagesInProcessing.remove(messageId);
            log.error("error while trying to schedule timeout extender", e);
            throw new RuntimeException(e);
        }

        try {
            scheduleNewMessageTask(message, timeoutExtender);
            // Presumably blocks until the bounded set has a free slot again.
            // NOTE(review): a failure here is treated like a submit failure although the task has
            // already been submitted at this point - confirm that this is intended.
            this.messagesInProcessing.waitUntilAtLeastOneFree();
        } catch (RuntimeException e) {
            this.messagesInProcessing.remove(messageId);
            timeoutExtender.cancel(true);
            log.error("error while trying to submit message processing task", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Submits a processing task for the message to the message processing thread pool.
     *
     * @param message the message to process
     * @param scheduledFuture handle of the visibility-timeout extender scheduled for this message;
     *                        it is passed on to the created runnable
     */
    private void scheduleNewMessageTask(@NonNull Message<I> message, ScheduledFuture<?> scheduledFuture) {
        if (message == null) {
            throw new NullPointerException("message is marked non-null but is null");
        }
        this.messageProcessingExecutor.submit(this.messageHandlingRunnableFactory.get(this.worker, message, this.finishedMessageCallback, this.messagesInProcessing, scheduledFuture, this.errorHandlingStrategy));
    }

    /**
     * Schedules a visibility-timeout extender for the message at a fixed rate, using the
     * configured extension interval as both the initial delay and the period.
     *
     * @param message the message whose visibility timeout is to be extended
     * @return the handle of the scheduled extender
     */
    private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull Message<I> message) {
        if (message == null) {
            throw new NullPointerException("message is marked non-null but is null");
        }
        long intervalMillis = this.timeUntilVisibilityTimeoutExtension.toMillis();
        return this.timeoutExtensionExecutor.scheduleAtFixedRate(this.timeoutExtenderFactory.get(message, this.queue), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the set tracking the ids of the messages currently in processing.
     *
     * @return the internal (not copied) set instance
     */
    SetWithUpperBound<String> getMessagesInProcessing() {
        return this.messagesInProcessing;
    }

    /**
     * Initiates an orderly shutdown of the message processing pool and waits up to the configured
     * {@code awaitShutDown} duration (in whole seconds) for it to terminate.
     *
     * <p>If the wait merely times out, nothing further is done. If the waiting thread is
     * interrupted, the pool is shut down immediately and the interrupt flag is restored so that
     * callers can still observe the interruption.
     */
    public void shutdown() {
        this.messageProcessingExecutor.getThreadPoolExecutor().shutdown();
        try {
            this.messageProcessingExecutor.getThreadPoolExecutor().awaitTermination(this.awaitShutDown.getSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("wait for termination failure after " + this.awaitShutDown + ", call shutdownNow", e);
            this.messageProcessingExecutor.getThreadPoolExecutor().shutdownNow();
            // Catching InterruptedException clears the flag; set it again for the caller.
            Thread.currentThread().interrupt();
        }
    }
}
