package org.springframework.batch.item.redis.support;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/support/RedisItemReader.class */
/**
 * Item reader that pulls keys from a delegate key reader on background threads,
 * turns batches of keys into items through a key/value processor, and hands the
 * resulting items to the reading thread through a bounded in-memory queue.
 *
 * <p>The worker tasks are submitted when the reader is opened; the executor is
 * shut down right after submission so that it terminates once every task has
 * finished. {@link #doRead()} uses that termination as its end-of-data signal.
 *
 * @param <K> type of the keys produced by the key reader
 * @param <T> type of the items returned by this reader
 */
public class RedisItemReader<K, T> extends AbstractItemCountingItemStreamItemReader<T> {
    private static final Logger log = LoggerFactory.getLogger(RedisItemReader.class);
    private final ItemReader<K> keyReader;
    private final ItemProcessor<List<? extends K>, List<T>> keyValueProcessor;
    private final BlockingQueue<T> itemQueue;
    private final ExecutorService executor;
    private final List<BatchRunnable<K>> enqueuers;
    private final long queuePollingTimeout;

    /**
     * Creates a reader backed by a fixed pool of enqueuing tasks.
     *
     * @param keyReader           source of keys; must not be {@code null}
     * @param keyValueProcessor   converts a list of keys into a list of items; must not be {@code null}
     * @param threadCount         size of the thread pool and number of enqueuing tasks created
     * @param batchSize           value handed to each {@code BatchRunnable} (presumably the number
     *                            of keys per batch -- confirm against {@code BatchRunnable})
     * @param queueCapacity       capacity of the item queue
     * @param queuePollingTimeout time in milliseconds {@link #doRead()} waits on the queue per attempt
     */
    public RedisItemReader(ItemReader<K> keyReader, ItemProcessor<List<? extends K>, List<T>> keyValueProcessor, int threadCount, int batchSize, int queueCapacity, long queuePollingTimeout) {
        Assert.notNull(keyReader, "A key reader is required.");
        Assert.notNull(keyValueProcessor, "A key/value processor is required.");
        setName(ClassUtils.getShortName(getClass()));
        this.keyReader = keyReader;
        this.keyValueProcessor = keyValueProcessor;
        this.itemQueue = new LinkedBlockingDeque<>(queueCapacity);
        this.queuePollingTimeout = queuePollingTimeout;
        this.executor = Executors.newFixedThreadPool(threadCount);
        this.enqueuers = new ArrayList<>(threadCount);
        for (int index = 0; index < threadCount; index++) {
            this.enqueuers.add(new BatchRunnable<>(keyReader, this::write, batchSize));
        }
    }

    /**
     * Opens the key reader first (when it is an {@link ItemStream}) and then this reader.
     *
     * @param executionContext context forwarded to the key reader and the superclass
     * @throws ItemStreamException if either stream fails to open
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        ItemStream keyStream = keyReaderStream();
        if (keyStream != null) {
            keyStream.open(executionContext);
        }
        super.open(executionContext);
    }

    /**
     * Closes this reader and then the key reader (when it is an {@link ItemStream}).
     * The key reader is closed even if closing this reader throws.
     *
     * @throws ItemStreamException if either stream fails to close
     */
    @Override
    public void close() throws ItemStreamException {
        try {
            super.close();
        } finally {
            ItemStream keyStream = keyReaderStream();
            if (keyStream != null) {
                keyStream.close();
            }
        }
    }

    /**
     * Forwards the update to the key reader when it is an {@link ItemStream}.
     *
     * <p>NOTE(review): the superclass {@code update} is not invoked here; this mirrors the
     * existing behavior -- confirm that skipping it is intentional.
     *
     * @param executionContext context forwarded to the key reader
     * @throws ItemStreamException if the key reader fails to update
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        ItemStream keyStream = keyReaderStream();
        if (keyStream != null) {
            keyStream.update(executionContext);
        }
    }

    /**
     * Submits every enqueuing task to the executor, then initiates an orderly shutdown so the
     * executor reports itself terminated once all submitted tasks have completed.
     */
    @Override
    protected void doOpen() {
        for (BatchRunnable<K> enqueuer : this.enqueuers) {
            this.executor.submit(enqueuer);
        }
        this.executor.shutdown();
    }

    /**
     * Processes a batch of keys and appends the resulting items to the queue.
     * A {@code null} processor result is ignored.
     *
     * <p>NOTE(review): {@code addAll} on a capacity-bounded queue throws
     * {@link IllegalStateException} when the queue cannot take every element; a blocking
     * {@code put} would apply back-pressure instead, but could stall a caller of
     * {@link #flush()} -- left unchanged pending confirmation of the intended behavior.
     *
     * @param keys keys to convert into items
     * @throws Exception if the processor fails or the queue rejects the items
     */
    private void write(List<? extends K> keys) throws Exception {
        List<T> values = this.keyValueProcessor.process(keys);
        if (values == null) {
            return;
        }
        this.itemQueue.addAll(values);
    }

    /**
     * Stops the executor immediately unless it has already terminated.
     *
     * @throws ItemStreamException declared for compatibility with the superclass contract
     */
    @Override
    protected void doClose() throws ItemStreamException {
        if (!this.executor.isTerminated()) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Calls {@code flush()} on every enqueuing task. A failure in one task is logged and does
     * not prevent the remaining tasks from being flushed.
     */
    public void flush() {
        for (BatchRunnable<K> enqueuer : this.enqueuers) {
            try {
                enqueuer.flush();
            } catch (Exception e) {
                log.error("Could not flush", e);
            }
        }
    }

    /**
     * Returns the next queued item, waiting in {@code queuePollingTimeout}-millisecond slices
     * until an item arrives or the executor has terminated.
     *
     * @return the next item, or {@code null} once the executor has terminated and the queue is empty
     * @throws Exception if the thread is interrupted while waiting on the queue
     */
    @Override
    protected T doRead() throws Exception {
        T item;
        do {
            item = this.itemQueue.poll(this.queuePollingTimeout, TimeUnit.MILLISECONDS);
        } while (item == null && !this.executor.isTerminated());
        if (item == null) {
            // A task may have enqueued items and finished between the timed-out poll and the
            // termination check; drain once more so those items are not reported as end of data.
            item = this.itemQueue.poll();
        }
        return item;
    }

    /**
     * @return the key reader supplied at construction time
     */
    public ItemReader<K> getKeyReader() {
        return this.keyReader;
    }

    /**
     * @return the key/value processor supplied at construction time
     */
    public ItemProcessor<List<? extends K>, List<T>> getKeyValueProcessor() {
        return this.keyValueProcessor;
    }

    /**
     * Returns the key reader viewed as an {@link ItemStream}, or {@code null} when it is not one.
     */
    private ItemStream keyReaderStream() {
        return (this.keyReader instanceof ItemStream) ? (ItemStream) this.keyReader : null;
    }
}
