package org.springframework.batch.item.redis.support;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import java.time.Duration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/redis/support/RedisClusterKeyspaceNotificationItemReader.class */
public class RedisClusterKeyspaceNotificationItemReader<K, V> extends AbstractKeyspaceNotificationItemReader<K, V> {
    private final StatefulRedisClusterPubSubConnection<K, V> connection;
    private final RedisClusterKeyspaceNotificationItemReader<K, V>.ClusterKeyspaceNotificationListener listener;

    /* loaded from: input_file:org/springframework/batch/item/redis/support/RedisClusterKeyspaceNotificationItemReader$ClusterKeyspaceNotificationListener.class */
    private class ClusterKeyspaceNotificationListener extends RedisClusterPubSubAdapter<K, V> {
        private ClusterKeyspaceNotificationListener() {
        }

        public void message(RedisClusterNode redisClusterNode, K k, V v) {
            RedisClusterKeyspaceNotificationItemReader.this.notification(k, v);
        }

        public void message(RedisClusterNode redisClusterNode, K k, K k2, V v) {
            RedisClusterKeyspaceNotificationItemReader.this.notification(k2, v);
        }
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/support/RedisClusterKeyspaceNotificationItemReader$RedisClusterKeyspaceNotificationItemReaderBuilder.class */
    public static class RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> {
        private Duration readTimeout;
        private StatefulRedisClusterPubSubConnection<K, V> connection;
        private K pubSubPattern;
        private Converter<K, K> keyExtractor;
        private int queueCapacity;

        RedisClusterKeyspaceNotificationItemReaderBuilder() {
        }

        public RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> readTimeout(Duration duration) {
            this.readTimeout = duration;
            return this;
        }

        public RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> connection(StatefulRedisClusterPubSubConnection<K, V> statefulRedisClusterPubSubConnection) {
            this.connection = statefulRedisClusterPubSubConnection;
            return this;
        }

        public RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> pubSubPattern(K k) {
            this.pubSubPattern = k;
            return this;
        }

        public RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> keyExtractor(Converter<K, K> converter) {
            this.keyExtractor = converter;
            return this;
        }

        public RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> queueCapacity(int i) {
            this.queueCapacity = i;
            return this;
        }

        public RedisClusterKeyspaceNotificationItemReader<K, V> build() {
            return new RedisClusterKeyspaceNotificationItemReader<>(this.readTimeout, this.connection, this.pubSubPattern, this.keyExtractor, this.queueCapacity);
        }

        public String toString() {
            return "RedisClusterKeyspaceNotificationItemReader.RedisClusterKeyspaceNotificationItemReaderBuilder(readTimeout=" + this.readTimeout + ", connection=" + this.connection + ", pubSubPattern=" + this.pubSubPattern + ", keyExtractor=" + this.keyExtractor + ", queueCapacity=" + this.queueCapacity + ")";
        }
    }

    public RedisClusterKeyspaceNotificationItemReader(Duration duration, StatefulRedisClusterPubSubConnection<K, V> statefulRedisClusterPubSubConnection, K k, Converter<K, K> converter, int i) {
        super(duration, k, converter, i);
        this.listener = new ClusterKeyspaceNotificationListener();
        Assert.notNull(statefulRedisClusterPubSubConnection, "A pub/sub connection is required.");
        this.connection = statefulRedisClusterPubSubConnection;
    }

    @Override // org.springframework.batch.item.redis.support.AbstractKeyspaceNotificationItemReader
    protected void subscribe(K k) {
        this.connection.addListener(this.listener);
        this.connection.setNodeMessagePropagation(true);
        ((NodeSelectionPubSubCommands) this.connection.sync().upstream().commands()).psubscribe(new Object[]{k});
    }

    @Override // org.springframework.batch.item.redis.support.AbstractKeyspaceNotificationItemReader
    protected void unsubscribe(K k) {
        ((NodeSelectionPubSubCommands) this.connection.sync().upstream().commands()).punsubscribe(new Object[]{k});
        this.connection.removeListener(this.listener);
    }

    public static <K, V> RedisClusterKeyspaceNotificationItemReaderBuilder<K, V> builder() {
        return new RedisClusterKeyspaceNotificationItemReaderBuilder<>();
    }
}
