package me.kpali.wolfflow.core.scheduler.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import me.kpali.wolfflow.core.cluster.IClusterController;
import me.kpali.wolfflow.core.config.SchedulerConfig;
import me.kpali.wolfflow.core.enums.TaskFlowScheduleStatusEnum;
import me.kpali.wolfflow.core.enums.TaskFlowStatusEnum;
import me.kpali.wolfflow.core.enums.TaskStatusEnum;
import me.kpali.wolfflow.core.event.ScheduleStatusChangeEvent;
import me.kpali.wolfflow.core.event.ScheduleStatusEventPublisher;
import me.kpali.wolfflow.core.event.TaskFlowStatusEventPublisher;
import me.kpali.wolfflow.core.event.TaskStatusEventPublisher;
import me.kpali.wolfflow.core.exception.InvalidCronExpressionException;
import me.kpali.wolfflow.core.exception.InvalidTaskFlowException;
import me.kpali.wolfflow.core.exception.TaskFlowExecuteException;
import me.kpali.wolfflow.core.exception.TaskFlowInterruptedException;
import me.kpali.wolfflow.core.exception.TaskFlowStopException;
import me.kpali.wolfflow.core.exception.TaskFlowTriggerException;
import me.kpali.wolfflow.core.exception.TryLockException;
import me.kpali.wolfflow.core.executor.ITaskFlowExecutor;
import me.kpali.wolfflow.core.logger.ITaskFlowLogger;
import me.kpali.wolfflow.core.logger.ITaskLogger;
import me.kpali.wolfflow.core.model.ClusterConstants;
import me.kpali.wolfflow.core.model.ContextKey;
import me.kpali.wolfflow.core.model.TaskFlow;
import me.kpali.wolfflow.core.model.TaskFlowContextWrapper;
import me.kpali.wolfflow.core.model.TaskFlowExecRequest;
import me.kpali.wolfflow.core.model.TaskFlowLog;
import me.kpali.wolfflow.core.model.TaskFlowStatus;
import me.kpali.wolfflow.core.model.TaskLog;
import me.kpali.wolfflow.core.querier.ITaskFlowQuerier;
import me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler;
import me.kpali.wolfflow.core.scheduler.impl.quartz.MyDynamicScheduler;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:me/kpali/wolfflow/core/scheduler/impl/DefaultTaskFlowScheduler.class */
public class DefaultTaskFlowScheduler implements ITaskFlowScheduler {
    private static final Logger log = LoggerFactory.getLogger(DefaultTaskFlowScheduler.class);

    @Autowired
    private SchedulerConfig schedulerConfig;

    @Autowired
    private TaskFlowStatusEventPublisher taskFlowStatusEventPublisher;

    @Autowired
    private TaskStatusEventPublisher taskStatusEventPublisher;

    @Autowired
    private ScheduleStatusEventPublisher scheduleStatusEventPublisher;
    private boolean started = false;
    private final Object lock = new Object();

    @Autowired
    private ITaskFlowQuerier taskFlowQuerier;
    private ExecutorService threadPool;

    @Autowired
    private ITaskFlowExecutor taskFlowExecutor;

    @Autowired
    private ITaskFlowLogger taskFlowLogger;

    @Autowired
    private ITaskLogger taskLogger;

    @Autowired
    private IClusterController clusterController;

    @Autowired
    private SystemTimeUtils systemTimeUtils;

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public void startup() {
        if (this.started) {
            return;
        }
        log.info("任务流调度器启动，任务流执行请求扫描间隔：{}秒，定时任务流扫描间隔：{}秒，定时任务流扫描加锁等待时间：{}秒，定时任务流扫描自动解锁时间：{}秒，核心线程数：{}，最大线程数：{}", new Object[]{this.schedulerConfig.getExecRequestScanInterval(), this.schedulerConfig.getCronScanInterval(), this.schedulerConfig.getCronScanLockWaitTime(), this.schedulerConfig.getCronScanLockLeaseTime(), this.schedulerConfig.getCorePoolSize(), this.schedulerConfig.getMaximumPoolSize()});
        this.started = true;
        startTaskFlowScaner();
    }

    private void startTaskFlowScaner() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), new ThreadFactoryBuilder().setNameFormat("taskFlowScaner-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
        log.info("任务流执行请求扫描线程启动");
        threadPoolExecutor.execute(() -> {
            while (true) {
                try {
                    Thread.sleep(this.schedulerConfig.getExecRequestScanInterval().intValue() * 1000);
                    TaskFlowExecRequest execRequestPoll = this.clusterController.execRequestPoll();
                    if (execRequestPoll != null) {
                        TaskFlow taskFlow = execRequestPoll.getTaskFlow();
                        Map<String, Object> taskFlowContext = execRequestPoll.getTaskFlowContext();
                        TaskFlowContextWrapper taskFlowContextWrapper = new TaskFlowContextWrapper(taskFlowContext);
                        log.info("扫描到新的任务流执行请求，任务流ID：{}，任务流日志ID：{}，当前节点ID：{}", new Object[]{execRequestPoll.getTaskFlow().getId(), (Long) taskFlowContextWrapper.getValue("logId", Long.class), this.clusterController.getNodeId()});
                        taskFlowContextWrapper.put(ContextKey.EXECUTED_BY_NODE, this.clusterController.getNodeId());
                        if (this.threadPool == null) {
                            synchronized (this.lock) {
                                if (this.threadPool == null) {
                                    this.threadPool = new ThreadPoolExecutor(this.schedulerConfig.getCorePoolSize().intValue(), this.schedulerConfig.getMaximumPoolSize().intValue(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1024), new ThreadFactoryBuilder().setNameFormat("schedulerExecutor-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
                                }
                            }
                        }
                        this.threadPool.execute(() -> {
                            try {
                                this.taskFlowExecutor.beforeExecute(taskFlow, taskFlowContext);
                                this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContext, TaskFlowStatusEnum.EXECUTING.getCode(), null, true);
                                this.taskFlowExecutor.execute(taskFlow, taskFlowContext);
                                this.taskFlowExecutor.afterExecute(taskFlow, taskFlowContext);
                                this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContext, TaskFlowStatusEnum.EXECUTE_SUCCESS.getCode(), null, true);
                            } catch (TaskFlowExecuteException | TaskFlowInterruptedException e) {
                                log.error("任务流执行失败！任务流ID：" + taskFlow.getId() + " 异常信息：" + e.getMessage(), e);
                                this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContext, TaskFlowStatusEnum.EXECUTE_FAILURE.getCode(), e.getMessage(), true);
                            }
                        });
                    }
                } catch (Exception e) {
                    log.error("任务流执行请求扫描异常！" + e.getMessage(), e);
                }
            }
        });
        log.info("定时任务流扫描线程启动");
        threadPoolExecutor.execute(() -> {
            String valueOf;
            String cron;
            while (true) {
                try {
                    Thread.sleep(this.schedulerConfig.getCronScanInterval().intValue() * 1000);
                    if (this.clusterController.tryLock(ClusterConstants.CRON_TASK_FLOW_SCAN_LOCK, this.schedulerConfig.getCronScanLockWaitTime().intValue(), this.schedulerConfig.getCronScanLockLeaseTime().intValue(), TimeUnit.SECONDS)) {
                        log.info("定时任务流扫描线程获取锁成功");
                        List<TaskFlow> listCronTaskFlow = this.taskFlowQuerier.listCronTaskFlow();
                        List<TaskFlow> arrayList = listCronTaskFlow == null ? new ArrayList<>() : listCronTaskFlow;
                        log.info("共扫描到{}个定时任务流", Integer.valueOf(arrayList.size()));
                        ArrayList<JobKey> arrayList2 = new ArrayList();
                        for (JobKey jobKey : MyDynamicScheduler.getJobKeysGroupEquals("DefaultJobGroup")) {
                            boolean z = false;
                            Iterator<TaskFlow> it = arrayList.iterator();
                            while (true) {
                                if (it.hasNext()) {
                                    if (String.valueOf(it.next().getId()).equals(jobKey.getName())) {
                                        z = true;
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            if (!z) {
                                arrayList2.add(jobKey);
                            }
                        }
                        for (JobKey jobKey2 : arrayList2) {
                            MyDynamicScheduler.removeJob(jobKey2.getName(), jobKey2.getGroup());
                        }
                        for (TaskFlow taskFlow : arrayList) {
                            try {
                                valueOf = String.valueOf(taskFlow.getId());
                                cron = taskFlow.getCron();
                            } catch (Exception e) {
                                log.error("定时任务流调度失败，任务流ID：" + taskFlow.getId() + "，失败原因：" + e.getMessage());
                                this.scheduleStatusEventPublisher.publishEvent(new ScheduleStatusChangeEvent(this, taskFlow.getId(), taskFlow.getCron(), TaskFlowScheduleStatusEnum.FAIL.getCode()));
                            }
                            if (cron == null || cron.length() == 0) {
                                throw new InvalidCronExpressionException("cron表达式不能为空");
                            }
                            HashMap hashMap = new HashMap();
                            if (taskFlow.getFromTaskId() != null) {
                                hashMap.put(ContextKey.FROM_TASK_ID, taskFlow.getFromTaskId());
                            }
                            if (taskFlow.getToTaskId() != null) {
                                hashMap.put(ContextKey.TO_TASK_ID, taskFlow.getToTaskId());
                            }
                            if (MyDynamicScheduler.checkExists(valueOf, "DefaultJobGroup")) {
                                MyDynamicScheduler.updateJobCron(valueOf, "DefaultJobGroup", cron, hashMap);
                                this.scheduleStatusEventPublisher.publishEvent(new ScheduleStatusChangeEvent(this, taskFlow.getId(), taskFlow.getCron(), TaskFlowScheduleStatusEnum.UPDATE.getCode()));
                            } else {
                                MyDynamicScheduler.addJob(valueOf, "DefaultJobGroup", cron, hashMap);
                                this.scheduleStatusEventPublisher.publishEvent(new ScheduleStatusChangeEvent(this, taskFlow.getId(), taskFlow.getCron(), TaskFlowScheduleStatusEnum.JOIN.getCode()));
                            }
                        }
                    } else {
                        log.info("定时任务流扫描线程获取锁失败");
                        MyDynamicScheduler.clear();
                    }
                } catch (Exception e2) {
                    log.error("定时任务流扫描异常！" + e2.getMessage(), e2);
                }
            }
        });
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long trigger(Long l, Map<String, Object> map) throws InvalidTaskFlowException, TaskFlowTriggerException {
        return trigger(l, null, null, map);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long trigger(Long l, Long l2, Map<String, Object> map) throws InvalidTaskFlowException, TaskFlowTriggerException {
        return trigger(l, l2, l2, map);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long triggerFrom(Long l, Long l2, Map<String, Object> map) throws InvalidTaskFlowException, TaskFlowTriggerException {
        return trigger(l, l2, null, map);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long triggerTo(Long l, Long l2, Map<String, Object> map) throws InvalidTaskFlowException, TaskFlowTriggerException {
        return trigger(l, null, l2, map);
    }

    private long trigger(Long l, Long l2, Long l3, Map<String, Object> map) throws InvalidTaskFlowException, TaskFlowTriggerException {
        TaskFlow taskFlow = this.taskFlowQuerier.getTaskFlow(l);
        long uniqueTimeStamp = this.systemTimeUtils.getUniqueTimeStamp();
        TaskFlowContextWrapper taskFlowContextWrapper = new TaskFlowContextWrapper();
        if (map != null) {
            taskFlowContextWrapper.setParams(map);
        }
        if (l2 != null) {
            taskFlowContextWrapper.put(ContextKey.FROM_TASK_ID, l2);
        }
        if (l3 != null) {
            taskFlowContextWrapper.put(ContextKey.TO_TASK_ID, l3);
        }
        taskFlowContextWrapper.put("logId", Long.valueOf(uniqueTimeStamp));
        TaskFlowStatus taskFlowStatus = new TaskFlowStatus();
        taskFlowStatus.setTaskFlow(taskFlow);
        taskFlowStatus.setTaskFlowContext(taskFlowContextWrapper.getContext());
        taskFlowStatus.setStatus(TaskFlowStatusEnum.WAIT_FOR_EXECUTE.getCode());
        taskFlowStatus.setMessage(null);
        try {
            boolean tryLock = this.clusterController.tryLock(ClusterConstants.TASK_FLOW_LOG_LOCK, 10L, 15L, TimeUnit.SECONDS);
            if (!tryLock) {
                throw new TryLockException("获取任务流日志记录锁失败！");
            }
            TaskFlowLog last = this.taskFlowLogger.last(taskFlow.getId());
            if (last != null && this.taskFlowLogger.isInProgress(last)) {
                throw new TaskFlowTriggerException("不允许同时多次执行！");
            }
            TaskFlowLog taskFlowLog = new TaskFlowLog();
            taskFlowLog.setLogId(Long.valueOf(uniqueTimeStamp));
            taskFlowLog.setTaskFlowId(taskFlow.getId());
            taskFlowLog.setTaskFlow(taskFlowStatus.getTaskFlow());
            taskFlowLog.setTaskFlowContext(taskFlowStatus.getTaskFlowContext());
            taskFlowLog.setStatus(taskFlowStatus.getStatus());
            taskFlowLog.setMessage(taskFlowStatus.getMessage());
            this.taskFlowLogger.add(taskFlowLog);
            if (tryLock) {
                this.clusterController.unlock(ClusterConstants.TASK_FLOW_LOG_LOCK);
            }
            this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContextWrapper.getContext(), TaskFlowStatusEnum.WAIT_FOR_EXECUTE.getCode(), null, false);
            if (!this.clusterController.execRequestOffer(new TaskFlowExecRequest(taskFlow, taskFlowContextWrapper.getContext()))) {
                this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContextWrapper.getContext(), TaskFlowStatusEnum.EXECUTE_FAILURE.getCode(), "插入执行请求队列失败", true);
            }
            return uniqueTimeStamp;
        } catch (Throwable th) {
            if (0 != 0) {
                this.clusterController.unlock(ClusterConstants.TASK_FLOW_LOG_LOCK);
            }
            throw th;
        }
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public void stop(Long l) throws TaskFlowStopException {
        boolean z = false;
        try {
            z = this.clusterController.tryLock(ClusterConstants.TASK_FLOW_LOG_LOCK, 10L, 15L, TimeUnit.SECONDS);
            if (!z) {
                throw new TryLockException("获取任务流日志记录锁失败！");
            }
            TaskFlowLog taskFlowLog = this.taskFlowLogger.get(l);
            if (taskFlowLog != null && this.taskFlowLogger.isInProgress(taskFlowLog)) {
                if (this.clusterController.isNodeAlive((String) new TaskFlowContextWrapper(taskFlowLog.getTaskFlowContext()).getValue(ContextKey.EXECUTED_BY_NODE, String.class))) {
                    if (!this.clusterController.stopRequestContains(l).booleanValue()) {
                        this.clusterController.stopRequestAdd(l);
                    }
                    taskFlowLog.setStatus(TaskFlowStatusEnum.STOPPING.getCode());
                    taskFlowLog.setMessage(null);
                } else {
                    try {
                        boolean tryLock = this.clusterController.tryLock(ClusterConstants.TASK_LOG_LOCK, 10L, 15L, TimeUnit.SECONDS);
                        if (!tryLock) {
                            throw new TryLockException("获取任务日志记录锁失败！");
                        }
                        List<TaskLog> list = this.taskLogger.list(l);
                        if (list != null) {
                            for (TaskLog taskLog : list) {
                                if (this.taskLogger.isInProgress(taskLog)) {
                                    taskLog.setStatus(TaskStatusEnum.EXECUTE_FAILURE.getCode());
                                    taskLog.setMessage("任务被终止执行");
                                    this.taskLogger.update(taskLog);
                                    this.taskStatusEventPublisher.publishEvent(taskLog.getTask(), taskLog.getTaskFlowId(), taskLog.getTaskFlowContext(), taskLog.getStatus(), taskLog.getMessage(), false);
                                }
                            }
                        }
                        if (tryLock) {
                            this.clusterController.unlock(ClusterConstants.TASK_LOG_LOCK);
                        }
                        taskFlowLog.setStatus(TaskFlowStatusEnum.EXECUTE_FAILURE.getCode());
                        taskFlowLog.setMessage("任务流被终止执行");
                    } catch (Throwable th) {
                        if (0 != 0) {
                            this.clusterController.unlock(ClusterConstants.TASK_LOG_LOCK);
                        }
                        throw th;
                    }
                }
                this.taskFlowLogger.update(taskFlowLog);
                this.taskFlowStatusEventPublisher.publishEvent(taskFlowLog.getTaskFlow(), taskFlowLog.getTaskFlowContext(), taskFlowLog.getStatus(), taskFlowLog.getMessage(), false);
            }
            if (z) {
                this.clusterController.unlock(ClusterConstants.TASK_FLOW_LOG_LOCK);
            }
        } catch (Throwable th2) {
            if (z) {
                this.clusterController.unlock(ClusterConstants.TASK_FLOW_LOG_LOCK);
            }
            throw th2;
        }
    }
}
