package org.springframework.batch.item.redis.support;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/redis/support/LiveKeyItemReader.class */
public class LiveKeyItemReader<K, V> extends KeyItemReader<K, V> implements RedisPubSubListener<K, V>, RedisClusterPubSubListener<K, V> {
    private static final Logger log = LoggerFactory.getLogger(LiveKeyItemReader.class);
    private final StatefulRedisPubSubConnection<K, V> pubSubConnection;
    private final long queuePollingTimeout;
    private final BlockingQueue<K> queue;
    private final K pubSubPattern;
    private final Converter<K, K> keyExtractor;
    private boolean stopped;
    private boolean running;

    public LiveKeyItemReader(StatefulConnection<K, V> statefulConnection, Function<StatefulConnection<K, V>, BaseRedisCommands<K, V>> function, long j, String str, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, int i, long j2, K k, Converter<K, K> converter) {
        super(statefulConnection, function, j, str);
        Assert.notNull(statefulRedisPubSubConnection, "A PubSub connection is required.");
        Assert.notNull(k, "A PubSub channel pattern is required.");
        Assert.notNull(converter, "A key extractor is required.");
        this.pubSubConnection = statefulRedisPubSubConnection;
        this.queue = new LinkedBlockingDeque(i);
        this.queuePollingTimeout = j2;
        this.pubSubPattern = k;
        this.keyExtractor = converter;
    }

    public void message(K k, V v) {
        notification(k);
    }

    public void message(K k, K k2, V v) {
        message(k2, v);
    }

    public void subscribed(K k, long j) {
    }

    public void psubscribed(K k, long j) {
    }

    public void unsubscribed(K k, long j) {
    }

    public void punsubscribed(K k, long j) {
    }

    public void message(RedisClusterNode redisClusterNode, K k, V v) {
        notification(k);
    }

    public void message(RedisClusterNode redisClusterNode, K k, K k2, V v) {
        notification(k2);
    }

    public void subscribed(RedisClusterNode redisClusterNode, K k, long j) {
    }

    public void psubscribed(RedisClusterNode redisClusterNode, K k, long j) {
    }

    public void unsubscribed(RedisClusterNode redisClusterNode, K k, long j) {
    }

    public void punsubscribed(RedisClusterNode redisClusterNode, K k, long j) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.batch.item.redis.support.KeyItemReader
    public synchronized void doOpen() {
        super.doOpen();
        if (this.pubSubConnection instanceof StatefulRedisClusterPubSubConnection) {
            StatefulRedisClusterPubSubConnection statefulRedisClusterPubSubConnection = this.pubSubConnection;
            statefulRedisClusterPubSubConnection.addListener(this);
            statefulRedisClusterPubSubConnection.setNodeMessagePropagation(true);
            ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().masters().commands()).psubscribe(new Object[]{this.pubSubPattern});
        } else {
            this.pubSubConnection.addListener(this);
            this.pubSubConnection.sync().psubscribe(new Object[]{this.pubSubPattern});
        }
        this.running = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.batch.item.redis.support.KeyItemReader
    public synchronized void doClose() {
        if (this.pubSubConnection instanceof StatefulRedisClusterPubSubConnection) {
            StatefulRedisClusterPubSubConnection statefulRedisClusterPubSubConnection = this.pubSubConnection;
            ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().masters().commands()).punsubscribe(new Object[]{this.pubSubPattern});
            statefulRedisClusterPubSubConnection.removeListener(this);
        } else {
            this.pubSubConnection.sync().punsubscribe(new Object[]{this.pubSubPattern});
            this.pubSubConnection.removeListener(this);
        }
        super.doClose();
    }

    public void stop() {
        this.stopped = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.batch.item.redis.support.KeyItemReader
    public synchronized K doRead() throws Exception {
        K poll;
        K k;
        if (this.queue.isEmpty() && (k = (K) super.doRead()) != null) {
            return k;
        }
        do {
            poll = this.queue.poll(this.queuePollingTimeout, TimeUnit.MILLISECONDS);
            if (poll != null) {
                break;
            }
        } while (!this.stopped);
        return poll;
    }

    private void notification(K k) {
        Object convert = this.keyExtractor.convert(k);
        if (convert == null) {
            return;
        }
        try {
            this.queue.put(convert);
        } catch (InterruptedException e) {
            log.debug("Interrupted while trying to enqueue key", e);
        }
    }

    public boolean isRunning() {
        return this.running;
    }
}
