package cn.ponfee.disjob.registry.zookeeper;

import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.RegistryException;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.zookeeper.configuration.ZookeeperRegistryProperties;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;

/* loaded from: input_file:cn/ponfee/disjob/registry/zookeeper/ZookeeperServerRegistry.class */
public abstract class ZookeeperServerRegistry<R extends Server, D extends Server> extends ServerRegistry<R, D> {
    private static final int CREATE_EPHEMERAL_FAIL_RETRIES = 3;
    private final CuratorFrameworkClient client;
    private final String zkRegistryRootPath;

    /* JADX INFO: Access modifiers changed from: protected */
    public ZookeeperServerRegistry(ZookeeperRegistryProperties zookeeperRegistryProperties) {
        super(zookeeperRegistryProperties.getNamespace(), '/');
        this.zkRegistryRootPath = this.separator + this.registryRootPath;
        String str = this.separator + this.discoveryRootPath;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            try {
                this.client = new CuratorFrameworkClient(zookeeperRegistryProperties, curatorFrameworkClient -> {
                    if (this.closed.get()) {
                        return;
                    }
                    for (Server server : this.registered) {
                        try {
                            curatorFrameworkClient.createEphemeral(buildRegistryPath(server), CREATE_EPHEMERAL_FAIL_RETRIES);
                        } catch (Throwable th) {
                            this.log.error("Re-registry server to zookeeper occur error: " + server, th);
                        }
                    }
                });
                this.client.createPersistent(this.zkRegistryRootPath);
                this.client.createPersistent(str);
                this.client.watchChildChanged(str, countDownLatch, this::doRefreshDiscoveryServers);
                countDownLatch.countDown();
            } catch (Exception e) {
                throw new RegistryException("Zookeeper registry init error: " + zookeeperRegistryProperties, e);
            }
        } catch (Throwable th) {
            countDownLatch.countDown();
            throw th;
        }
    }

    public final boolean isConnected() {
        return this.client.isConnected();
    }

    public final void register(R r) {
        if (this.closed.get()) {
            return;
        }
        try {
            this.client.createEphemeral(buildRegistryPath(r), CREATE_EPHEMERAL_FAIL_RETRIES);
            this.registered.add(r);
            this.log.info("Server registered: {}, {}", this.registryRole, r);
        } catch (Throwable th) {
            throw new RegistryException("Zookeeper server register failed: " + r, th);
        }
    }

    public final void deregister(R r) {
        String buildRegistryPath = buildRegistryPath(r);
        try {
            this.registered.remove(r);
            this.client.deletePath(buildRegistryPath);
            this.log.info("Server deregister: {}, {}", this.registryRole, r);
        } catch (Throwable th) {
            this.log.error("Deregister to zookeeper failed: " + buildRegistryPath, th);
        }
    }

    public List<R> getRegisteredServers() throws Exception {
        return deserializeRegistryServers(this.client.getChildren(this.zkRegistryRootPath));
    }

    @PreDestroy
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.registered.forEach(this::deregister);
            CuratorFrameworkClient curatorFrameworkClient = this.client;
            curatorFrameworkClient.getClass();
            Throwables.ThrowingRunnable.doCaught(curatorFrameworkClient::close);
            this.registered.clear();
            super.close();
        }
    }

    private String buildRegistryPath(R r) {
        return this.zkRegistryRootPath + this.separator + r.serialize();
    }

    private synchronized void doRefreshDiscoveryServers(List<String> list) {
        List list2;
        this.log.info("Watched servers {}", list);
        if (CollectionUtils.isEmpty(list)) {
            this.log.warn("Not discovered available {} from zookeeper.", this.discoveryRole);
            list2 = Collections.emptyList();
        } else {
            Stream<String> filter = list.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            });
            ServerRole serverRole = this.discoveryRole;
            serverRole.getClass();
            list2 = (List) filter.map(serverRole::deserialize).collect(Collectors.toList());
        }
        refreshDiscoveredServers(list2);
    }
}
