package kr.jm.utils.kafka.client;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import kr.jm.utils.datastructure.JMCollections;
import kr.jm.utils.enums.OS;
import kr.jm.utils.exception.JMExceptionManager;
import kr.jm.utils.helper.JMLog;
import kr.jm.utils.helper.JMThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/utils/kafka/client/JMKafkaConsumer.class */
/**
 * A {@link KafkaConsumer} for String keys and values that polls its subscribed
 * topics on a dedicated executor and hands every record to a
 * {@link RecordConsumer} callback.
 *
 * <p>Lifecycle: construct (this subscribes to the topics), call
 * {@link #start()} once to begin polling, optionally toggle
 * {@link #setPaused(boolean)}, and finally call {@link #shutdown()}, which
 * stops the loop and closes the underlying consumer. {@code start()} and
 * {@code shutdown()} are expected to be invoked in sequence by the owner of
 * this object, not raced against each other.
 */
public class JMKafkaConsumer extends KafkaConsumer<String, String> {
    private static final Logger log = LoggerFactory.getLogger(JMKafkaConsumer.class);

    /** Poll timeout used until {@link #setPollIntervalMs(int)} is called. */
    private static final int DEFAULT_POLL_INTERVAL_MS = 100;
    /** Auto-commit interval used by the constructors that do not take one. */
    private static final int DEFAULT_AUTO_COMMIT_INTERVAL_MS = 1000;
    /** Sleep between checks while paused or while waiting for termination. */
    private static final long STATUS_CHECK_INTERVAL_MS = 100L;

    /**
     * Flipped exactly once, by whichever of {@link #start()} or
     * {@link #shutdown()} runs first. It decides who is responsible for
     * closing the underlying consumer: the consume thread (when started) or
     * {@code shutdown()} itself (when never started).
     */
    private final AtomicBoolean started = new AtomicBoolean();
    /** {@code true} whenever the consume loop is not (or no longer) active. */
    private final AtomicBoolean closed = new AtomicBoolean(true);
    private final AtomicBoolean paused = new AtomicBoolean();
    /** Written by callers, read by the consume thread, hence volatile. */
    private volatile int pollIntervalMs = DEFAULT_POLL_INTERVAL_MS;
    private final String[] topics;
    private final String groupId;
    private final ExecutorService kafkaConsumerThreadPool;
    private final RecordConsumer recordConsumer;

    /** Callback invoked once for every record returned by a poll. */
    @FunctionalInterface
    public interface RecordConsumer extends Consumer<ConsumerRecord<String, String>> {
    }

    /**
     * Creates the consumer from ready-made properties and subscribes to the
     * given topics. Polling does not begin until {@link #start()} is called.
     *
     * @param properties     Kafka consumer configuration; its {@code group.id}
     *                       entry (if any) is remembered for logging and
     *                       thread naming
     * @param recordConsumer callback that receives every consumed record
     * @param topics         topics to subscribe to
     */
    public JMKafkaConsumer(Properties properties, RecordConsumer recordConsumer, String... topics) {
        super(properties, Serdes.String().deserializer(), Serdes.String().deserializer());
        this.kafkaConsumerThreadPool = JMThread.newSingleThreadPool();
        this.recordConsumer = recordConsumer;
        // Defensive copy so later changes to the caller's array cannot alter
        // what start() logs or getTopicList() reports.
        this.topics = topics.clone();
        this.groupId = properties.getProperty("group.id");
        subscribe(topics);
    }

    /**
     * Creates a consumer that resets to the earliest offset and uses the
     * default auto-commit interval.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param groupId          value for {@code group.id}
     * @param recordConsumer   callback that receives every consumed record
     * @param topics           topics to subscribe to
     */
    public JMKafkaConsumer(String bootstrapServers, String groupId, RecordConsumer recordConsumer,
            String... topics) {
        this(false, bootstrapServers, groupId, recordConsumer, topics);
    }

    /**
     * Creates a consumer with the default auto-commit interval.
     *
     * @param isLatest         {@code true} for {@code auto.offset.reset=latest},
     *                         {@code false} for {@code earliest}
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param groupId          value for {@code group.id}
     * @param recordConsumer   callback that receives every consumed record
     * @param topics           topics to subscribe to
     */
    public JMKafkaConsumer(boolean isLatest, String bootstrapServers, String groupId,
            RecordConsumer recordConsumer, String... topics) {
        this(isLatest, bootstrapServers, groupId, DEFAULT_AUTO_COMMIT_INTERVAL_MS, recordConsumer, topics);
    }

    /**
     * Creates a consumer from individual configuration values.
     *
     * @param isLatest             {@code true} for {@code auto.offset.reset=latest},
     *                             {@code false} for {@code earliest}
     * @param bootstrapServers     value for {@code bootstrap.servers}
     * @param groupId              value for {@code group.id}
     * @param autoCommitIntervalMs value for {@code auto.commit.interval.ms}
     * @param recordConsumer       callback that receives every consumed record
     * @param topics               topics to subscribe to
     */
    public JMKafkaConsumer(boolean isLatest, String bootstrapServers, String groupId,
            int autoCommitIntervalMs, RecordConsumer recordConsumer, String... topics) {
        this(buildProperties(Boolean.valueOf(isLatest), bootstrapServers, groupId,
                Integer.valueOf(autoCommitIntervalMs)), recordConsumer, topics);
    }

    /**
     * Builds consumer properties from the given values. Every argument except
     * {@code bootstrapServers} is optional: a {@code null} leaves the
     * corresponding key unset.
     *
     * @param isLatest             {@code true} maps to {@code latest},
     *                             {@code false} to {@code earliest}; may be {@code null}
     * @param bootstrapServers     value for {@code bootstrap.servers}; must not be {@code null}
     * @param groupId              value for {@code group.id}; may be {@code null}
     * @param autoCommitIntervalMs value for {@code auto.commit.interval.ms}; may be {@code null}
     * @return a new, independent {@link Properties} instance
     * @throws NullPointerException if {@code bootstrapServers} is {@code null}
     */
    public static Properties buildProperties(Boolean isLatest, String bootstrapServers, String groupId,
            Integer autoCommitIntervalMs) {
        // A plain Properties object instead of double-brace initialization,
        // which would create an extra anonymous subclass per call site.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        if (isLatest != null) {
            properties.put("auto.offset.reset", isLatest.booleanValue() ? "latest" : "earliest");
        }
        if (groupId != null) {
            properties.put("group.id", groupId);
        }
        if (autoCommitIntervalMs != null) {
            properties.put("auto.commit.interval.ms", autoCommitIntervalMs);
        }
        return properties;
    }

    /**
     * Varargs convenience overload that subscribes to the given topics.
     *
     * @param strArr topics to subscribe to
     */
    public void subscribe(String... strArr) {
        subscribe(Arrays.asList(strArr));
    }

    /**
     * Starts the consume loop on the internal executor. Only the first call
     * has an effect; later calls (including calls after {@link #shutdown()})
     * are logged and ignored, because the underlying consumer is closed when
     * the loop ends and cannot be polled again.
     *
     * @return this consumer, for chaining
     */
    public JMKafkaConsumer start() {
        if (!this.started.compareAndSet(false, true)) {
            log.warn("start() ignored - already started or shut down, groupId = {}", this.groupId);
            return this;
        }
        JMLog.info(log, "start",
                new Object[]{this.groupId, Arrays.asList(this.topics), Integer.valueOf(this.pollIntervalMs)});
        // Mark as running BEFORE handing off to the worker: if the worker
        // cleared the flag itself, a shutdown() arriving first would have its
        // closed=true overwritten, and isRunning() could be false right after
        // start() returned.
        this.closed.set(false);
        try {
            JMThread.runAsync(this::consume, this.kafkaConsumerThreadPool);
        } catch (RuntimeException e) {
            // Submission failed, so no loop is running.
            this.closed.set(true);
            throw e;
        }
        return this;
    }

    /**
     * Poll loop executed on the consumer thread. Runs until
     * {@link #shutdown()} flips the closed flag or an exception escapes, and
     * always closes the underlying consumer on the way out.
     */
    private void consume() {
        try {
            Thread.currentThread().setName("JMKafkaConsumer-" + OS.getHostname() + "-" + this.groupId);
            while (isRunning()) {
                handleConsumerRecords(poll(this.pollIntervalMs));
                checkPauseStatus();
            }
        } catch (Exception e) {
            // An exception after shutdown() was requested is the expected
            // result of wakeup() and is deliberately ignored; anything that
            // happens while still running is a real failure.
            if (isRunning()) {
                JMExceptionManager.handleExceptionAndThrowRuntimeEx(log, e, "consume#WakeupException",
                        new Object[0]);
            }
        } finally {
            // Record that the loop is gone even when it died from a failure,
            // so isRunning() never reports true for a closed consumer.
            this.closed.set(true);
            close();
        }
    }

    /**
     * Passes every record of one poll result to the record consumer. A
     * failure in the callback is logged and does not stop the consume loop.
     */
    private void handleConsumerRecords(ConsumerRecords<String, String> consumerRecords) {
        log.debug("Consume Timestamp = {}, Record Count = {}", Long.valueOf(System.currentTimeMillis()),
                Integer.valueOf(consumerRecords.count()));
        try {
            consumerRecords.forEach(this.recordConsumer);
        } catch (Exception e) {
            JMExceptionManager.logException(log, e, "handleConsumerRecords", new Object[]{consumerRecords});
        }
    }

    /**
     * Blocks the consume thread while the consumer is paused. The wait also
     * ends when the consumer stops running; otherwise shutdown() on a paused
     * consumer would wait forever for a thread stuck in this loop.
     */
    private void checkPauseStatus() {
        while (isPaused() && isRunning()) {
            JMThread.sleep(STATUS_CHECK_INTERVAL_MS);
        }
    }

    /**
     * @return {@code true} from a successful {@link #start()} until the
     *         consume loop is asked to stop or terminates
     */
    public boolean isRunning() {
        return !this.closed.get();
    }

    /**
     * @return {@code true} if polling is currently suspended via
     *         {@link #setPaused(boolean)}
     */
    public boolean isPaused() {
        return this.paused.get();
    }

    /**
     * Suspends or resumes polling. The flag is checked after each poll, so a
     * poll that is already in progress still completes and its records are
     * still delivered.
     *
     * @param z {@code true} to pause, {@code false} to resume
     */
    public void setPaused(boolean z) {
        JMLog.info(log, "setPaused", new Object[]{Boolean.valueOf(z)});
        this.paused.set(z);
    }

    /**
     * Stops the consume loop, waits for the consumer thread to finish, and
     * makes sure the underlying consumer is closed. Must not be called from
     * the record callback, since it waits for the thread running that
     * callback.
     */
    public void shutdown() {
        this.closed.set(true);
        if (this.started.compareAndSet(false, true)) {
            // Never started: no consume thread exists to close the consumer,
            // so close it here to release its resources.
            close();
        } else {
            // Interrupts a poll that may be in progress on the consume thread.
            wakeup();
        }
        this.kafkaConsumerThreadPool.shutdown();
        while (!this.kafkaConsumerThreadPool.isTerminated()) {
            JMThread.sleep(STATUS_CHECK_INTERVAL_MS);
        }
    }

    /** @return the timeout, in milliseconds, passed to each poll */
    public int getPollIntervalMs() {
        return this.pollIntervalMs;
    }

    /**
     * Sets the timeout passed to each subsequent poll.
     *
     * @param i poll timeout in milliseconds
     */
    public void setPollIntervalMs(int i) {
        this.pollIntervalMs = i;
    }

    /** @return the topics given at construction time, as a list */
    public List<String> getTopicList() {
        return JMCollections.buildList(this.topics);
    }

    /** @return the {@code group.id} from the construction properties, or {@code null} if absent */
    public String getGroupId() {
        return this.groupId;
    }
}
