package com.redislabs.riot.redis.replicate;

import com.redislabs.riot.redis.writer.KeyBuilder;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyspaceNotificationsIterator.class */
public class KeyspaceNotificationsIterator implements KeyIterator {
    private static final Logger log = LoggerFactory.getLogger(KeyspaceNotificationsIterator.class);
    private StatefulRedisPubSubConnection<String, String> connection;
    private String channel;
    private int queueCapacity;
    private KeyspaceNotificationsListener listener;
    private ClusterKeyspaceNotificationsListener clusterListener;
    private BlockingQueue<String> queue;
    private boolean stopped;

    /* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyspaceNotificationsIterator$ClusterKeyspaceNotificationsListener.class */
    private class ClusterKeyspaceNotificationsListener extends RedisClusterPubSubAdapter<String, String> {
        private ClusterKeyspaceNotificationsListener() {
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2, String str3) {
            KeyspaceNotificationsIterator.this.message(str2);
        }
    }

    /* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyspaceNotificationsIterator$KeyspaceNotificationsIteratorBuilder.class */
    public static class KeyspaceNotificationsIteratorBuilder {
        private StatefulRedisPubSubConnection<String, String> connection;
        private String channel;
        private int queueCapacity;

        KeyspaceNotificationsIteratorBuilder() {
        }

        public KeyspaceNotificationsIteratorBuilder connection(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection) {
            this.connection = statefulRedisPubSubConnection;
            return this;
        }

        public KeyspaceNotificationsIteratorBuilder channel(String str) {
            this.channel = str;
            return this;
        }

        public KeyspaceNotificationsIteratorBuilder queueCapacity(int i) {
            this.queueCapacity = i;
            return this;
        }

        public KeyspaceNotificationsIterator build() {
            return new KeyspaceNotificationsIterator(this.connection, this.channel, this.queueCapacity);
        }

        public String toString() {
            return "KeyspaceNotificationsIterator.KeyspaceNotificationsIteratorBuilder(connection=" + this.connection + ", channel=" + this.channel + ", queueCapacity=" + this.queueCapacity + ")";
        }
    }

    /* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyspaceNotificationsIterator$KeyspaceNotificationsListener.class */
    private class KeyspaceNotificationsListener extends RedisPubSubAdapter<String, String> {
        private KeyspaceNotificationsListener() {
        }

        public void message(String str, String str2, String str3) {
            KeyspaceNotificationsIterator.this.message(str2);
        }
    }

    private KeyspaceNotificationsIterator(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection, String str, int i) {
        this.listener = new KeyspaceNotificationsListener();
        this.clusterListener = new ClusterKeyspaceNotificationsListener();
        this.connection = statefulRedisPubSubConnection;
        this.channel = str;
        this.queueCapacity = i;
    }

    @Override // com.redislabs.riot.redis.replicate.KeyIterator
    public void start() {
        log.debug("Creating queue with capacity {}", Integer.valueOf(this.queueCapacity));
        this.queue = new LinkedBlockingDeque(this.queueCapacity);
        if (cluster()) {
            StatefulRedisClusterPubSubConnection statefulRedisClusterPubSubConnection = this.connection;
            statefulRedisClusterPubSubConnection.addListener(this.clusterListener);
            statefulRedisClusterPubSubConnection.setNodeMessagePropagation(true);
            ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().masters().commands()).psubscribe(new String[]{this.channel});
            log.debug("Subscribed to channel {}", this.channel);
        } else {
            this.connection.addListener(this.listener);
            this.connection.sync().psubscribe(new String[]{this.channel});
        }
        this.stopped = false;
    }

    private boolean cluster() {
        return this.connection instanceof StatefulRedisClusterPubSubConnection;
    }

    @Override // com.redislabs.riot.redis.replicate.KeyIterator
    public void stop() {
        this.stopped = true;
        if (!cluster()) {
            this.connection.sync().punsubscribe(new String[]{this.channel});
            this.connection.removeListener(this.listener);
        } else {
            StatefulRedisClusterPubSubConnection statefulRedisClusterPubSubConnection = this.connection;
            ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().masters().commands()).punsubscribe(new String[]{this.channel});
            statefulRedisClusterPubSubConnection.removeListener(this.clusterListener);
        }
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        return !this.stopped;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public String next() {
        String poll;
        do {
            try {
                poll = this.queue.poll(100L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    break;
                }
            } catch (InterruptedException e) {
                return null;
            }
        } while (!this.stopped);
        return poll;
    }

    public void message(String str) {
        try {
            this.queue.put(str.substring(str.indexOf(KeyBuilder.DEFAULT_KEY_SEPARATOR) + 1));
        } catch (InterruptedException e) {
        }
    }

    public static KeyspaceNotificationsIteratorBuilder builder() {
        return new KeyspaceNotificationsIteratorBuilder();
    }
}
