package org.springframework.batch.item.redis.support;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/springframework/batch/item/redis/support/AbstractLiveKeyItemReader.class */
/**
 * Key reader that combines the keys produced by the parent {@link AbstractKeyItemReader}
 * with keys pushed at runtime through {@link #enqueue(Object)}.
 *
 * <p>Pushed keys are buffered in a bounded {@link BlockingQueue} whose capacity comes from
 * {@link LiveKeyReaderOptions#getQueueCapacity()}. Subclasses implement {@link #open(Object)}
 * and {@link #close(Object)}, which receive the pub/sub pattern from the options when the
 * reader is opened and closed.
 *
 * <p>Threading: {@link #doOpen()}, {@link #doClose()} and {@link #doRead()} are
 * {@code synchronized} on this instance. {@link #stop()} and {@link #enqueue(Object)} are not,
 * so they can be invoked while a read is blocked waiting for a key.
 *
 * @param <K> key type
 * @param <V> value type
 * @param <C> connection type
 */
public abstract class AbstractLiveKeyItemReader<K, V, C extends StatefulConnection<K, V>> extends AbstractKeyItemReader<K, V, C> {
    private static final Logger log = LoggerFactory.getLogger(AbstractLiveKeyItemReader.class);
    private final LiveKeyReaderOptions<K> options;
    private BlockingQueue<K> queue;
    // volatile: written by stop() without holding the monitor and read inside the doRead() loop,
    // so the write must be visible across threads without relying on incidental lock traffic.
    private volatile boolean stopped;

    /**
     * Creates the reader. The scan arguments handed to the parent are taken from
     * {@code liveKeyReaderOptions.getScanArgs()}.
     *
     * <p>Decompiler note: access was widened from {@code protected}.
     *
     * @param c                    connection passed to the parent reader
     * @param function             function passed to the parent reader to obtain commands from the connection
     * @param liveKeyReaderOptions options for this reader; must not be {@code null}
     */
    public AbstractLiveKeyItemReader(C c, Function<C, BaseRedisCommands<K, V>> function, LiveKeyReaderOptions<K> liveKeyReaderOptions) {
        super(c, function, liveKeyReaderOptions.getScanArgs());
        this.options = liveKeyReaderOptions;
    }

    /**
     * Allocates the key queue, lets the subclass open with the configured pub/sub pattern,
     * then opens the parent reader.
     *
     * <p>Decompiler note: access was widened from {@code protected}.
     */
    @Override // org.springframework.batch.item.redis.support.AbstractKeyItemReader
    public synchronized void doOpen() {
        // The queue must exist before open(...) runs, because enqueue(...) writes to it.
        this.queue = new LinkedBlockingDeque<>(this.options.getQueueCapacity());
        open(this.options.getPubsubPattern());
        super.doOpen();
    }

    /**
     * Subclass hook invoked from {@link #doOpen()} after the queue has been created.
     *
     * @param k the pub/sub pattern from the reader options
     */
    protected abstract void open(K k);

    /**
     * Requests that {@link #doRead()} stop waiting for pushed keys. A read that is currently
     * polling returns {@code null} once its current poll times out without a key.
     */
    public void stop() {
        this.stopped = true;
    }

    /**
     * Closes the parent reader, then lets the subclass close with the configured pub/sub pattern.
     *
     * <p>Decompiler note: access was widened from {@code protected}.
     */
    @Override // org.springframework.batch.item.redis.support.AbstractKeyItemReader
    public synchronized void doClose() {
        super.doClose();
        close(this.options.getPubsubPattern());
    }

    /**
     * Subclass hook invoked from {@link #doClose()} after the parent reader has been closed.
     *
     * @param k the pub/sub pattern from the reader options
     */
    protected abstract void close(K k);

    /**
     * Reads the next key.
     *
     * <p>While the queue is empty the parent reader is consulted first and its key is returned
     * if it is not {@code null}. Otherwise the queue is polled, each attempt waiting up to
     * {@link LiveKeyReaderOptions#getQueuePollingTimeout()} milliseconds, until a key arrives
     * or the reader has been stopped. The queue is always polled at least once, even if the
     * reader is already stopped.
     *
     * <p>Decompiler note: access was widened from {@code protected}.
     *
     * @return the next key, or {@code null} if the reader was stopped and no key was available
     * @throws Exception if the parent read fails or the thread is interrupted while polling
     */
    @Override // org.springframework.batch.item.redis.support.AbstractKeyItemReader
    public synchronized K doRead() throws Exception {
        if (this.queue.isEmpty()) {
            K fromParent = (K) super.doRead();
            if (fromParent != null) {
                return fromParent;
            }
        }
        K key;
        do {
            key = this.queue.poll(this.options.getQueuePollingTimeout(), TimeUnit.MILLISECONDS);
        } while (key == null && !this.stopped);
        return key;
    }

    /**
     * Converts the given value with the configured channel converter and, when the result is
     * not {@code null}, adds it to the queue, blocking while the queue is full.
     *
     * <p>If the thread is interrupted while waiting for space, the key is dropped, the event is
     * logged at debug level, and the interrupt status is restored so callers can observe it.
     *
     * <p>Decompiler note: access was widened from {@code protected}.
     *
     * @param k the value handed to the channel converter
     */
    public void enqueue(K k) {
        // NOTE(review): the converter is assumed to yield a K (the queue's element type);
        // the decompiled source only showed Object here - confirm against LiveKeyReaderOptions.
        @SuppressWarnings("unchecked")
        K key = (K) this.options.getChannelConverter().convert(k);
        if (key == null) {
            return;
        }
        try {
            this.queue.put(key);
        } catch (InterruptedException e) {
            log.debug("Interrupted while trying to enqueue key", e);
            // Restore the flag: catching InterruptedException clears it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code true} once {@link #stop()} has been called
     */
    public boolean isStopped() {
        return this.stopped;
    }
}
