package me.ahoo.govern.core.listener;

import io.lettuce.core.api.sync.RedisServerCommands;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import java.util.concurrent.CompletableFuture;
import me.ahoo.govern.core.util.RedisKeySpaces;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/ahoo/govern/core/listener/RedisMessageListenable.class */
public class RedisMessageListenable extends AbstractMessageListenable {
    private static final Logger log = LoggerFactory.getLogger(RedisMessageListenable.class);
    private final StatefulRedisPubSubConnection<String, String> pubSubConnection;
    private final RedisPubSubAsyncCommands<String, String> pubSubCommands;
    private final RedisPubSubListenerAdapter listenerAdapter;

    /* loaded from: input_file:me/ahoo/govern/core/listener/RedisMessageListenable$RedisPubSubListenerAdapter.class */
    private class RedisPubSubListenerAdapter implements RedisPubSubListener<String, String> {
        private RedisPubSubListenerAdapter() {
        }

        public void message(String str, String str2) {
            if (RedisMessageListenable.log.isDebugEnabled()) {
                RedisMessageListenable.log.debug("Message received from a channel subscription - channel[{}] | message[{}]", str, str2);
            }
            RedisMessageListenable.this.onMessage(str, str2, null);
        }

        public void message(String str, String str2, String str3) {
            if (RedisMessageListenable.log.isDebugEnabled()) {
                RedisMessageListenable.log.debug("Message received from a pattern subscription - pattern[{}] | channel[{}] | message[{}]", new Object[]{str, str2, str3});
            }
            RedisMessageListenable.this.onMessage(str2, str3, str);
        }

        public void subscribed(String str, long j) {
            if (RedisMessageListenable.log.isInfoEnabled()) {
                RedisMessageListenable.log.info("Subscribed to a channel - channel[{}] | [{}]", str, Long.valueOf(j));
            }
        }

        public void psubscribed(String str, long j) {
            if (RedisMessageListenable.log.isInfoEnabled()) {
                RedisMessageListenable.log.info("Subscribed to a pattern - pattern[{}] | [{}]", str, Long.valueOf(j));
            }
        }

        public void unsubscribed(String str, long j) {
            if (RedisMessageListenable.log.isInfoEnabled()) {
                RedisMessageListenable.log.info("Unsubscribed from a channel - channel[{}] | [{}]", str, Long.valueOf(j));
            }
        }

        public void punsubscribed(String str, long j) {
            if (RedisMessageListenable.log.isInfoEnabled()) {
                RedisMessageListenable.log.debug("Unsubscribed from a pattern - pattern[{}] | [{}]", str, Long.valueOf(j));
            }
        }
    }

    public RedisMessageListenable(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection) {
        RedisKeySpaces.ensureNotifyKeyspaceEvents((RedisServerCommands<String, String>) statefulRedisPubSubConnection.sync());
        this.pubSubConnection = statefulRedisPubSubConnection;
        this.pubSubCommands = statefulRedisPubSubConnection.async();
        this.listenerAdapter = new RedisPubSubListenerAdapter();
        this.pubSubConnection.addListener(this.listenerAdapter);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.pubSubConnection.close();
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> subscribe(ChannelTopic channelTopic) {
        return this.pubSubCommands.subscribe(new String[]{channelTopic.getTopic()}).toCompletableFuture();
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> subscribe(PatternTopic patternTopic) {
        return this.pubSubCommands.psubscribe(new String[]{patternTopic.getTopic()}).toCompletableFuture();
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> unsubscribe(ChannelTopic channelTopic) {
        return this.pubSubCommands.unsubscribe(new String[]{channelTopic.getTopic()}).toCompletableFuture();
    }

    @Override // me.ahoo.govern.core.listener.AbstractMessageListenable
    protected CompletableFuture<Void> unsubscribe(PatternTopic patternTopic) {
        return this.pubSubCommands.punsubscribe(new String[]{patternTopic.getTopic()}).toCompletableFuture();
    }
}
