package org.springframework.batch.item.redis.support;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.metrics.BatchMetrics;
import org.springframework.batch.core.step.item.Chunk;
import org.springframework.batch.core.step.item.ChunkProvider;
import org.springframework.batch.core.step.item.SkipOverflowException;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/redis/support/FlushingChunkProvider.class */
public class FlushingChunkProvider<I> implements ChunkProvider<I> {
    private static final Logger log = LoggerFactory.getLogger(FlushingChunkProvider.class);
    private final PollableItemReader<? extends I> itemReader;
    private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<>();
    private final RepeatOperations repeatOperations;
    private final long flushingInterval;
    private final long idleTimeout;
    private long lastActivity;

    public FlushingChunkProvider(PollableItemReader<? extends I> pollableItemReader, RepeatOperations repeatOperations, Duration duration, Duration duration2) {
        Assert.notNull(pollableItemReader, "Item reader is required.");
        Assert.notNull(repeatOperations, "Repeat operations are required.");
        Assert.notNull(duration, "Flushing interval is required.");
        this.itemReader = pollableItemReader;
        this.repeatOperations = repeatOperations;
        this.flushingInterval = duration.toMillis();
        this.idleTimeout = duration2 == null ? Long.MAX_VALUE : duration2.toMillis();
    }

    public void setListeners(List<? extends StepListener> list) {
        Iterator<? extends StepListener> it = list.iterator();
        while (it.hasNext()) {
            registerListener(it.next());
        }
    }

    public void registerListener(StepListener stepListener) {
        this.listener.register(stepListener);
    }

    public Chunk<I> provide(StepContribution stepContribution) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastActivity == 0) {
            this.lastActivity = currentTimeMillis;
        }
        Chunk<I> chunk = new Chunk<>();
        this.repeatOperations.iterate(repeatContext -> {
            long currentTimeMillis2 = this.flushingInterval - (System.currentTimeMillis() - currentTimeMillis);
            if (currentTimeMillis2 < 0) {
                return RepeatStatus.FINISHED;
            }
            Timer.Sample start = Timer.start(Metrics.globalRegistry);
            try {
                I poll = poll(currentTimeMillis2);
                if (poll == null) {
                    long currentTimeMillis3 = System.currentTimeMillis() - this.lastActivity;
                    if (currentTimeMillis3 > this.idleTimeout) {
                        log.debug("Idle for {} ms - End of stream", Long.valueOf(currentTimeMillis3));
                        chunk.setEnd();
                    }
                    return RepeatStatus.CONTINUABLE;
                }
                stopTimer(start, stepContribution.getStepExecution(), "SUCCESS");
                chunk.add(poll);
                stepContribution.incrementReadCount();
                this.lastActivity = System.currentTimeMillis();
                return RepeatStatus.CONTINUABLE;
            } catch (SkipOverflowException e) {
                stopTimer(start, stepContribution.getStepExecution(), "FAILURE");
                return RepeatStatus.FINISHED;
            }
        });
        return chunk;
    }

    private void stopTimer(Timer.Sample sample, StepExecution stepExecution, String str) {
        sample.stop(BatchMetrics.createTimer("item.read", "Item reading duration", new Tag[]{Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), Tag.of("step.name", stepExecution.getStepName()), Tag.of("status", str)}));
    }

    protected I poll(long j) throws InterruptedException {
        try {
            this.listener.beforeRead();
            I poll = this.itemReader.poll(j, TimeUnit.MILLISECONDS);
            if (poll != null) {
                this.listener.afterRead(poll);
            }
            return poll;
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug(e.getMessage() + " : " + e.getClass().getName());
            }
            this.listener.onReadError(e);
            throw e;
        }
    }

    public void postProcess(StepContribution stepContribution, Chunk<I> chunk) {
    }
}
