package me.ahoo.cosky.core.listener;

import io.lettuce.core.pubsub.api.reactive.PatternMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;

/* loaded from: input_file:me/ahoo/cosky/core/listener/PatternMessageSubscriber.class */
public class PatternMessageSubscriber extends BaseSubscriber<PatternMessage<String, String>> {
    private static final Logger log = LoggerFactory.getLogger(PatternMessageSubscriber.class);
    private final RedisPubSubReactiveCommands<String, String> pubSubReactiveCommands;
    private final ConcurrentHashMap<String, CopyOnWriteArraySet<MessageListener>> listeners = new ConcurrentHashMap<>();

    public PatternMessageSubscriber(RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands) {
        this.pubSubReactiveCommands = redisPubSubReactiveCommands;
        this.pubSubReactiveCommands.observePatterns().subscribe(this);
    }

    public void addListener(String str, MessageListener messageListener) {
        this.listeners.compute(str, (str2, copyOnWriteArraySet) -> {
            if (Objects.isNull(copyOnWriteArraySet)) {
                copyOnWriteArraySet = new CopyOnWriteArraySet();
            }
            if (copyOnWriteArraySet.isEmpty()) {
                if (log.isInfoEnabled()) {
                    log.info("addListener - psubscribe - [{}]", str2);
                }
                this.pubSubReactiveCommands.psubscribe(new String[]{str2}).subscribe();
            }
            copyOnWriteArraySet.add(messageListener);
            return copyOnWriteArraySet;
        });
    }

    public void removeListener(String str, MessageListener messageListener) {
        this.listeners.compute(str, (str2, copyOnWriteArraySet) -> {
            if (Objects.isNull(copyOnWriteArraySet)) {
                return null;
            }
            if (copyOnWriteArraySet.remove(messageListener) && copyOnWriteArraySet.isEmpty()) {
                if (log.isInfoEnabled()) {
                    log.info("removeListener - punsubscribe - [{}]", str2);
                }
                this.pubSubReactiveCommands.punsubscribe(new String[]{str2}).subscribe();
            }
            return copyOnWriteArraySet;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void hookOnNext(PatternMessage<String, String> patternMessage) {
        if (log.isDebugEnabled()) {
            log.debug("hookOnNext - pattern:[{}] - channel:[{}] - message:[{}].", new Object[]{patternMessage.getPattern(), patternMessage.getChannel(), patternMessage.getMessage()});
        }
        CopyOnWriteArraySet<MessageListener> copyOnWriteArraySet = this.listeners.get(patternMessage.getPattern());
        if (!Objects.isNull(copyOnWriteArraySet) && !copyOnWriteArraySet.isEmpty()) {
            copyOnWriteArraySet.forEach(messageListener -> {
                messageListener.onMessage((String) patternMessage.getPattern(), (String) patternMessage.getChannel(), (String) patternMessage.getMessage());
            });
        } else if (log.isDebugEnabled()) {
            log.debug("hookOnNext - pattern:[{}] - channel:[{}] - message:[{}] - listeners is empty.", new Object[]{patternMessage.getPattern(), patternMessage.getChannel(), patternMessage.getMessage()});
        }
    }

    protected void hookOnError(Throwable th) {
        if (log.isErrorEnabled()) {
            log.error(th.getMessage(), th);
        }
    }

    protected void hookOnCancel() {
        if (log.isInfoEnabled()) {
            log.info("hookOnCancel.");
        }
    }

    protected void hookOnSubscribe(Subscription subscription) {
        if (log.isInfoEnabled()) {
            log.info("hookOnSubscribe.");
        }
        super.hookOnSubscribe(subscription);
    }

    protected void hookOnComplete() {
        if (log.isInfoEnabled()) {
            log.info("hookOnComplete.");
        }
    }
}
