package org.springframework.batch.item.redis.support;

import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.time.Duration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/redis/support/RedisKeyspaceNotificationItemReader.class */
/**
 * Keyspace-notification reader bound to a single Lettuce pub/sub connection.
 *
 * <p>{@link #subscribe(Object)} registers an internal listener on the connection and
 * pattern-subscribes to the supplied pattern; {@link #unsubscribe(Object)} reverses both
 * steps. Every message delivered to the listener is handed to the reader's
 * {@code notification(channel, message)} method, which is declared outside this file.
 *
 * <p>NOTE(review): when {@code subscribe}/{@code unsubscribe} are invoked is decided by
 * {@code AbstractKeyspaceNotificationItemReader}, which is not visible here.
 *
 * @param <K> key/channel type of the pub/sub connection
 * @param <V> message type of the pub/sub connection
 */
public class RedisKeyspaceNotificationItemReader<K, V> extends AbstractKeyspaceNotificationItemReader<K, V> {
    private final StatefulRedisPubSubConnection<K, V> connection;
    private final KeyspaceNotificationListener listener = new KeyspaceNotificationListener();

    /**
     * Pub/sub listener that relays incoming messages to the enclosing reader.
     * The pattern that matched (if any) is discarded; only channel and message are forwarded.
     */
    private class KeyspaceNotificationListener extends RedisPubSubAdapter<K, V> {

        /** Relays a message received through a plain channel subscription. */
        @Override
        public void message(K channel, V message) {
            RedisKeyspaceNotificationItemReader.this.notification(channel, message);
        }

        /** Relays a message received through a pattern subscription; the pattern itself is ignored. */
        @Override
        public void message(K pattern, K channel, V message) {
            RedisKeyspaceNotificationItemReader.this.notification(channel, message);
        }
    }

    /**
     * Creates a reader that listens on the given pub/sub connection.
     *
     * <p>{@code duration}, {@code k}, {@code converter} and {@code i} are passed unchanged to
     * the superclass constructor; their meaning is defined there.
     * NOTE(review): {@code k} is presumably the pattern later handed to
     * {@link #subscribe(Object)} — confirm against the superclass.
     *
     * @param duration forwarded to the superclass
     * @param statefulRedisPubSubConnection connection used to subscribe and receive messages;
     *     must not be {@code null}
     * @param k forwarded to the superclass
     * @param converter forwarded to the superclass
     * @param i forwarded to the superclass
     * @throws IllegalArgumentException if {@code statefulRedisPubSubConnection} is {@code null}
     */
    public RedisKeyspaceNotificationItemReader(Duration duration, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, K k, Converter<K, K> converter, int i) {
        super(duration, k, converter, i);
        Assert.notNull(statefulRedisPubSubConnection, "A pub/sub connection is required.");
        this.connection = statefulRedisPubSubConnection;
    }

    /**
     * Registers the listener on the connection, then pattern-subscribes to {@code pattern}.
     * The listener is attached first so it is already in place when the subscription starts.
     *
     * @param pattern pattern passed to {@code PSUBSCRIBE}
     */
    // The varargs call creates a generic array holding only the single pattern.
    @SuppressWarnings("unchecked")
    @Override // org.springframework.batch.item.redis.support.AbstractKeyspaceNotificationItemReader
    protected void subscribe(K pattern) {
        this.connection.addListener(this.listener);
        this.connection.sync().psubscribe(pattern);
    }

    /**
     * Cancels the pattern subscription and detaches the listener from the connection.
     * The listener is removed even when the unsubscribe command fails, so a failed
     * command does not leave it attached to the connection.
     *
     * @param pattern pattern passed to {@code PUNSUBSCRIBE}
     */
    // The varargs call creates a generic array holding only the single pattern.
    @SuppressWarnings("unchecked")
    @Override // org.springframework.batch.item.redis.support.AbstractKeyspaceNotificationItemReader
    protected void unsubscribe(K pattern) {
        try {
            this.connection.sync().punsubscribe(pattern);
        } finally {
            this.connection.removeListener(this.listener);
        }
    }
}
