package com.redis.riot.stream.kafka;

import com.redis.spring.batch.support.PollableItemReader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/riot/stream/kafka/KafkaItemReader.class */
/**
 * Item reader that pulls {@link ConsumerRecord}s from a fixed set of partitions of a single
 * Kafka topic through a manually assigned {@link KafkaConsumer}.
 *
 * <p>The offset of the last record handed out per partition is tracked in memory and, when
 * {@code saveState} is enabled, stored in the step {@link ExecutionContext} under the
 * {@code "topic.partition.offsets"} key so that {@link #open(ExecutionContext)} can seek back to
 * it.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<ConsumerRecord<K, V>> implements PollableItemReader<ConsumerRecord<K, V>> {
    private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";
    private static final String PROPERTY_MUST_BE_PROVIDED = " property must be provided";
    private final List<TopicPartition> topicPartitions;
    private final Properties consumerProperties;
    private Map<TopicPartition, Long> partitionOffsets;
    private KafkaConsumer<K, V> kafkaConsumer;
    private Iterator<ConsumerRecord<K, V>> consumerRecords;
    private boolean saveState = true;
    private boolean open;

    /**
     * Creates a reader for the given partitions of one topic.
     *
     * @param properties consumer configuration; must contain {@code bootstrap.servers},
     *     {@code group.id}, {@code key.deserializer} and {@code value.deserializer}
     * @param str name of the topic to read from; must not be null or empty
     * @param list partition numbers to assign; must contain at least one element
     * @throws IllegalArgumentException if any of the above preconditions is violated
     */
    public KafkaItemReader(Properties properties, String str, List<Integer> list) {
        Assert.notNull(properties, "Consumer properties must not be null");
        Assert.isTrue(properties.containsKey("bootstrap.servers"), "bootstrap.servers" + PROPERTY_MUST_BE_PROVIDED);
        Assert.isTrue(properties.containsKey("group.id"), "group.id" + PROPERTY_MUST_BE_PROVIDED);
        Assert.isTrue(properties.containsKey("key.deserializer"), "key.deserializer" + PROPERTY_MUST_BE_PROVIDED);
        Assert.isTrue(properties.containsKey("value.deserializer"), "value.deserializer" + PROPERTY_MUST_BE_PROVIDED);
        this.consumerProperties = properties;
        Assert.hasLength(str, "Topic name must not be null or empty");
        Assert.notEmpty(list, "At least one partition must be provided");
        this.topicPartitions = new ArrayList<>(list.size());
        for (Integer partition : list) {
            this.topicPartitions.add(new TopicPartition(str, partition));
        }
    }

    /**
     * Enables or disables storing and restoring partition offsets via the execution context.
     *
     * @param z {@code true} to save/restore offsets, {@code false} to always start at offset 0
     */
    public void setSaveState(boolean z) {
        this.saveState = z;
    }

    /**
     * @return whether partition offsets are saved to and restored from the execution context
     */
    public boolean isSaveState() {
        return this.saveState;
    }

    /**
     * Creates the consumer, assigns the configured partitions and seeks each one to its start
     * offset: 0 by default, or the position derived from the execution context when
     * {@code saveState} is enabled and offsets were stored.
     *
     * @param executionContext context that may hold previously stored offsets
     * @throws ItemStreamException declared by the overridden contract
     */
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
        this.partitionOffsets = new HashMap<>();
        for (TopicPartition topicPartition : this.topicPartitions) {
            this.partitionOffsets.put(topicPartition, 0L);
        }
        if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
            // update() only ever stores a Map<TopicPartition, Long> under this key.
            @SuppressWarnings("unchecked")
            Map<TopicPartition, Long> storedOffsets = (Map<TopicPartition, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
            if (storedOffsets != null) {
                for (Map.Entry<TopicPartition, Long> entry : storedOffsets.entrySet()) {
                    long stored = entry.getValue();
                    // A stored value is the offset of the last record returned, so resume one
                    // past it. 0 is kept as 0 because it is also the initial "nothing read" value.
                    // NOTE(review): this means a partition whose only consumed record had
                    // offset 0 will have that record read again after a restart - confirm intended.
                    this.partitionOffsets.put(entry.getKey(), stored == 0 ? 0L : stored + 1);
                }
            }
        }
        this.kafkaConsumer.assign(this.topicPartitions);
        this.partitionOffsets.forEach(this.kafkaConsumer::seek);
        super.open(executionContext);
        this.open = true;
    }

    /**
     * Returns the next record, fetching a new batch from Kafka when the current one is exhausted.
     *
     * <p>Decompiler note: renamed from {@code poll} (merged with bridge method).
     *
     * @param j maximum time to wait for records, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the next record, or {@code null} if the fetch returned no records
     */
    public ConsumerRecord<K, V> m4poll(long j, TimeUnit timeUnit) {
        if (this.consumerRecords == null || !this.consumerRecords.hasNext()) {
            // Convert the caller's timeout FROM timeUnit TO milliseconds.
            Duration timeout = Duration.ofMillis(timeUnit.toMillis(j));
            this.consumerRecords = this.kafkaConsumer.poll(timeout).iterator();
        }
        if (!this.consumerRecords.hasNext()) {
            return null;
        }
        ConsumerRecord<K, V> next = this.consumerRecords.next();
        // Remember the last offset handed out so update() can persist it.
        this.partitionOffsets.put(new TopicPartition(next.topic(), next.partition()), next.offset());
        return next;
    }

    /**
     * Always fails; records are obtained through the timed poll method instead.
     *
     * <p>Decompiler note: renamed from {@code read} (merged with bridge method).
     *
     * @return never returns normally
     * @throws IllegalAccessException always
     */
    public ConsumerRecord<K, V> m3read() throws Exception {
        throw new IllegalAccessException("read method is not supposed to be called");
    }

    /**
     * Stores a copy of the tracked offsets in the execution context (when {@code saveState} is
     * enabled) and synchronously commits the consumer's offsets.
     *
     * @param executionContext context to store the offsets in
     */
    public void update(ExecutionContext executionContext) {
        if (this.saveState) {
            // Copy so later polls do not mutate the map held by the execution context.
            executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
        }
        this.kafkaConsumer.commitSync();
    }

    /**
     * Closes the underlying consumer, if one was created, and marks the reader as not open.
     */
    public void close() {
        super.close();
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
        this.open = false;
    }

    /**
     * @return {@code true} between a completed {@link #open(ExecutionContext)} and the next
     *     {@link #close()}
     */
    public boolean isOpen() {
        return this.open;
    }
}
