package org.sdase.commons.server.kafka.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.sdase.commons.server.kafka.config.ListenerConfig;
import org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/sdase/commons/server/kafka/consumer/MessageListener.class */
/**
 * Runnable that repeatedly polls a {@link KafkaConsumer} for records of the configured topics and
 * hands every polled batch to a {@link MessageListenerStrategy}.
 *
 * <p>The loop in {@link #run()} ends when {@link #stopConsumer()} has been called or when the
 * strategy throws a {@code StopListenerException}. On the way out the strategy gets a chance to
 * commit offsets and the consumer is closed.
 *
 * <p>After a failed iteration the poll timeout is multiplied by the configured factor, capped at
 * the configured maximum; after a successful iteration it is reset to the configured interval.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class MessageListener<K, V> implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class);

    /** Poll timeout used as long as no error occurred. */
    private final long configuredPollIntervalMillis;

    /** Upper bound applied to the poll timeout when it is increased after errors. */
    private final long maxPollIntervalMillis;

    /** Poll timeout used for the next call to {@code poll}. */
    private final AtomicLong currentPollIntervalMillis;

    /** Factor the current poll timeout is multiplied with after an error. */
    private final long pollIntervalFactorOnError;

    /** Sleep time between two topic checks; values {@code <= 0} disable the check. */
    private final long topicMissingRetryMs;

    private final MessageListenerStrategy<K, V> strategy;
    private final Collection<String> topics;

    /** Comma separated topic names, used in log messages only. */
    private final String joinedTopics;

    private final AtomicBoolean shouldStop = new AtomicBoolean(false);
    private final KafkaConsumer<K, V> consumer;

    /**
     * Creates the listener and subscribes the given consumer to the given topics.
     *
     * @param topics names of the topics to consume
     * @param consumer consumer used for polling; it is subscribed to {@code topics} here
     * @param listenerConfig source of the poll interval, the maximum poll interval, the factor
     *     applied on errors and the retry delay for missing topics
     * @param strategy receives every polled batch of records
     */
    public MessageListener(Collection<String> topics, KafkaConsumer<K, V> consumer, ListenerConfig listenerConfig, MessageListenerStrategy<K, V> strategy) {
        this.topics = topics;
        this.joinedTopics = String.join(",", topics);
        this.consumer = consumer;
        consumer.subscribe(topics);
        this.strategy = strategy;
        this.configuredPollIntervalMillis = listenerConfig.getPollInterval();
        this.topicMissingRetryMs = listenerConfig.getTopicMissingRetryMs();
        this.currentPollIntervalMillis = new AtomicLong(this.configuredPollIntervalMillis);
        this.pollIntervalFactorOnError = listenerConfig.getPollIntervalFactorOnError();
        this.maxPollIntervalMillis = listenerConfig.getMaxPollInterval();
    }

    /**
     * Waits for the topics (if configured), then polls and processes records until the listener is
     * stopped. Afterwards lets the strategy commit on close and closes the consumer.
     */
    @Override
    public void run() {
        waitForTopic(this.joinedTopics);
        while (!this.shouldStop.get()) {
            try {
                ConsumerRecords<K, V> records = this.consumer.poll(Duration.ofMillis(this.currentPollIntervalMillis.get()));
                if (records.count() > 0) {
                    LOGGER.debug("Received {} messages from topics [{}]", records.count(), this.joinedTopics);
                } else {
                    LOGGER.trace("Received {} messages from topics [{}]", records.count(), this.joinedTopics);
                }
                this.strategy.resetOffsetsToCommitOnClose();
                this.strategy.processRecords(records, this.consumer);
                configureAfterSuccess();
            } catch (StopListenerException stopRequest) {
                LOGGER.error("Stopping listener for topics [{}] due to exception", this.joinedTopics, stopRequest);
                // Actually stop: leave the poll loop so that the commit/close sequence below runs.
                break;
            } catch (WakeupException wakeup) {
                if (this.shouldStop.get()) {
                    LOGGER.info("Woke up to stop consuming.");
                } else {
                    LOGGER.warn("Woke up before polling returned but shouldStop is {}.", this.shouldStop.get(), wakeup);
                }
            } catch (RuntimeException failure) {
                LOGGER.error("Unauthorized or other runtime exception.", failure);
                configureAfterError();
            }
        }
        LOGGER.info("MessageListener closing Consumer for [{}]", this.joinedTopics);
        try {
            this.strategy.commitOnClose(this.consumer);
        } catch (RuntimeException commitFailure) {
            LOGGER.error("Exception caught while committing offsets on close.", commitFailure);
        } finally {
            doCloseConsumer();
        }
    }

    /** Closes the consumer; runtime exceptions are logged instead of propagated. */
    private void doCloseConsumer() {
        try {
            this.consumer.close();
        } catch (RuntimeException closeFailure) {
            LOGGER.error("Exception caught while closing consumer.", closeFailure);
        }
    }

    /**
     * Blocks until {@link #topicsReady()} reports {@code true}, the listener is stopped or the
     * thread is interrupted. Does nothing when the configured retry delay is not positive.
     *
     * @param topicNames topic names as they should appear in the log message
     */
    private void waitForTopic(String topicNames) {
        if (this.topicMissingRetryMs <= 0) {
            return;
        }
        while (!this.shouldStop.get() && !topicsReady()) {
            LOGGER.warn("Topics {} are not ready yet. Waiting {} ms for retry", topicNames, this.topicMissingRetryMs);
            try {
                Thread.sleep(this.topicMissingRetryMs);
            } catch (InterruptedException e) {
                LOGGER.error("Thread interrupted when waiting for topic to come up");
                Thread.currentThread().interrupt();
                // Stop waiting: with the interrupt flag restored every further sleep would throw
                // immediately, so staying in the loop could never back off again.
                return;
            }
        }
    }

    /**
     * Asks the consumer for the partitions of every configured topic.
     *
     * @return {@code true} if at least one non-null partition was reported for any of the topics
     */
    boolean topicsReady() {
        // Collect instead of short-circuiting so that partitionsFor is invoked for every topic.
        return !this.topics.stream()
                .map(this.consumer::partitionsFor)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .filter(Objects::nonNull)
                .collect(Collectors.toSet())
                .isEmpty();
    }

    /**
     * Requests the poll loop to end and wakes up the consumer so that the stop flag is noticed;
     * {@link #run()} handles the resulting {@link WakeupException}.
     */
    public void stopConsumer() {
        this.shouldStop.set(true);
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    /**
     * @return the consumer this listener polls from
     */
    public KafkaConsumer<K, V> getConsumer() {
        return this.consumer;
    }

    /**
     * @return {@code "ML "} followed by the topic names
     */
    @Override
    public String toString() {
        // NOTE(review): the topic names are concatenated without a separator, unlike joinedTopics.
        // Kept as is because the output is observable - confirm whether "," was intended.
        return "ML ".concat(String.join("", this.topics));
    }

    /** Restores the configured poll interval and logs if it had been changed before. */
    private void configureAfterSuccess() {
        long previous = this.currentPollIntervalMillis.getAndSet(this.configuredPollIntervalMillis);
        if (previous != this.configuredPollIntervalMillis) {
            LOGGER.info("Resetting poll interval to {}ms after success", this.currentPollIntervalMillis);
        }
    }

    /** Multiplies the current poll interval by the error factor, capped at the maximum interval. */
    private void configureAfterError() {
        long increased = this.currentPollIntervalMillis.get() * this.pollIntervalFactorOnError;
        this.currentPollIntervalMillis.set(Math.min(this.maxPollIntervalMillis, increased));
        LOGGER.info("Setting poll interval to {}ms after error", this.currentPollIntervalMillis);
    }
}
