package cn.ponfee.disjob.worker.base;

import cn.ponfee.disjob.common.base.Startable;
import cn.ponfee.disjob.common.base.TimingWheel;
import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.base.Supervisor;
import cn.ponfee.disjob.core.base.SupervisorService;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.param.ExecuteTaskParam;
import cn.ponfee.disjob.core.param.TaskWorkerParam;
import cn.ponfee.disjob.registry.Discovery;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/ponfee/disjob/worker/base/TimingWheelRotator.class */
public class TimingWheelRotator implements Startable {
    private static final Logger LOG = LoggerFactory.getLogger(TimingWheelRotator.class);
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");
    private final Worker currentWorker;
    private final SupervisorService supervisorServiceClient;
    private final Discovery<Supervisor> discoverySupervisor;
    private final TimingWheel<ExecuteTaskParam> timingWheel;
    private final WorkerThreadPool workerThreadPool;
    private final ExecutorService processExecutor;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile int round = 0;
    private final ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1, runnable -> {
        Thread thread = new Thread(runnable, "timing_wheel_rotate");
        thread.setDaemon(true);
        thread.setPriority(10);
        return thread;
    });

    public TimingWheelRotator(Worker worker, SupervisorService supervisorService, Discovery<Supervisor> discovery, TimingWheel<ExecuteTaskParam> timingWheel, WorkerThreadPool workerThreadPool, int i) {
        this.currentWorker = worker;
        this.supervisorServiceClient = supervisorService;
        this.discoverySupervisor = discovery;
        this.timingWheel = timingWheel;
        this.workerThreadPool = workerThreadPool;
        int max = Math.max(1, i);
        this.processExecutor = ThreadPoolExecutors.builder().corePoolSize(max).maximumPoolSize(max).workQueue(new LinkedBlockingQueue(Integer.MAX_VALUE)).keepAliveTimeSeconds(300L).threadFactory(NamedThreadFactory.builder().prefix("timing_wheel_process").build()).prestartCoreThreadType(ThreadPoolExecutors.PrestartCoreThreadType.ONE).build();
    }

    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            LOG.warn("Timing wheel rotator already started.");
        } else {
            LOG.info("Timing wheel rotator started.");
            this.scheduledExecutor.scheduleAtFixedRate(Throwables.caught(this::process), this.timingWheel.getTickMs(), this.timingWheel.getTickMs(), TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        if (!this.started.compareAndSet(true, false)) {
            LOG.warn("Timing wheel rotator already stopped.");
            return;
        }
        LOG.info("Timing wheel rotator stopped.");
        Throwables.ThrowingSupplier.caught(() -> {
            return Boolean.valueOf(ThreadPoolExecutors.shutdown(this.scheduledExecutor, 3));
        });
        Throwables.ThrowingSupplier.caught(() -> {
            return Boolean.valueOf(ThreadPoolExecutors.shutdown(this.processExecutor, 3));
        });
    }

    private void process() {
        int i = this.round + 1;
        this.round = i;
        if (i > 1024) {
            this.round = 0;
            LOG.info("Timing wheel rotator heartbeat: worker-thread-pool={}, jvm-thread-count={}", this.workerThreadPool, Integer.valueOf(Thread.activeCount()));
        }
        if (this.started.get()) {
            if (!this.discoverySupervisor.hasDiscoveredServers()) {
                if ((this.round & 31) == 0) {
                    LOG.warn("Not found available supervisor.");
                }
            } else {
                List poll = this.timingWheel.poll();
                if (poll.isEmpty()) {
                    return;
                }
                this.processExecutor.execute(() -> {
                    process(poll);
                });
            }
        }
    }

    private void process(List<ExecuteTaskParam> list) {
        List list2 = (List) list.stream().filter(executeTaskParam -> {
            Worker worker = executeTaskParam.getWorker();
            if (!this.currentWorker.sameWorker(worker)) {
                LOG.error("Processed unmatched worker: {} | '{}' | '{}'", new Object[]{Long.valueOf(executeTaskParam.getTaskId()), this.currentWorker, worker});
                return false;
            }
            if (!this.currentWorker.getWorkerId().equals(worker.getWorkerId())) {
                LOG.warn("Processed former worker: {} | '{}' | '{}'", new Object[]{Long.valueOf(executeTaskParam.getTaskId()), this.currentWorker, worker});
            }
            LOG.info("Processed task {} | {} | {} | {}", new Object[]{Long.valueOf(executeTaskParam.getTaskId()), executeTaskParam.getOperation(), worker, DATE_FORMAT.format(executeTaskParam.getTriggerTime())});
            return true;
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            return;
        }
        for (List list3 : Lists.partition(list2, 200)) {
            List list4 = (List) list3.stream().filter(executeTaskParam2 -> {
                return executeTaskParam2.getRouteStrategy() != RouteStrategy.BROADCAST;
            }).map(executeTaskParam3 -> {
                return new TaskWorkerParam(Long.valueOf(executeTaskParam3.getTaskId()), executeTaskParam3.getWorker().serialize());
            }).collect(Collectors.toList());
            try {
                this.supervisorServiceClient.updateTaskWorker(list4);
            } catch (Throwable th) {
                LOG.error("Update task worker error: " + Jsons.toJson(list4), th);
            }
            WorkerThreadPool workerThreadPool = this.workerThreadPool;
            workerThreadPool.getClass();
            list3.forEach(workerThreadPool::submit);
        }
    }
}
