package org.springframework.batch.item.redis.support;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.redis.support.AbstractPollableItemReader;
import org.springframework.batch.item.redis.support.KeyValue;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/support/AbstractKeyValueItemReader.class */
public abstract class AbstractKeyValueItemReader<K, V, C extends StatefulConnection<K, V>, T extends KeyValue<K, ?>> extends AbstractPollableItemReader<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractKeyValueItemReader.class);
    private final ItemReader<K> keyReader;
    protected final GenericObjectPool<C> pool;
    protected final Function<C, BaseRedisAsyncCommands<K, V>> commands;
    private final int threads;
    private final int chunkSize;
    private final BlockingQueue<T> queue;
    private final Function<SimpleStepBuilder<K, K>, SimpleStepBuilder<K, K>> stepBuilderProvider;
    private JobExecution jobExecution;
    private String name;

    /* loaded from: input_file:org/springframework/batch/item/redis/support/AbstractKeyValueItemReader$KeyValueItemReaderBuilder.class */
    public static abstract class KeyValueItemReaderBuilder<R extends AbstractKeyValueItemReader, B extends KeyValueItemReaderBuilder<R, B>> extends AbstractPollableItemReader.PollableItemReaderBuilder<B> {
        public static final int DEFAULT_QUEUE_CAPACITY = 1000;
        public static final int DEFAULT_CHUNK_SIZE = 50;
        public static final int DEFAULT_THREAD_COUNT = 1;
        protected int chunkSize = 50;
        protected int threadCount = 1;
        protected int queueCapacity = 1000;

        public B chunkSize(int i) {
            this.chunkSize = i;
            return this;
        }

        public B threadCount(int i) {
            this.threadCount = i;
            return this;
        }

        public B queueCapacity(int i) {
            this.queueCapacity = i;
            return this;
        }

        public abstract R build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractKeyValueItemReader(Duration duration, ItemReader<K> itemReader, GenericObjectPool<C> genericObjectPool, Function<C, BaseRedisAsyncCommands<K, V>> function, int i, int i2, int i3) {
        this(duration, itemReader, genericObjectPool, function, i, i2, i3, Function.identity());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractKeyValueItemReader(Duration duration, ItemReader<K> itemReader, GenericObjectPool<C> genericObjectPool, Function<C, BaseRedisAsyncCommands<K, V>> function, int i, int i2, int i3, Function<SimpleStepBuilder<K, K>, SimpleStepBuilder<K, K>> function2) {
        super(duration);
        setName(ClassUtils.getShortName(getClass()));
        Assert.notNull(itemReader, "A key reader is required.");
        Assert.notNull(genericObjectPool, "A connection pool is required");
        Assert.notNull(function, "A command function is required");
        Assert.isTrue(i > 0, "Chunk size must be greater than zero.");
        Assert.isTrue(i2 > 0, "Thread count must be greater than zero.");
        Assert.isTrue(i3 > 0, "Queue capacity must be greater than zero.");
        this.keyReader = itemReader;
        this.pool = genericObjectPool;
        this.commands = function;
        this.threads = i2;
        this.chunkSize = i;
        this.queue = new LinkedBlockingDeque(i3);
        this.stepBuilderProvider = function2;
    }

    public void setName(String str) {
        this.name = str;
        super.setName(str);
    }

    @Override // org.springframework.batch.item.redis.support.PollableItemReader
    public T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.batch.item.redis.support.AbstractPollableItemReader
    public boolean isRunning() {
        return super.isRunning() && this.jobExecution.isRunning();
    }

    @Override // org.springframework.batch.item.redis.support.AbstractPollableItemReader
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        log.debug("Opening {}", this.name);
        JobFactory jobFactory = new JobFactory();
        try {
            jobFactory.afterPropertiesSet();
            SimpleStepBuilder writer = this.stepBuilderProvider.apply(jobFactory.step(this.name + "-step").chunk(this.chunkSize)).reader(keyReader()).writer(this::addToQueue);
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setMaxPoolSize(this.threads);
            threadPoolTaskExecutor.setCorePoolSize(this.threads);
            threadPoolTaskExecutor.afterPropertiesSet();
            Job build = jobFactory.job(this.name + "-job").start(writer.taskExecutor(threadPoolTaskExecutor).throttleLimit(this.threads).build()).build();
            MetricsUtils.createGaugeCollectionSize("reader.queue.size", this.queue, new Tag[0]);
            try {
                this.jobExecution = jobFactory.getAsyncLauncher().run(build, new JobParameters());
                while (!this.jobExecution.isRunning()) {
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                        throw new ItemStreamException("Interrupted while waiting for job to run");
                    }
                }
                super.open(executionContext);
                log.debug("Opened {}", this.name);
            } catch (Exception e2) {
                throw new ItemStreamException("Could not run job " + build.getName());
            }
        } catch (Exception e3) {
            throw new ItemStreamException("Failed to initialize the reader", e3);
        }
    }

    private ItemReader<? extends K> keyReader() {
        return this.threads > 1 ? new SynchronizedItemReader(this.keyReader) : this.keyReader;
    }

    @Override // org.springframework.batch.item.redis.support.AbstractPollableItemReader
    public void close() {
        log.debug("Closing {}", this.name);
        super.close();
        if (!this.queue.isEmpty()) {
            log.warn("Closing {} with {} items still in queue", ClassUtils.getShortName(getClass()), Integer.valueOf(this.queue.size()));
        }
        while (this.jobExecution.isRunning()) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                throw new ItemStreamException("Interrupted while waiting for job to finish running");
            }
        }
        this.jobExecution = null;
        log.debug("Closed {}", this.name);
    }

    public abstract List<T> values(List<? extends K> list) throws Exception;

    private void addToQueue(List<? extends K> list) throws Exception {
        for (T t : values(list)) {
            this.queue.removeIf(keyValue -> {
                return keyValue.getKey().equals(t.getKey());
            });
            this.queue.put(t);
        }
    }

    public ItemReader<K> getKeyReader() {
        return this.keyReader;
    }
}
