package org.sdase.commons.server.kafka.consumer.strategies.autocommit;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.sdase.commons.server.kafka.consumer.ErrorHandler;
import org.sdase.commons.server.kafka.consumer.KafkaHelper;
import org.sdase.commons.server.kafka.consumer.MessageHandler;
import org.sdase.commons.server.kafka.consumer.StopListenerException;
import org.sdase.commons.server.kafka.consumer.strategies.MessageHandlerContextCloseable;
import org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy;
import org.sdase.commons.server.kafka.exception.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/sdase/commons/server/kafka/consumer/strategies/autocommit/AutocommitMLS.class */
/**
 * Message listener strategy that relies on the Kafka consumer's auto-commit feature.
 *
 * <p>Every record of a poll result is passed to the configured {@link MessageHandler}. When the
 * handler throws a {@link RuntimeException}, the configured {@link ErrorHandler} decides whether
 * processing continues with the next record or the listener is stopped.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class AutocommitMLS<K, V> extends MessageListenerStrategy<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AutocommitMLS.class);

    /** Consumer property that this strategy verifies and forces to {@code "true"}. */
    private static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    private final MessageHandler<K, V> handler;
    private final ErrorHandler<K, V> errorHandler;

    /** Client id of the consumer; resolved lazily on the first call to {@code processRecords}. */
    private String consumerName;

    /**
     * Creates the strategy.
     *
     * @param messageHandler handler invoked once for every consumed record
     * @param errorHandler handler invoked when {@code messageHandler} throws a
     *     {@link RuntimeException}
     */
    public AutocommitMLS(MessageHandler<K, V> messageHandler, ErrorHandler<K, V> errorHandler) {
        this.handler = messageHandler;
        this.errorHandler = errorHandler;
    }

    /**
     * Hands each record to the message handler, one after another, inside the record's message
     * handler context.
     *
     * @param consumerRecords records to process
     * @param kafkaConsumer consumer the records were read from; passed on to the error handler
     * @throws StopListenerException if the message handler fails and the error handler returns
     *     {@code false}
     */
    @Override // org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy
    public void processRecords(ConsumerRecords<K, V> consumerRecords, KafkaConsumer<K, V> kafkaConsumer) {
        if (this.consumerName == null) {
            this.consumerName = KafkaHelper.getClientId(kafkaConsumer);
        }
        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            LOGGER.debug("Handling message for {}", consumerRecord.key());
            // The try/catch is deliberately nested INSIDE the try-with-resources block: the context
            // must stay open while the error handler runs and is closed only afterwards.
            try (MessageHandlerContextCloseable ignored = messageHandlerContextFor(consumerRecord)) {
                try {
                    Instant start = Instant.now();
                    this.handler.handle(consumerRecord);
                    addOffsetToCommitOnClose(consumerRecord);
                    Instant end = Instant.now();
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace(
                                "calculated duration {} for message consumed by {} from {}",
                                Duration.between(start, end).toSeconds(),
                                this.consumerName,
                                consumerRecord.topic());
                    }
                } catch (RuntimeException e) {
                    // The trailing exception argument is logged by SLF4J as the throwable.
                    LOGGER.error(
                            "Error while handling record {} in message handler {}",
                            consumerRecord.key(),
                            this.handler.getClass(),
                            e);
                    if (!this.errorHandler.handleError(consumerRecord, e, kafkaConsumer)) {
                        throw new StopListenerException(e);
                    }
                    // The error handler chose to continue, so the failed record is registered too.
                    addOffsetToCommitOnClose(consumerRecord);
                }
            }
        }
    }

    /**
     * Rejects consumer configurations that disable auto-commit. A missing property is treated as
     * {@code "true"}; any value other than a case-insensitive {@code "true"} is rejected.
     *
     * @param map consumer configuration to verify
     * @throws ConfigurationException if {@code enable.auto.commit} does not parse to {@code true}
     */
    @Override // org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy
    public void verifyConsumerConfig(Map<String, String> map) {
        if (!Boolean.parseBoolean(map.getOrDefault(ENABLE_AUTO_COMMIT, "true"))) {
            throw new ConfigurationException("The strategy should use autocommit but property 'enable.auto.commit' in consumer config is set to 'false'");
        }
    }

    /**
     * Returns the configuration this strategy forces onto the consumer.
     *
     * @return an immutable single-entry map setting {@code enable.auto.commit} to {@code "true"}
     */
    @Override // org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy
    public Map<String, String> forcedConfigToApply() {
        return Collections.singletonMap(ENABLE_AUTO_COMMIT, "true");
    }
}
