package kr.jm.metric.input;

import java.util.Properties;
import java.util.function.Consumer;
import kr.jm.metric.config.input.KafkaInputConfig;
import kr.jm.metric.data.Transfer;
import kr.jm.utils.kafka.client.JMKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:kr/jm/metric/input/KafkaInput.class */
public class KafkaInput extends AbstractInput<KafkaInputConfig> {
    private JMKafkaConsumer kafkaConsumer;
    private Consumer<Transfer<String>> inputConsumer;

    public KafkaInput(KafkaInputConfig kafkaInputConfig) {
        super(kafkaInputConfig);
    }

    private Properties buildProperties(KafkaInputConfig kafkaInputConfig) {
        Properties buildProperties = JMKafkaConsumer.buildProperties(Boolean.valueOf(kafkaInputConfig.isLatest()), kafkaInputConfig.getBootstrapServers(), kafkaInputConfig.getGroupId(), 1000);
        buildProperties.putAll(kafkaInputConfig.getProperties());
        return buildProperties;
    }

    @Override // kr.jm.metric.input.AbstractInput
    protected void startImpl(Consumer<Transfer<String>> consumer) {
        this.inputConsumer = consumer;
        this.kafkaConsumer = new JMKafkaConsumer(buildProperties((KafkaInputConfig) this.inputConfig), this::convertTransfer, ((KafkaInputConfig) this.inputConfig).getTopics()).start();
    }

    private void convertTransfer(ConsumerRecord<String, String> consumerRecord) {
        this.inputConsumer.accept(newTransfer((String) consumerRecord.value(), consumerRecord.timestamp()));
    }

    @Override // kr.jm.metric.input.AbstractInput
    protected void closeImpl() {
        this.kafkaConsumer.shutdown();
    }

    @Override // kr.jm.metric.input.AbstractInput
    public String toString() {
        return "KafkaInput(super=" + super.toString() + ", kafkaConsumer=" + this.kafkaConsumer + ", inputConsumer=" + this.inputConsumer + ")";
    }
}
