package org.springframework.batch.item.redis.support;

import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.redis.support.AbstractPollableItemReader;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/redis/support/StreamItemReader.class */
public class StreamItemReader<K, V, C extends StatefulConnection<K, V>> extends AbstractPollableItemReader<StreamMessage<K, V>> {
    private static final Logger log = LoggerFactory.getLogger(StreamItemReader.class);
    private final C connection;
    private final Function<C, RedisStreamCommands<K, V>> commands;
    private final Long block;
    private final Long count;
    private final boolean noack;
    private XReadArgs.StreamOffset<K> offset;
    private Iterator<StreamMessage<K, V>> iterator;

    /* loaded from: input_file:org/springframework/batch/item/redis/support/StreamItemReader$StreamItemReaderBuilder.class */
    public static abstract class StreamItemReaderBuilder<K, V, R extends StreamItemReader<K, V, ?>, B extends StreamItemReaderBuilder<K, V, R, B>> extends AbstractPollableItemReader.PollableItemReaderBuilder<B> {
        protected XReadArgs.StreamOffset<K> offset;
        protected Long block;
        protected Long count;
        protected boolean noack;

        public B offset(XReadArgs.StreamOffset<K> streamOffset) {
            this.offset = streamOffset;
            return this;
        }

        public B block(Long l) {
            this.block = l;
            return this;
        }

        public B count(Long l) {
            this.count = l;
            return this;
        }

        public B noack(boolean z) {
            this.noack = z;
            return this;
        }

        public abstract R build();
    }

    public StreamItemReader(Duration duration, C c, Function<C, RedisStreamCommands<K, V>> function, XReadArgs.StreamOffset<K> streamOffset, Long l, Long l2, boolean z) {
        super(duration);
        Assert.notNull(c, "A Redis connection is required.");
        Assert.notNull(function, "A command provider is required");
        Assert.notNull(streamOffset, "Offset is required.");
        this.connection = c;
        this.commands = function;
        this.offset = streamOffset;
        this.block = l;
        this.count = l2;
        this.noack = z;
        this.iterator = Collections.emptyIterator();
    }

    @Override // org.springframework.batch.item.redis.support.PollableItemReader
    public StreamMessage<K, V> poll(long j, TimeUnit timeUnit) {
        if (!this.iterator.hasNext()) {
            List<StreamMessage<K, V>> nextMessages = nextMessages(Long.valueOf(timeUnit.toMillis(j)));
            if (nextMessages == null || nextMessages.isEmpty()) {
                return null;
            }
            this.iterator = nextMessages.iterator();
        }
        return this.iterator.next();
    }

    public List<StreamMessage<K, V>> readMessages() {
        return nextMessages(this.block);
    }

    private List<StreamMessage<K, V>> nextMessages(Long l) {
        XReadArgs noack = XReadArgs.Builder.noack(this.noack);
        if (l != null) {
            noack.block(l.longValue());
        }
        if (this.count != null) {
            noack.count(this.count.longValue());
        }
        List<StreamMessage<K, V>> xread = this.commands.apply(this.connection).xread(noack, new XReadArgs.StreamOffset[]{this.offset});
        if (xread != null && !xread.isEmpty()) {
            StreamMessage<K, V> streamMessage = xread.get(xread.size() - 1);
            this.offset = XReadArgs.StreamOffset.from(streamMessage.getStream(), streamMessage.getId());
        }
        return xread;
    }

    public XReadArgs.StreamOffset<K> getOffset() {
        return this.offset;
    }
}
