package cn.ponfee.disjob.registry.redis;

import cn.ponfee.disjob.common.base.RetryTemplate;
import cn.ponfee.disjob.common.base.TextTokenizer;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.EventType;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.redis.configuration.RedisRegistryProperties;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

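/**
 * Redis-backed {@link ServerRegistry}.
 *
 * <p>Registered servers are stored as members of a Redis sorted set whose score is the
 * time (epoch millis) at which the registration expires; a heartbeat thread keeps
 * re-registering them. Register/deregister events are published on a Redis channel, and
 * discovered servers are refreshed both on a periodic heartbeat and whenever a message
 * arrives on the subscribed channel.
 *
 * <p>Decompiled; loaded from: input_file:cn/ponfee/disjob/registry/redis/RedisServerRegistry.class
 */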
public abstract class RedisServerRegistry<R extends Server, D extends Server> extends ServerRegistry<R, D> {
    /**
     * Lua script that adds every member in {@code ARGV[3..n]} to the sorted set {@code KEYS[1]}
     * with score {@code ARGV[1]}, then sets the key's TTL to {@code ARGV[2]} milliseconds.
     */
    private static final RedisScript<Void> REGISTRY_SCRIPT = RedisScript.of("local score  = ARGV[1];                        \nlocal expire = ARGV[2];                        \nlocal length = #ARGV;                          \nfor i = 3,length do                            \n  redis.call('zadd', KEYS[1], score, ARGV[i]); \nend                                            \nredis.call('pexpire', KEYS[1], expire);        \n", Void.class);

    /**
     * Lua script that removes the members of sorted set {@code KEYS[1]} whose score is at most
     * {@code ARGV[1]}, reads the members whose score is at least {@code ARGV[1]}, resets the
     * key's TTL to {@code ARGV[2]} milliseconds and returns the members that were read.
     */
    private static final RedisScript<List> QUERY_SCRIPT = RedisScript.of("redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1]);          \nlocal ret = redis.call('zrangebyscore', KEYS[1], ARGV[1], '+inf'); \nredis.call('pexpire', KEYS[1], ARGV[2]);                           \nreturn ret;                                                        \n", List.class);

    /** TTL passed to the scripts' {@code pexpire} call: 2,592,000,000 ms (30 days), as a string. */
    private static final String REDIS_KEY_TTL_MILLIS = Long.toString(2592000000L);

    /** Suffix appended to a root path (after the separator) to form a pub/sub channel name. */
    private static final String CHANNEL = "channel";

    /** Channel this registry publishes its own register/deregister events to. */
    private final String registryChannel;

    /** Template used for all Redis commands, script executions and publishing. */
    private final StringRedisTemplate stringRedisTemplate;

    /** Added to the current time to compute the sorted-set score of a registered server. */
    private final long sessionTimeoutMs;

    /** One third of the session timeout; period of both heartbeat threads and of discovery renewal. */
    private final long periodMs;

    /** Periodically re-registers all servers held in {@code registered}. */
    private final LoopThread registerHeartbeatThread;

    /** Single-element key list (the registry root path) passed to the scripts as {@code KEYS}. */
    private final List<String> registryRedisKey;

    /** Periodically triggers a discovery when the next-discover time has passed. */
    private final LoopThread discoverHeartbeatThread;

    /** Executor handed to the message listener container as its task executor. */
    private final ThreadPoolExecutor redisSubscribeExecutor;

    /** Taken with {@code tryLock()} around discovery so concurrent triggers skip instead of queueing. */
    private final Lock asyncRefreshLock;

    /** Single-element key list (the discovery root path) passed to the query script as {@code KEYS}. */
    private final List<String> discoveryRedisKey;

    /** Epoch millis after which the discover heartbeat performs another discovery. */
    private volatile long nextDiscoverTimeMillis;

    /** Listener container subscribed to the discovery channel; stopped in {@code close()}. */
    private final RedisMessageListenerContainer redisMessageListenerContainer;

    /**
     * Creates the registry, starts the register/discover heartbeat threads, subscribes to the
     * discovery channel and performs an initial discovery.
     *
     * @param stringRedisTemplate     template used for every Redis interaction; its connection
     *                                factory must not be {@code null}
     * @param redisRegistryProperties supplies the namespace and the session timeout
     * @throws NullPointerException if the template has no connection factory
     * @throws Error                if the initial discovery fails (the registry is closed first)
     */
    public RedisServerRegistry(StringRedisTemplate stringRedisTemplate, RedisRegistryProperties redisRegistryProperties) {
        super(redisRegistryProperties.getNamespace(), ':');

        // Validate before any thread or executor is started: failing later would leave the
        // already-started heartbeat threads running with nobody able to terminate them.
        RedisConnectionFactory connectionFactory = Objects.requireNonNull(
            stringRedisTemplate.getConnectionFactory(), "Redis connection factory must not be null.");

        this.asyncRefreshLock = new ReentrantLock();
        this.nextDiscoverTimeMillis = 0L;
        this.registryChannel = this.registryRootPath + this.separator + CHANNEL;
        this.stringRedisTemplate = stringRedisTemplate;
        this.sessionTimeoutMs = redisRegistryProperties.getSessionTimeoutMs();
        this.periodMs = redisRegistryProperties.getSessionTimeoutMs() / 3;
        this.registryRedisKey = Collections.singletonList(this.registryRootPath);
        this.discoveryRedisKey = Collections.singletonList(this.discoveryRootPath);

        // Re-register all known servers every period so their scores keep moving forward.
        this.registerHeartbeatThread = LoopThread.createStarted("redis_register_heartbeat", this.periodMs, this.periodMs, () -> {
            RetryTemplate.execute(() -> {
                doRegisterServers(this.registered);
            }, 3, 1000L);
        });

        // Periodic discovery, skipped while a recent discovery is still considered fresh.
        this.discoverHeartbeatThread = LoopThread.createStarted("redis_discover_heartbeat", this.periodMs, this.periodMs, () -> {
            if (requireDiscoverServers()) {
                tryDiscoverServers();
            }
        });

        this.redisSubscribeExecutor = ThreadPoolExecutors.builder()
            .corePoolSize(1)
            .maximumPoolSize(1)
            .workQueue(new ArrayBlockingQueue<>(1))
            .keepAliveTimeSeconds(600L)
            .rejectedHandler(ThreadPoolExecutors.DISCARD)
            .threadFactory(NamedThreadFactory.builder().prefix("redis_async_subscribe").priority(10).uncaughtExceptionHandler(this.log).build())
            .build();

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setTaskExecutor(this.redisSubscribeExecutor);
        // The handler name is resolved reflectively so that a missing/renamed handleMessage
        // method fails here rather than silently never receiving messages.
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(this, (String) Throwables.ThrowingSupplier.doChecked(() -> {
            return RedisServerRegistry.class.getMethod("handleMessage", String.class, String.class).getName();
        }));
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new ChannelTopic(this.discoveryRootPath + this.separator + CHANNEL));
        container.afterPropertiesSet();
        container.start();
        this.redisMessageListenerContainer = container;

        try {
            doDiscoverServers();
        } catch (Throwable th) {
            Threads.interruptIfNecessary(th);
            // Release threads, executor and subscription before propagating the failure.
            close();
            throw new Error("Redis init discover error.", th);
        }
    }

    /**
     * Checks whether a Redis connection obtained through the template is open.
     *
     * @return {@code true} if the callback ran and reported the connection as not closed;
     *         {@code false} if it reported closed or the template returned {@code null}
     */
    public boolean isConnected() {
        // The explicit RedisCallback cast selects the execute(RedisCallback) overload; an
        // implicitly-typed lambda is ambiguous with execute(SessionCallback).
        Boolean open = this.stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> !connection.isClosed());
        return Boolean.TRUE.equals(open);
    }

    /**
     * Registers a server in Redis, remembers it for heartbeat re-registration and announces
     * the registration on the registry channel. Does nothing once the registry is closed.
     *
     * @param r the server to register
     */
    public final void register(R r) {
        if (!this.closed.get()) {
            doRegisterServers(Collections.singleton(r));
            this.registered.add(r);
            // Publishing is best-effort: a failure here must not undo the registration.
            Throwables.ThrowingRunnable.doCaught(() -> publish(r, EventType.REGISTER));
            this.log.info("Server registered: {}, {}", this.registryRole, r);
        }
    }

    /**
     * Forgets a server locally, removes it from the Redis sorted set and announces the
     * deregistration on the registry channel. Both Redis steps are best-effort.
     *
     * @param r the server to deregister
     */
    public final void deregister(R r) {
        this.registered.remove(r);
        Throwables.ThrowingSupplier.doCaught(
            () -> this.stringRedisTemplate.opsForZSet().remove(this.registryRootPath, new Object[]{r.serialize()})
        );
        Throwables.ThrowingRunnable.doCaught(() -> publish(r, EventType.DEREGISTER));
        this.log.info("Server deregister: {}, {}", this.registryRole, r);
    }

    /**
     * Queries Redis for the currently registered servers of this registry's role.
     * The query script also purges members whose score is not later than the current time.
     *
     * @return the deserialized registered servers
     */
    public List<R> getRegisteredServers() {
        Object[] scriptArgs = new Object[]{Long.toString(System.currentTimeMillis()), REDIS_KEY_TTL_MILLIS};
        List members = (List) this.stringRedisTemplate.execute(QUERY_SCRIPT, this.registryRedisKey, scriptArgs);
        return deserializeRegistryServers(members);
    }

    /**
     * Shuts the registry down exactly once: stops the register heartbeat, deregisters every
     * known server, stops the message listener container and the discover heartbeat, shuts
     * down the subscribe executor and finally delegates to the superclass.
     */
    @PreDestroy
    public void close() {
        // Only the first caller that flips the flag performs the shutdown.
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }

        this.registerHeartbeatThread.terminate();
        this.registered.forEach(this::deregister);

        Throwables.ThrowingRunnable.doCaught(this.redisMessageListenerContainer::stop);
        this.discoverHeartbeatThread.terminate();
        ThreadPoolExecutors.shutdown(this.redisSubscribeExecutor, 2);

        this.registered.clear();
        super.close();
    }

    /**
     * Handler invoked (by name, through the {@code MessageListenerAdapter} created in the
     * constructor) for messages on the subscribed channel. The text before the first
     * {@code ':'} is parsed as an {@link EventType} name and the remainder as a serialized
     * server of the discovery role. Any failure is logged and swallowed so that a malformed
     * message cannot break the subscription.
     *
     * @param str  the message payload that is tokenized
     * @param str2 the second handler argument — presumably the channel name; logged only
     */
    public void handleMessage(String str, String str2) {
        try {
            TextTokenizer tokenizer = new TextTokenizer(str, ":");
            String eventName = tokenizer.next();
            String serializedServer = tokenizer.tail();
            this.log.info("Subscribed message: {}, {}", str2, str);
            subscribe(EventType.valueOf(eventName), this.discoveryRole.deserialize(serializedServer));
        } catch (Throwable th) {
            // Parameterized form; the trailing throwable is logged as the exception.
            this.log.error("Parse subscribed message error: {}, {}", str, str2, th);
        }
    }

    /**
     * Publishes an event for a server on the registry channel, encoded as
     * {@code <event name>:<serialized server>}.
     *
     * @param r         the server the event is about
     * @param eventType the kind of event
     */
    private void publish(R r, EventType eventType) {
        String payload = eventType.name() + ":" + r.serialize();
        this.stringRedisTemplate.convertAndSend(this.registryChannel, payload);
    }

    /**
     * Reacts to an event received on the subscribed channel by attempting a full
     * re-discovery. Neither argument is used: the event only acts as a trigger.
     *
     * @param eventType the received event type (unused)
     * @param d         the server carried by the event (unused)
     */
    private void subscribe(EventType eventType, D d) {
        tryDiscoverServers();
    }

    /**
     * Writes the given servers into the registry sorted set with a score of
     * "now + session timeout" and refreshes the key's TTL, via {@code REGISTRY_SCRIPT}.
     * Does nothing for a {@code null} or empty set.
     *
     * @param set the servers to (re-)register; may be the live {@code registered} set
     */
    private void doRegisterServers(Set<R> set) {
        if (CollectionUtils.isEmpty(set)) {
            return;
        }
        String score = Long.toString(System.currentTimeMillis() + this.sessionTimeoutMs);
        // Script arguments: [score, key TTL, member...]. The members are collected in a single
        // traversal instead of pre-sizing an array from set.size(): the heartbeat thread passes
        // the shared registered set, which register()/deregister() may change in between, and a
        // size/iteration mismatch would overflow the array or leave null arguments.
        Object[] scriptArgs = Stream.concat(
            Stream.of(score, REDIS_KEY_TTL_MILLIS),
            set.stream().map(server -> server.serialize())
        ).toArray();
        this.stringRedisTemplate.execute(REGISTRY_SCRIPT, this.registryRedisKey, scriptArgs);
    }

    /**
     * Runs a discovery unless one is already in progress on another thread, in which case
     * the call returns immediately. Failures are logged, never propagated.
     */
    private void tryDiscoverServers() {
        if (!this.asyncRefreshLock.tryLock()) {
            // Someone else is already refreshing; their result will be fresh enough.
            return;
        }
        try {
            doDiscoverServers();
        } catch (Throwable th) {
            Threads.interruptIfNecessary(th);
            this.log.error("Redis discover servers occur error.", th);
        } finally {
            this.asyncRefreshLock.unlock();
        }
    }

    /**
     * Queries Redis for the servers of the discovery role, deserializes them, replaces the
     * locally discovered servers with the result and pushes the next-discover time forward.
     * The whole sequence goes through {@code RetryTemplate.execute} with the arguments
     * {@code 3} and {@code 1000L}.
     *
     * @throws Throwable whatever {@code RetryTemplate.execute} propagates
     */
    private void doDiscoverServers() throws Throwable {
        RetryTemplate.execute(() -> {
            // QUERY_SCRIPT is declared with a raw List result type; a StringRedisTemplate
            // yields String members, hence the unchecked narrowing.
            @SuppressWarnings("unchecked")
            List<String> members = (List<String>) this.stringRedisTemplate.execute(
                QUERY_SCRIPT, this.discoveryRedisKey, new Object[]{Long.toString(System.currentTimeMillis()), REDIS_KEY_TTL_MILLIS}
            );
            if (CollectionUtils.isEmpty(members)) {
                this.log.warn("Not discovered available {} from redis.", this.discoveryRole);
                members = Collections.emptyList();
            }

            List<D> servers = members.stream()
                .<D>map(this.discoveryRole::deserialize)
                .collect(Collectors.toList());
            refreshDiscoveredServers(servers);
            renewNextDiscoverTimeMillis();
            this.log.debug("Redis discovered {} servers.", this.discoveryRole);
        }, 3, 1000L);
    }

    /**
     * Tells the discover heartbeat whether it should run a discovery now.
     *
     * @return {@code true} if the registry is not closed and the next-discover time has passed
     */
    private boolean requireDiscoverServers() {
        if (this.closed.get()) {
            return false;
        }
        return this.nextDiscoverTimeMillis < System.currentTimeMillis();
    }

    /**
     * Schedules the next heartbeat-driven discovery one period after the current time.
     */
    private void renewNextDiscoverTimeMillis() {
        long now = System.currentTimeMillis();
        this.nextDiscoverTimeMillis = now + this.periodMs;
    }
}
