package com.redis.spring.batch;

import com.redis.spring.batch.builder.ScanRedisItemReaderBuilder;
import com.redis.spring.batch.builder.StreamItemReaderBuilder;
import com.redis.spring.batch.support.DataStructure;
import com.redis.spring.batch.support.DataStructureValueReader;
import com.redis.spring.batch.support.JobRunner;
import com.redis.spring.batch.support.KeyDumpValueReader;
import com.redis.spring.batch.support.KeyValue;
import com.redis.spring.batch.support.RedisValueEnqueuer;
import com.redis.spring.batch.support.Utils;
import com.redis.spring.batch.support.ValueReader;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.cluster.RedisClusterClient;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public class RedisItemReader<K, T extends KeyValue<K, ?>> extends AbstractItemStreamItemReader<T> {
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_SKIP_LIMIT = 3;
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final ItemReader<K> keyReader;
    private final ValueReader<K, T> valueReader;
    private int threads = 1;
    private int chunkSize = 50;
    private int queueCapacity = 10000;
    private Duration queuePollTimeout = DEFAULT_QUEUE_POLL_TIMEOUT;
    private SkipPolicy skipPolicy = DEFAULT_SKIP_POLICY;
    protected BlockingQueue<T> valueQueue;
    protected RedisValueEnqueuer<K, T> enqueuer;
    private JobExecution jobExecution;
    private String name;
    private static final Logger log = LoggerFactory.getLogger(RedisItemReader.class);
    public static final Duration DEFAULT_QUEUE_POLL_TIMEOUT = Duration.ofMillis(100);
    public static final SkipPolicy DEFAULT_SKIP_POLICY = limitCheckingSkipPolicy(3);

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ItemReaderBuilder.class */
    public static class ItemReaderBuilder {
        private final AbstractRedisClient client;

        public ItemReaderBuilder(AbstractRedisClient abstractRedisClient) {
            this.client = abstractRedisClient;
        }

        public ScanRedisItemReaderBuilder<DataStructure<String>, DataStructureValueReader<String, String>> dataStructure() {
            return new ScanRedisItemReaderBuilder<>(this.client, new DataStructureValueReader.DataStructureValueReaderFactory());
        }

        public ScanRedisItemReaderBuilder<KeyValue<String, byte[]>, KeyDumpValueReader<String, String>> keyDump() {
            return new ScanRedisItemReaderBuilder<>(this.client, new KeyDumpValueReader.KeyDumpValueReaderFactory());
        }

        public StreamItemReaderBuilder stream(String str) {
            return new StreamItemReaderBuilder(this.client, str);
        }
    }

    public RedisItemReader(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager, ItemReader<K> itemReader, ValueReader<K, T> valueReader) {
        setName(ClassUtils.getShortName(getClass()));
        Assert.notNull(jobRepository, "A job repository is required");
        Assert.notNull(platformTransactionManager, "A platform transaction manager is required");
        Assert.notNull(itemReader, "A key reader is required");
        Assert.notNull(valueReader, "A value reader is required");
        this.jobRepository = jobRepository;
        this.transactionManager = platformTransactionManager;
        this.keyReader = itemReader;
        this.valueReader = valueReader;
    }

    public static SkipPolicy limitCheckingSkipPolicy(int i) {
        return new LimitCheckingItemSkipPolicy(i, (Map) Stream.of((Object[]) new Class[]{RedisCommandExecutionException.class, RedisCommandTimeoutException.class, TimeoutException.class}).collect(Collectors.toMap(cls -> {
            return cls;
        }, cls2 -> {
            return true;
        })));
    }

    public void setThreads(int i) {
        Utils.assertPositive(Integer.valueOf(i), "Thread count");
        this.threads = i;
    }

    public void setChunkSize(int i) {
        Utils.assertPositive(Integer.valueOf(i), "Chunk size");
        this.chunkSize = i;
    }

    public void setQueueCapacity(int i) {
        Utils.assertPositive(Integer.valueOf(i), "Value queue capacity");
        this.queueCapacity = i;
    }

    public void setQueuePollTimeout(Duration duration) {
        Utils.assertPositive(duration, "Queue poll timeout");
        this.queuePollTimeout = duration;
    }

    public void setSkipPolicy(SkipPolicy skipPolicy) {
        Assert.notNull(skipPolicy, "A skip policy is required");
        this.skipPolicy = skipPolicy;
    }

    public ValueReader<K, T> getValueReader() {
        return this.valueReader;
    }

    public void setName(String str) {
        this.name = str;
        super.setName(str);
    }

    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.jobExecution != null) {
            log.info("Already opened, skipping");
            return;
        }
        log.info("Opening {}", this.name);
        this.valueQueue = new LinkedBlockingQueue(this.queueCapacity);
        this.enqueuer = new RedisValueEnqueuer<>(this.valueReader, this.valueQueue);
        Utils.createGaugeCollectionSize("reader.queue.size", this.valueQueue, new Tag[0]);
        JobRunner jobRunner = new JobRunner(this.jobRepository, this.transactionManager);
        FaultTolerantStepBuilder<K, K> faultTolerantStepBuilder = faultTolerantStepBuilder(jobRunner.step(this.name).chunk(this.chunkSize));
        faultTolerantStepBuilder.skipPolicy(this.skipPolicy);
        faultTolerantStepBuilder.reader(this.keyReader).writer(this.enqueuer);
        if (this.threads > 1) {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setMaxPoolSize(this.threads);
            threadPoolTaskExecutor.setCorePoolSize(this.threads);
            threadPoolTaskExecutor.afterPropertiesSet();
            faultTolerantStepBuilder.taskExecutor(threadPoolTaskExecutor).throttleLimit(this.threads);
        }
        try {
            this.jobExecution = jobRunner.runAsync(jobRunner.job(this.name).start(faultTolerantStepBuilder.build()).build());
            super.open(executionContext);
        } catch (Exception e) {
            throw new ItemStreamException("Could not run job for reader " + this.name, e);
        }
    }

    protected FaultTolerantStepBuilder<K, K> faultTolerantStepBuilder(SimpleStepBuilder<K, K> simpleStepBuilder) {
        return simpleStepBuilder.faultTolerant();
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public T m1read() throws Exception {
        T poll;
        do {
            poll = this.valueQueue.poll(this.queuePollTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (poll != null) {
                break;
            }
        } while (this.jobExecution.isRunning());
        return poll;
    }

    public List<T> read(int i) {
        ArrayList arrayList = new ArrayList(i);
        this.valueQueue.drainTo(arrayList, i);
        return arrayList;
    }

    public synchronized void close() {
        if (this.jobExecution == null) {
            log.info("Already closed, skipping");
            return;
        }
        log.info("Closing {}", this.name);
        super.close();
        if (!this.valueQueue.isEmpty()) {
            log.warn("Closing {} with {} items still in queue", ClassUtils.getShortName(getClass()), Integer.valueOf(this.valueQueue.size()));
        }
        this.jobExecution = null;
    }

    public static ItemReaderBuilder client(RedisClient redisClient) {
        return new ItemReaderBuilder(redisClient);
    }

    public static ItemReaderBuilder client(RedisClusterClient redisClusterClient) {
        return new ItemReaderBuilder(redisClusterClient);
    }
}
