package me.ahoo.govern.core.listener;

import io.lettuce.core.api.sync.RedisServerCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands;
import java.util.concurrent.CompletableFuture;
import me.ahoo.govern.core.util.RedisKeySpaces;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/ahoo/govern/core/listener/RedisClusterMessageListenable.class */
public class RedisClusterMessageListenable extends AbstractMessageListenable {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterMessageListenable.class);
    private final RedisClusterPubSubListenerAdapter listenerAdapter;
    private final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection;
    private final NodeSelectionPubSubAsyncCommands<String, String> pubSubCommands;

    /* loaded from: input_file:me/ahoo/govern/core/listener/RedisClusterMessageListenable$RedisClusterPubSubListenerAdapter.class */
    private class RedisClusterPubSubListenerAdapter implements RedisClusterPubSubListener<String, String> {
        private RedisClusterPubSubListenerAdapter() {
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2) {
            if (RedisClusterMessageListenable.log.isDebugEnabled()) {
                RedisClusterMessageListenable.log.debug("Message received from a channel subscription - RedisNode[{}] | channel[{}] | message[{}].", new Object[]{redisClusterNode.getUri(), str, str2});
            }
            RedisClusterMessageListenable.this.onMessage(str, str2, null);
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2, String str3) {
            if (RedisClusterMessageListenable.log.isDebugEnabled()) {
                RedisClusterMessageListenable.log.debug("Message received from a pattern subscription - RedisNode[{}]  | pattern[{}] | channel[{}] | message[{}].", new Object[]{redisClusterNode.getUri(), str, str2, str3});
            }
            RedisClusterMessageListenable.this.onMessage(str2, str3, str);
        }

        public void subscribed(RedisClusterNode redisClusterNode, String str, long j) {
            if (RedisClusterMessageListenable.log.isInfoEnabled()) {
                RedisClusterMessageListenable.log.debug("Subscribed to a channel - RedisNode[{}]  | channel[{}] | {}.", new Object[]{redisClusterNode.getUri(), str, Long.valueOf(j)});
            }
        }

        public void psubscribed(RedisClusterNode redisClusterNode, String str, long j) {
            if (RedisClusterMessageListenable.log.isInfoEnabled()) {
                RedisClusterMessageListenable.log.info("Subscribed to a pattern - RedisNode[{}]  | pattern[{}] | {}.", new Object[]{redisClusterNode.getUri(), str, Long.valueOf(j)});
            }
        }

        public void unsubscribed(RedisClusterNode redisClusterNode, String str, long j) {
            if (RedisClusterMessageListenable.log.isInfoEnabled()) {
                RedisClusterMessageListenable.log.info("Unsubscribed from a channel - RedisNode[{}] | channel[{}] | {}.", new Object[]{redisClusterNode.getUri(), str, Long.valueOf(j)});
            }
        }

        public void punsubscribed(RedisClusterNode redisClusterNode, String str, long j) {
            if (RedisClusterMessageListenable.log.isInfoEnabled()) {
                RedisClusterMessageListenable.log.info("Unsubscribed from a pattern - RedisNode[{}] | pattern[{}] | {}.", new Object[]{redisClusterNode.getUri(), str, Long.valueOf(j)});
            }
        }
    }

    public RedisClusterMessageListenable(StatefulRedisClusterPubSubConnection<String, String> statefulRedisClusterPubSubConnection) {
        RedisKeySpaces.ensureNotifyKeyspaceEvents((RedisServerCommands<String, String>) statefulRedisClusterPubSubConnection.sync());
        this.pubSubConnection = statefulRedisClusterPubSubConnection;
        this.pubSubConnection.setNodeMessagePropagation(true);
        this.pubSubCommands = (NodeSelectionPubSubAsyncCommands) statefulRedisClusterPubSubConnection.async().upstream().commands();
        this.listenerAdapter = new RedisClusterPubSubListenerAdapter();
        this.pubSubConnection.addListener(this.listenerAdapter);
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> subscribe(ChannelTopic channelTopic) {
        return CompletableFuture.allOf(this.pubSubCommands.subscribe(new String[]{channelTopic.getTopic()}).futures());
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> subscribe(PatternTopic patternTopic) {
        return CompletableFuture.allOf(this.pubSubCommands.psubscribe(new String[]{patternTopic.getTopic()}).futures());
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> unsubscribe(ChannelTopic channelTopic) {
        return CompletableFuture.allOf(this.pubSubCommands.unsubscribe(new String[]{channelTopic.getTopic()}).futures());
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> unsubscribe(PatternTopic patternTopic) {
        return CompletableFuture.allOf(this.pubSubCommands.punsubscribe(new String[]{patternTopic.getTopic()}).futures());
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.pubSubConnection.close();
    }
}
