package cn.ponfee.disjob.worker.base;

import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.base.Startable;
import cn.ponfee.disjob.common.base.TimingWheel;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.base.Supervisor;
import cn.ponfee.disjob.core.base.SupervisorRpcService;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.param.supervisor.UpdateTaskWorkerParam;
import cn.ponfee.disjob.dispatch.ExecuteTaskParam;
import cn.ponfee.disjob.registry.Discovery;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/ponfee/disjob/worker/base/TimingWheelRotator.class */
/**
 * Periodically drains the {@link TimingWheel} of {@link ExecuteTaskParam}s and hands the
 * drained tasks to the {@link WorkerThreadPool}.
 *
 * <p>A dedicated {@link LoopThread} (named {@code timing_wheel_rotate}) invokes
 * {@link #process()} with the timing wheel's tick as its period argument. Each non-empty
 * poll result is handed to a separate fixed-size executor (threads prefixed
 * {@code timing_wheel_process}) so that the supervisor RPC call and the task submission
 * do not run on the rotating thread.
 */
public class TimingWheelRotator extends SingletonClassConstraint implements Startable {
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");
    private static final Logger LOG = LoggerFactory.getLogger(TimingWheelRotator.class);

    /** Maximum number of tasks sent to the supervisor in a single {@code updateTaskWorker} call. */
    private static final int PROCESS_BATCH_SIZE = 200;

    /** Minimum interval between two "Not found available supervisor." warnings. */
    private static final long NO_SUPERVISOR_LOG_INTERVAL_MILLIS = 5000L;

    /** Keep-alive time, in seconds, configured on the process executor. */
    private static final long PROCESS_EXECUTOR_KEEP_ALIVE_SECONDS = 300L;

    /**
     * Second argument passed to {@code ThreadPoolExecutors.shutdown} on stop.
     * NOTE(review): presumably an await timeout in seconds - confirm against ThreadPoolExecutors.
     */
    private static final int PROCESS_EXECUTOR_SHUTDOWN_AWAIT = 2;

    private final SupervisorRpcService supervisorRpcClient;
    private final Discovery<Supervisor> discoverySupervisor;
    private final TimingWheel<ExecuteTaskParam> timingWheel;
    private final WorkerThreadPool workerThreadPool;
    private final LoopThread heartbeatThread;
    private final ExecutorService processExecutor;

    /** Earliest wall-clock time (epoch millis) at which the missing-supervisor warning may be logged again. */
    private volatile long nextLogTimeMillis = 0;

    /**
     * Creates the rotator. No thread is started until {@link #start()} is called.
     *
     * @param supervisorRpcService client used to report the worker assigned to each task
     * @param discovery            supervisor discovery; the wheel is only polled while it reports discovered servers
     * @param timingWheel          source of the tasks to trigger; its tick is passed to the rotating thread
     * @param workerThreadPool     pool the triggered tasks are submitted to
     * @param i                    requested size of the process executor; values below 1 are raised to 1
     */
    public TimingWheelRotator(SupervisorRpcService supervisorRpcService, Discovery<Supervisor> discovery, TimingWheel<ExecuteTaskParam> timingWheel, WorkerThreadPool workerThreadPool, int i) {
        this.supervisorRpcClient = supervisorRpcService;
        this.discoverySupervisor = discovery;
        this.timingWheel = timingWheel;
        this.workerThreadPool = workerThreadPool;
        this.heartbeatThread = new LoopThread("timing_wheel_rotate", timingWheel.getTickMs(), 0L, this::process);

        // Core and maximum size are identical, so the executor has a fixed number of threads.
        int poolSize = Math.max(1, i);
        this.processExecutor = ThreadPoolExecutors.builder()
            .corePoolSize(poolSize)
            .maximumPoolSize(poolSize)
            .workQueue(new LinkedBlockingQueue<>(Integer.MAX_VALUE))
            .keepAliveTimeSeconds(PROCESS_EXECUTOR_KEEP_ALIVE_SECONDS)
            .threadFactory(NamedThreadFactory.builder().prefix("timing_wheel_process").uncaughtExceptionHandler(LOG).build())
            .build();
    }

    /** Starts the rotating thread. */
    public void start() {
        this.heartbeatThread.start();
    }

    /**
     * Terminates the rotating thread and, only when {@code terminate()} returns {@code true},
     * shuts down the process executor.
     */
    public void stop() {
        if (this.heartbeatThread.terminate()) {
            ThreadPoolExecutors.shutdown(this.processExecutor, PROCESS_EXECUTOR_SHUTDOWN_AWAIT);
        }
    }

    /**
     * One rotation step: polls the timing wheel and dispatches the polled tasks to the process
     * executor. While no supervisor has been discovered the wheel is left untouched and a
     * throttled warning is logged instead.
     */
    private void process() {
        if (!this.discoverySupervisor.hasDiscoveredServers()) {
            long now = System.currentTimeMillis();
            if (now > this.nextLogTimeMillis) {
                this.nextLogTimeMillis = now + NO_SUPERVISOR_LOG_INTERVAL_MILLIS;
                LOG.warn("Not found available supervisor.");
            }
            return;
        }

        List<ExecuteTaskParam> tasks = this.timingWheel.poll();
        if (tasks.isEmpty()) {
            return;
        }
        this.processExecutor.execute(() -> process(tasks));
    }

    /**
     * Handles the polled tasks in batches of at most {@link #PROCESS_BATCH_SIZE}: reports the
     * task/worker pairs of the batch to the supervisor, then logs and submits every task of the
     * batch to the worker thread pool.
     *
     * @param tasks tasks polled from the timing wheel
     */
    private void process(List<ExecuteTaskParam> tasks) {
        for (List<ExecuteTaskParam> batch : Lists.partition(tasks, PROCESS_BATCH_SIZE)) {
            // Tasks routed with the BROADCAST strategy are left out of the worker update.
            List<UpdateTaskWorkerParam> updateParams = batch.stream()
                .filter(task -> task.getRouteStrategy() != RouteStrategy.BROADCAST)
                .map(task -> new UpdateTaskWorkerParam(task.getTaskId(), task.getWorker()))
                .collect(Collectors.toList());

            // NOTE(review): doCaught presumably catches and logs a failure with the supplied
            // message so that the batch is still submitted below - confirm against Throwables.
            Throwables.ThrowingRunnable.doCaught(() -> {
                this.supervisorRpcClient.updateTaskWorker(updateParams);
            }, () -> {
                return "Update task worker error: " + Jsons.toJson(updateParams);
            });

            // Every task of the batch is submitted, including the BROADCAST ones filtered out above.
            for (ExecuteTaskParam task : batch) {
                LOG.info("Task trace [{}] triggered: {}, {}, {}", task.getTaskId(), task.getOperation(), task.getWorker(), DATE_FORMAT.format(task.getTriggerTime()));
                this.workerThreadPool.submit(task);
            }
        }
    }
}
