package cn.ponfee.disjob.supervisor.application;

import cn.ponfee.disjob.common.base.RetryTemplate;
import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.concurrent.MultithreadExecutors;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.util.Numbers;
import cn.ponfee.disjob.core.base.RetryProperties;
import cn.ponfee.disjob.core.base.Supervisor;
import cn.ponfee.disjob.core.base.SupervisorMetrics;
import cn.ponfee.disjob.core.base.SupervisorRpcService;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.base.WorkerMetrics;
import cn.ponfee.disjob.core.base.WorkerRpcService;
import cn.ponfee.disjob.core.exception.AuthenticationException;
import cn.ponfee.disjob.core.exception.KeyExistsException;
import cn.ponfee.disjob.core.exception.KeyNotExistsException;
import cn.ponfee.disjob.core.param.supervisor.EventParam;
import cn.ponfee.disjob.core.param.worker.ConfigureWorkerParam;
import cn.ponfee.disjob.core.param.worker.GetMetricsParam;
import cn.ponfee.disjob.registry.SupervisorRegistry;
import cn.ponfee.disjob.registry.rpc.DestinationServerRestProxy;
import cn.ponfee.disjob.supervisor.application.converter.ServerMetricsConverter;
import cn.ponfee.disjob.supervisor.application.request.ConfigureAllWorkerRequest;
import cn.ponfee.disjob.supervisor.application.request.ConfigureOneWorkerRequest;
import cn.ponfee.disjob.supervisor.application.response.SupervisorMetricsResponse;
import cn.ponfee.disjob.supervisor.application.response.WorkerMetricsResponse;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
/* loaded from: input_file:cn/ponfee/disjob/supervisor/application/ServerInvokeService.class */
public class ServerInvokeService extends SingletonClassConstraint {
    private static final Logger LOG = LoggerFactory.getLogger(ServerInvokeService.class);
    private final SupervisorRegistry supervisorRegistry;
    private final Supervisor.Current currentSupervisor;
    private final DestinationServerRestProxy.DestinationServerInvoker<SupervisorRpcService, Supervisor> supervisorRpcClient;
    private final DestinationServerRestProxy.DestinationServerInvoker<WorkerRpcService, Worker> workerRpcClient;

    public ServerInvokeService(SupervisorRegistry supervisorRegistry, Supervisor.Current current, @Value("${server.servlet.context-path:/}") String str, SupervisorRpcService supervisorRpcService, @Qualifier("disjob.bean.rest-template") RestTemplate restTemplate, DestinationServerRestProxy.DestinationServerInvoker<WorkerRpcService, Worker> destinationServerInvoker) {
        this.supervisorRegistry = supervisorRegistry;
        this.currentSupervisor = current;
        this.supervisorRpcClient = DestinationServerRestProxy.create(SupervisorRpcService.class, supervisorRpcService, current, supervisor -> {
            return str;
        }, restTemplate, RetryProperties.of(0, 0));
        this.workerRpcClient = destinationServerInvoker;
    }

    public List<SupervisorMetricsResponse> supervisors() throws Exception {
        return MultithreadExecutors.call(Collects.sorted(this.supervisorRegistry.getRegisteredServers(), Comparator.comparing(supervisor -> {
            return Integer.valueOf(supervisor.equals(this.currentSupervisor) ? 0 : 1);
        })), this::getSupervisorMetrics, ThreadPoolExecutors.commonThreadPool());
    }

    public List<WorkerMetricsResponse> workers(String str, String str2) {
        if (!StringUtils.isNotBlank(str2)) {
            return MultithreadExecutors.call(Collects.sorted(this.supervisorRegistry.getDiscoveredServers(str), Comparator.comparing(worker -> {
                return Integer.valueOf(worker.equals(Worker.current()) ? 0 : 1);
            })), this::getWorkerMetrics, ThreadPoolExecutors.commonThreadPool());
        }
        String[] split = str2.trim().split(":");
        WorkerMetricsResponse workerMetrics = getWorkerMetrics(new Worker(str, "", split[0].trim(), Numbers.toInt(split[1].trim(), -1)));
        return StringUtils.isBlank(workerMetrics.getWorkerId()) ? Collections.emptyList() : Collections.singletonList(workerMetrics);
    }

    public void configureOneWorker(ConfigureOneWorkerRequest configureOneWorkerRequest) {
        Worker worker = configureOneWorkerRequest.toWorker();
        if (configureOneWorkerRequest.getAction() == ConfigureWorkerParam.Action.ADD_WORKER) {
            List discoveredServers = this.supervisorRegistry.getDiscoveredServers(configureOneWorkerRequest.getGroup());
            if (discoveredServers != null) {
                Stream stream = discoveredServers.stream();
                worker.getClass();
                if (stream.anyMatch(worker::sameWorker)) {
                    throw new KeyExistsException("Worker already registered: " + worker);
                }
            }
            verifyWorkerSignature(worker);
            configureOneWorkerRequest.setData(configureOneWorkerRequest.getGroup());
        } else if (!getDiscoveredWorkers(configureOneWorkerRequest.getGroup()).contains(worker)) {
            throw new KeyNotExistsException("Not found worker: " + worker);
        }
        configureWorker(worker, configureOneWorkerRequest.getAction(), configureOneWorkerRequest.getData());
    }

    public void configureAllWorker(ConfigureAllWorkerRequest configureAllWorkerRequest) {
        MultithreadExecutors.run(getDiscoveredWorkers(configureAllWorkerRequest.getGroup()), worker -> {
            configureWorker(worker, configureAllWorkerRequest.getAction(), configureAllWorkerRequest.getData());
        }, ThreadPoolExecutors.commonThreadPool());
    }

    public void publishOtherSupervisors(EventParam eventParam) {
        try {
            MultithreadExecutors.run((List) this.supervisorRegistry.getRegisteredServers().stream().filter(supervisor -> {
                return !this.currentSupervisor.sameSupervisor(supervisor);
            }).collect(Collectors.toList()), supervisor2 -> {
                publishSupervisor(supervisor2, eventParam);
            }, ThreadPoolExecutors.commonThreadPool());
        } catch (Exception e) {
            LOG.error("Publish all supervisor error.", e);
        }
    }

    private SupervisorMetricsResponse getSupervisorMetrics(Supervisor supervisor) {
        SupervisorMetrics supervisorMetrics = null;
        Long l = null;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            supervisorMetrics = (SupervisorMetrics) this.supervisorRpcClient.call(supervisor, (v0) -> {
                return v0.metrics();
            });
            l = Long.valueOf(System.currentTimeMillis() - currentTimeMillis);
        } catch (Throwable th) {
            LOG.warn("Ping supervisor occur error: {} {}", supervisor, th.getMessage());
        }
        SupervisorMetricsResponse supervisorMetricsResponse = supervisorMetrics == null ? new SupervisorMetricsResponse() : ServerMetricsConverter.INSTANCE.convert(supervisorMetrics);
        supervisorMetricsResponse.setHost(supervisor.getHost());
        supervisorMetricsResponse.setPort(supervisor.getPort());
        supervisorMetricsResponse.setPingTime(l);
        return supervisorMetricsResponse;
    }

    private WorkerMetricsResponse getWorkerMetrics(Worker worker) {
        WorkerMetrics workerMetrics = null;
        Long l = null;
        String group = worker.getGroup();
        GetMetricsParam buildGetMetricsParam = buildGetMetricsParam(group);
        try {
            long currentTimeMillis = System.currentTimeMillis();
            workerMetrics = (WorkerMetrics) this.workerRpcClient.call(worker, workerRpcService -> {
                return workerRpcService.metrics(buildGetMetricsParam);
            });
            l = Long.valueOf(System.currentTimeMillis() - currentTimeMillis);
        } catch (Throwable th) {
            LOG.warn("Ping worker occur error: {} {}", worker, th.getMessage());
        }
        WorkerMetricsResponse workerMetricsResponse = (workerMetrics == null || !SchedGroupService.verifyWorkerSignatureToken(workerMetrics.getSignature(), group)) ? new WorkerMetricsResponse(worker.getWorkerId()) : ServerMetricsConverter.INSTANCE.convert(workerMetrics);
        workerMetricsResponse.setHost(worker.getHost());
        workerMetricsResponse.setPort(worker.getPort());
        workerMetricsResponse.setPingTime(l);
        return workerMetricsResponse;
    }

    private List<Worker> getDiscoveredWorkers(String str) {
        List<Worker> discoveredServers = this.supervisorRegistry.getDiscoveredServers(str);
        if (CollectionUtils.isEmpty(discoveredServers)) {
            throw new KeyNotExistsException("Group '" + str + "' not exists workers.");
        }
        return discoveredServers;
    }

    private void verifyWorkerSignature(Worker worker) {
        String group = worker.getGroup();
        GetMetricsParam buildGetMetricsParam = buildGetMetricsParam(group);
        if (!SchedGroupService.verifyWorkerSignatureToken(((WorkerMetrics) this.workerRpcClient.call(worker, workerRpcService -> {
            return workerRpcService.metrics(buildGetMetricsParam);
        })).getSignature(), group)) {
            throw new AuthenticationException("Worker authenticated failed: " + worker);
        }
    }

    private void configureWorker(Worker worker, ConfigureWorkerParam.Action action, String str) {
        ConfigureWorkerParam configureWorkerParam = new ConfigureWorkerParam(SchedGroupService.createSupervisorAuthenticationToken(worker.getGroup()));
        configureWorkerParam.setAction(action);
        configureWorkerParam.setData(str);
        this.workerRpcClient.invoke(worker, workerRpcService -> {
            workerRpcService.configureWorker(configureWorkerParam);
        });
    }

    private void publishSupervisor(Supervisor supervisor, EventParam eventParam) {
        RetryTemplate.executeQuietly(() -> {
            this.supervisorRpcClient.invoke(supervisor, supervisorRpcService -> {
                supervisorRpcService.publish(eventParam);
            });
        }, 1, 2000L);
    }

    private GetMetricsParam buildGetMetricsParam(String str) {
        return new GetMetricsParam(SchedGroupService.createSupervisorAuthenticationToken(str), str);
    }
}
