package me.ahoo.cosky.discovery.redis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import me.ahoo.cosky.core.listener.MessageListenable;
import me.ahoo.cosky.core.listener.MessageListener;
import me.ahoo.cosky.discovery.DiscoveryKeyGenerator;
import me.ahoo.cosky.discovery.Instance;
import me.ahoo.cosky.discovery.InstanceIdGenerator;
import me.ahoo.cosky.discovery.NamespacedServiceId;
import me.ahoo.cosky.discovery.ServiceChangedEvent;
import me.ahoo.cosky.discovery.ServiceChangedListener;
import me.ahoo.cosky.discovery.ServiceDiscovery;
import me.ahoo.cosky.discovery.ServiceInstance;
import me.ahoo.cosky.discovery.ServiceInstanceContext;
import me.ahoo.cosky.discovery.ServiceListenable;
import me.ahoo.cosky.discovery.ServiceTopology;
import me.ahoo.cosky.discovery.loadbalancer.LoadBalancer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.class */
public class ConsistencyRedisServiceDiscovery implements ServiceDiscovery, ServiceListenable, ServiceTopology {
    private static final Logger log = LoggerFactory.getLogger(ConsistencyRedisServiceDiscovery.class);
    private final ServiceDiscovery delegate;
    private final MessageListenable messageListenable;
    private final RedisClusterReactiveCommands<String, String> redisCommands;
    private final ConcurrentHashMap<NamespacedServiceId, Mono<CopyOnWriteArrayList<ServiceInstance>>> serviceMapInstances = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Mono<List<String>>> namespaceMapServices = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<NamespacedServiceId, CopyOnWriteArraySet<ServiceChangedListener>> serviceMapListener = new ConcurrentHashMap<>();
    private final ServiceIdxListener serviceIdxListener = new ServiceIdxListener();
    private final InstanceListener instanceListener = new InstanceListener();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery$InstanceListener.class */
    public class InstanceListener implements MessageListener {
        private InstanceListener() {
        }

        public void onMessage(@Nullable String str, String str2, String str3) {
            if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                ConsistencyRedisServiceDiscovery.log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}]", new Object[]{str, str2, str3});
            }
            String namespaceOfKey = DiscoveryKeyGenerator.getNamespaceOfKey(str2);
            String instanceIdOfKey = DiscoveryKeyGenerator.getInstanceIdOfKey(namespaceOfKey, str2);
            Instance of = InstanceIdGenerator.DEFAULT.of(instanceIdOfKey);
            String serviceId = of.getServiceId();
            NamespacedServiceId of2 = NamespacedServiceId.of(namespaceOfKey, serviceId);
            AtomicReference atomicReference = new AtomicReference(ServiceChangedEvent.of(of2, str3, of));
            CopyOnWriteArraySet<ServiceChangedListener> copyOnWriteArraySet = (CopyOnWriteArraySet) ConsistencyRedisServiceDiscovery.this.serviceMapListener.get(of2);
            Mono mono = (Mono) ConsistencyRedisServiceDiscovery.this.serviceMapInstances.get(of2);
            if (!Objects.isNull(mono)) {
                mono.flatMap(copyOnWriteArrayList -> {
                    ServiceInstance serviceInstance = (ServiceInstance) copyOnWriteArrayList.stream().filter(serviceInstance2 -> {
                        return serviceInstance2.getInstanceId().equals(instanceIdOfKey);
                    }).findFirst().orElse(ServiceInstance.NOT_FOUND);
                    if (ServiceChangedEvent.REGISTER.equals(str3)) {
                        if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                            ConsistencyRedisServiceDiscovery.log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] add registered Instance.", new Object[]{str, str2, str3});
                        }
                        return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespaceOfKey, serviceId, instanceIdOfKey).doOnNext(serviceInstance3 -> {
                            if (ServiceInstance.NOT_FOUND.equals(serviceInstance)) {
                                copyOnWriteArrayList.add(serviceInstance3);
                                return;
                            }
                            serviceInstance.setSchema(serviceInstance3.getSchema());
                            serviceInstance.setHost(serviceInstance3.getHost());
                            serviceInstance.setPort(serviceInstance3.getPort());
                            serviceInstance.setEphemeral(serviceInstance3.isEphemeral());
                            serviceInstance.setTtlAt(serviceInstance3.getTtlAt());
                            serviceInstance.setWeight(serviceInstance3.getWeight());
                            serviceInstance.setMetadata(serviceInstance3.getMetadata());
                        });
                    }
                    if (ServiceInstance.NOT_FOUND.equals(serviceInstance)) {
                        if (ConsistencyRedisServiceDiscovery.log.isWarnEnabled()) {
                            ConsistencyRedisServiceDiscovery.log.warn("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] not found cached Instance.", new Object[]{str, str2, str3});
                        }
                        return Mono.empty();
                    }
                    boolean z = -1;
                    switch (str3.hashCode()) {
                        case -1309235419:
                            if (str3.equals(ServiceChangedEvent.EXPIRED)) {
                                z = 3;
                                break;
                            }
                            break;
                        case -1031001564:
                            if (str3.equals(ServiceChangedEvent.DEREGISTER)) {
                                z = 2;
                                break;
                            }
                            break;
                        case -824066452:
                            if (str3.equals(ServiceChangedEvent.SET_METADATA)) {
                                z = true;
                                break;
                            }
                            break;
                        case 108399245:
                            if (str3.equals(ServiceChangedEvent.RENEW)) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case LoadBalancer.ZERO /* 0 */:
                            if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                                ConsistencyRedisServiceDiscovery.log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] setTtlAt.", new Object[]{str, str2, str3});
                            }
                            Mono<Long> instanceTtl = ConsistencyRedisServiceDiscovery.this.delegate.getInstanceTtl(namespaceOfKey, serviceId, instanceIdOfKey);
                            serviceInstance.getClass();
                            return instanceTtl.doOnNext((v1) -> {
                                r1.setTtlAt(v1);
                            });
                        case LoadBalancer.ONE /* 1 */:
                            if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                                ConsistencyRedisServiceDiscovery.log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] setMetadata.", new Object[]{str, str2, str3});
                            }
                            return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespaceOfKey, serviceId, instanceIdOfKey).doOnNext(serviceInstance4 -> {
                                serviceInstance.setMetadata(serviceInstance4.getMetadata());
                            });
                        case true:
                        case true:
                            if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                                ConsistencyRedisServiceDiscovery.log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] remove instance.", new Object[]{str, str2, str3});
                            }
                            atomicReference.set(ServiceChangedEvent.of(of2, str3, serviceInstance));
                            copyOnWriteArrayList.remove(serviceInstance);
                            return Mono.empty();
                        default:
                            return Mono.error(new IllegalStateException("Unexpected value: " + str3));
                    }
                }).doOnSuccess(obj -> {
                    invokeChanged((ServiceChangedEvent) atomicReference.get(), copyOnWriteArraySet);
                }).subscribe();
                return;
            }
            if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                ConsistencyRedisServiceDiscovery.log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] instancesMono is null.", new Object[]{str, str2, str3});
            }
            invokeChanged((ServiceChangedEvent) atomicReference.get(), copyOnWriteArraySet);
        }

        private void invokeChanged(ServiceChangedEvent serviceChangedEvent, CopyOnWriteArraySet<ServiceChangedListener> copyOnWriteArraySet) {
            if (!Objects.nonNull(copyOnWriteArraySet) || copyOnWriteArraySet.isEmpty()) {
                return;
            }
            copyOnWriteArraySet.forEach(serviceChangedListener -> {
                serviceChangedListener.onChange(serviceChangedEvent);
            });
        }
    }

    /* loaded from: input_file:me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery$ServiceIdxListener.class */
    private class ServiceIdxListener implements MessageListener {
        private ServiceIdxListener() {
        }

        public void onMessage(@Nullable String str, String str2, String str3) {
            if (ConsistencyRedisServiceDiscovery.log.isInfoEnabled()) {
                ConsistencyRedisServiceDiscovery.log.info("onMessage@ServiceIdxListener - pattern:[{}] - channel:[{}] - message:[{}]", new Object[]{str, str2, str3});
            }
            String namespaceOfKey = DiscoveryKeyGenerator.getNamespaceOfKey(str2);
            ConsistencyRedisServiceDiscovery.this.namespaceMapServices.put(namespaceOfKey, ConsistencyRedisServiceDiscovery.this.delegate.getServices(namespaceOfKey).cache());
        }
    }

    public ConsistencyRedisServiceDiscovery(ServiceDiscovery serviceDiscovery, MessageListenable messageListenable, RedisClusterReactiveCommands<String, String> redisClusterReactiveCommands) {
        this.redisCommands = redisClusterReactiveCommands;
        this.delegate = serviceDiscovery;
        this.messageListenable = messageListenable;
    }

    @Override // me.ahoo.cosky.discovery.ServiceDiscovery
    public Mono<List<String>> getServices(String str) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "namespace can not be empty!");
        return this.namespaceMapServices.computeIfAbsent(str, str2 -> {
            this.messageListenable.addChannelListener(DiscoveryKeyGenerator.getServiceIdxKey(str), this.serviceIdxListener);
            return this.delegate.getServices(str).cache();
        });
    }

    @Override // me.ahoo.cosky.discovery.ServiceDiscovery
    public Mono<List<ServiceInstance>> getInstances(String str, String str2) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "namespace can not be empty!");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str2), "serviceId can not be empty!");
        return this.serviceMapInstances.computeIfAbsent(NamespacedServiceId.of(str, str2), namespacedServiceId -> {
            return addListener(str, str2).then(this.delegate.getInstances(str, str2)).map((v1) -> {
                return new CopyOnWriteArrayList(v1);
            }).cache();
        }).map(copyOnWriteArrayList -> {
            return (List) copyOnWriteArrayList.stream().filter(serviceInstance -> {
                return !serviceInstance.isExpired();
            }).collect(Collectors.toList());
        });
    }

    public Mono<ServiceInstance> getInstance0(String str, String str2, String str3) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "namespace can not be empty!");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str2), "serviceId can not be empty!");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str3), "instanceId can not be empty!");
        Mono<CopyOnWriteArrayList<ServiceInstance>> mono = this.serviceMapInstances.get(NamespacedServiceId.of(str, str2));
        return Objects.isNull(mono) ? this.delegate.getInstance(str, str2, str3) : mono.mapNotNull(copyOnWriteArrayList -> {
            return (ServiceInstance) copyOnWriteArrayList.stream().filter(serviceInstance -> {
                return serviceInstance.getInstanceId().equals(str3);
            }).findFirst().orElse(null);
        });
    }

    @Override // me.ahoo.cosky.discovery.ServiceDiscovery
    public Mono<ServiceInstance> getInstance(String str, String str2, String str3) {
        return getInstance0(str, str2, str3);
    }

    @Override // me.ahoo.cosky.discovery.ServiceDiscovery
    public Mono<Long> getInstanceTtl(String str, String str2, String str3) {
        return getInstance0(str, str2, str3).map((v0) -> {
            return v0.getTtlAt();
        });
    }

    @VisibleForTesting
    public Mono<Void> addListener(String str, String str2) {
        this.messageListenable.addPatternListener(DiscoveryKeyGenerator.getInstanceKeyPatternOfService(str, str2), this.instanceListener);
        return addTopology(str, str2);
    }

    @Override // me.ahoo.cosky.discovery.ServiceTopology
    public Mono<Void> addTopology(String str, String str2) {
        String namespace = ServiceInstanceContext.CURRENT.getNamespace();
        String consumerName = ServiceTopology.getConsumerName();
        String producerName = ServiceTopology.getProducerName(str, str2);
        return Objects.equals(consumerName, producerName) ? Mono.empty() : DiscoveryRedisScripts.doServiceTopologyAdd(this.redisCommands, str3 -> {
            return this.redisCommands.evalsha(str3, ScriptOutputType.STATUS, new String[]{namespace}, new String[]{consumerName, producerName}).then();
        });
    }

    @Override // me.ahoo.cosky.discovery.ServiceListenable
    public void addListener(NamespacedServiceId namespacedServiceId, ServiceChangedListener serviceChangedListener) {
        this.serviceMapListener.compute(namespacedServiceId, (namespacedServiceId2, copyOnWriteArraySet) -> {
            CopyOnWriteArraySet copyOnWriteArraySet = copyOnWriteArraySet;
            if (Objects.isNull(copyOnWriteArraySet)) {
                addListener(namespacedServiceId.getNamespace(), namespacedServiceId.getServiceId()).subscribe();
                copyOnWriteArraySet = new CopyOnWriteArraySet();
            }
            copyOnWriteArraySet.add(serviceChangedListener);
            return copyOnWriteArraySet;
        });
    }

    @Override // me.ahoo.cosky.discovery.ServiceListenable
    public void removeListener(NamespacedServiceId namespacedServiceId, ServiceChangedListener serviceChangedListener) {
        this.serviceMapListener.compute(namespacedServiceId, (namespacedServiceId2, copyOnWriteArraySet) -> {
            if (Objects.isNull(copyOnWriteArraySet)) {
                return null;
            }
            copyOnWriteArraySet.remove(serviceChangedListener);
            return copyOnWriteArraySet;
        });
    }

    @VisibleForTesting
    public void removeListener(String str, String str2) {
        this.messageListenable.removePatternListener(DiscoveryKeyGenerator.getInstanceKeyPatternOfService(str, str2), this.instanceListener);
    }
}
