package org.sdase.commons.server.kafka.consumer.strategies.synccommit;

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.sdase.commons.server.kafka.consumer.ErrorHandler;
import org.sdase.commons.server.kafka.consumer.MessageHandler;
import org.sdase.commons.server.kafka.consumer.strategies.autocommit.AutocommitMLS;
import org.sdase.commons.server.kafka.exception.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/sdase/commons/server/kafka/consumer/strategies/synccommit/SyncCommitMLS.class */
public class SyncCommitMLS<K, V> extends AutocommitMLS<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncCommitMLS.class);
    private final SyncCommitErrorHandler<K, V> syncCommitErrorHandler;

    public SyncCommitMLS(MessageHandler<K, V> messageHandler, ErrorHandler<K, V> errorHandler) {
        this(messageHandler, errorHandler, (runtimeException, consumer) -> {
            if (!(runtimeException instanceof CommitFailedException)) {
                throw runtimeException;
            }
        });
    }

    public SyncCommitMLS(MessageHandler<K, V> messageHandler, ErrorHandler<K, V> errorHandler, SyncCommitErrorHandler<K, V> syncCommitErrorHandler) {
        super(messageHandler, errorHandler);
        this.syncCommitErrorHandler = syncCommitErrorHandler;
    }

    @Override // org.sdase.commons.server.kafka.consumer.strategies.autocommit.AutocommitMLS, org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy
    public void processRecords(ConsumerRecords<K, V> consumerRecords, KafkaConsumer<K, V> kafkaConsumer) {
        super.processRecords(consumerRecords, kafkaConsumer);
        commitSync(kafkaConsumer);
    }

    private void commitSync(KafkaConsumer<K, V> kafkaConsumer) {
        try {
            kafkaConsumer.commitSync();
        } catch (RuntimeException e) {
            LOGGER.error("Commit failed", e);
            this.syncCommitErrorHandler.handleError(e, kafkaConsumer);
        }
    }

    @Override // org.sdase.commons.server.kafka.consumer.strategies.autocommit.AutocommitMLS, org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy
    public void verifyConsumerConfig(Map<String, String> map) {
        if (Boolean.TRUE.equals(Boolean.valueOf(map.getOrDefault("enable.auto.commit", "true")))) {
            throw new ConfigurationException("The strategy should NOT use autocommit but property 'enable.auto.commit' in consumer config is set to 'true' (which is the default and must be disabled).");
        }
    }

    @Override // org.sdase.commons.server.kafka.consumer.strategies.autocommit.AutocommitMLS, org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy
    public Map<String, String> forcedConfigToApply() {
        return Collections.singletonMap("enable.auto.commit", "false");
    }
}
