package me.kpali.wolfflow.core.scheduler.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import me.kpali.wolfflow.core.cluster.IClusterController;
import me.kpali.wolfflow.core.config.ClusterConfig;
import me.kpali.wolfflow.core.config.SchedulerConfig;
import me.kpali.wolfflow.core.enums.TaskFlowScheduleStatusEnum;
import me.kpali.wolfflow.core.enums.TaskFlowStatusEnum;
import me.kpali.wolfflow.core.enums.TaskStatusEnum;
import me.kpali.wolfflow.core.event.ScheduleStatusChangeEvent;
import me.kpali.wolfflow.core.event.ScheduleStatusEventPublisher;
import me.kpali.wolfflow.core.event.TaskFlowStatusEventPublisher;
import me.kpali.wolfflow.core.event.TaskStatusEventPublisher;
import me.kpali.wolfflow.core.exception.InvalidCronExpressionException;
import me.kpali.wolfflow.core.exception.TaskFlowStopException;
import me.kpali.wolfflow.core.exception.TaskFlowTriggerException;
import me.kpali.wolfflow.core.exception.TryLockException;
import me.kpali.wolfflow.core.executor.ITaskFlowExecutor;
import me.kpali.wolfflow.core.logger.ITaskFlowLogger;
import me.kpali.wolfflow.core.logger.ITaskLogger;
import me.kpali.wolfflow.core.model.ClusterConstants;
import me.kpali.wolfflow.core.model.ContextKey;
import me.kpali.wolfflow.core.model.TaskFlow;
import me.kpali.wolfflow.core.model.TaskFlowExecRequest;
import me.kpali.wolfflow.core.model.TaskFlowLog;
import me.kpali.wolfflow.core.model.TaskFlowStatus;
import me.kpali.wolfflow.core.model.TaskLog;
import me.kpali.wolfflow.core.monitor.IMonitor;
import me.kpali.wolfflow.core.querier.ITaskFlowQuerier;
import me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler;
import me.kpali.wolfflow.core.scheduler.impl.quartz.MyDynamicScheduler;
import me.kpali.wolfflow.core.util.IdGenerator;
import me.kpali.wolfflow.core.util.context.TaskFlowContextWrapper;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:me/kpali/wolfflow/core/scheduler/impl/DefaultTaskFlowScheduler.class */
public class DefaultTaskFlowScheduler implements ITaskFlowScheduler {
    private static final Logger logger = LoggerFactory.getLogger(DefaultTaskFlowScheduler.class);

    @Autowired
    private SchedulerConfig schedulerConfig;

    @Autowired
    private ClusterConfig clusterConfig;

    @Autowired
    private TaskFlowStatusEventPublisher taskFlowStatusEventPublisher;

    @Autowired
    private TaskStatusEventPublisher taskStatusEventPublisher;

    @Autowired
    private ScheduleStatusEventPublisher scheduleStatusEventPublisher;

    @Autowired
    private ITaskFlowQuerier taskFlowQuerier;
    private ExecutorService schedulerThreadPool;

    @Autowired
    private ITaskFlowExecutor taskFlowExecutor;

    @Autowired
    private ITaskFlowLogger taskFlowLogger;

    @Autowired
    private ITaskLogger taskLogger;

    @Autowired
    private IClusterController clusterController;

    @Autowired
    private IdGenerator idGenerator;

    @Autowired
    private IMonitor monitor;
    private boolean started = false;
    private final ThreadFactory schedulerThreadFactory = new ThreadFactoryBuilder().setNameFormat("task-flow-scheduler-pool-%d").build();

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public void startup() {
        if (this.started) {
            return;
        }
        logger.info("Starting task flow scheduler, execRequestScanInterval: {}s, cronScanInterval: {}s, cronScanLockWaitTime: {}s, cronScanLockLeaseTime: {}s, corePoolSize: {}, maximumPoolSize: {}", new Object[]{this.schedulerConfig.getExecRequestScanInterval(), this.schedulerConfig.getCronScanInterval(), this.schedulerConfig.getCronScanLockWaitTime(), this.schedulerConfig.getCronScanLockLeaseTime(), this.schedulerConfig.getCorePoolSize(), this.schedulerConfig.getMaximumPoolSize()});
        this.started = true;
        startTaskFlowScaner();
    }

    private void startTaskFlowScaner() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), new ThreadFactoryBuilder().setNameFormat("taskFlowScaner-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
        logger.info("Start scanning task flow execute request...");
        threadPoolExecutor.execute(() -> {
            while (true) {
                try {
                    Thread.sleep(this.schedulerConfig.getExecRequestScanInterval().intValue() * 1000);
                    ThreadPoolExecutor threadPoolExecutor2 = this.schedulerThreadPool != null ? (ThreadPoolExecutor) this.schedulerThreadPool : null;
                    if (threadPoolExecutor2 == null || threadPoolExecutor2.getActiveCount() < threadPoolExecutor2.getMaximumPoolSize()) {
                        TaskFlowExecRequest execRequestPoll = this.clusterController.execRequestPoll();
                        if (execRequestPoll != null) {
                            Long taskFlowId = execRequestPoll.getTaskFlowId();
                            TaskFlow taskFlow = this.taskFlowQuerier.getTaskFlow(taskFlowId);
                            ConcurrentHashMap<String, Object> context = execRequestPoll.getContext();
                            TaskFlowContextWrapper taskFlowContextWrapper = new TaskFlowContextWrapper(context);
                            Long l = (Long) taskFlowContextWrapper.getValue("logId", Long.class);
                            Boolean bool = (Boolean) taskFlowContextWrapper.getValue(ContextKey.IS_ROLLBACK, Boolean.class);
                            logger.info("New task flow execute request was scanned, id: {}, log id: {}, rollback: {}", new Object[]{taskFlowId, l, bool});
                            taskFlowContextWrapper.put(ContextKey.EXECUTED_BY_NODE, this.clusterController.getNodeId());
                            if (this.schedulerThreadPool == null) {
                                synchronized (this.schedulerThreadFactory) {
                                    if (this.schedulerThreadPool == null) {
                                        this.schedulerThreadPool = new ThreadPoolExecutor(this.schedulerConfig.getCorePoolSize().intValue(), this.schedulerConfig.getMaximumPoolSize().intValue(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1024), this.schedulerThreadFactory, new ThreadPoolExecutor.AbortPolicy());
                                        this.monitor.monitor(this.schedulerThreadPool, "task-flow-scheduler");
                                    }
                                }
                            }
                            if (bool.booleanValue()) {
                                this.schedulerThreadPool.execute(() -> {
                                    try {
                                        this.taskFlowStatusEventPublisher.publishEvent(taskFlow, context, TaskFlowStatusEnum.ROLLING_BACK.getCode(), null, true);
                                        this.taskFlowExecutor.beforeRollback(taskFlow, context);
                                        this.taskFlowExecutor.rollback(taskFlow, context);
                                        this.taskFlowExecutor.afterRollback(taskFlow, context);
                                        this.taskFlowStatusEventPublisher.publishEvent(taskFlow, context, TaskFlowStatusEnum.ROLLBACK_SUCCESS.getCode(), null, true);
                                    } catch (Exception e) {
                                        String message = e.getMessage();
                                        if (message == null) {
                                            message = e.toString();
                                        }
                                        logger.error("Task flow [" + taskFlow.getId() + "] rollback failed! cause: " + message, e);
                                        try {
                                            this.taskFlowStatusEventPublisher.publishEvent(taskFlow, context, TaskFlowStatusEnum.ROLLBACK_FAILURE.getCode(), message, true);
                                        } catch (Exception e2) {
                                            logger.error("Failed to publish task flow status event! " + e2.getMessage(), e2);
                                        }
                                    }
                                });
                            } else {
                                this.schedulerThreadPool.execute(() -> {
                                    try {
                                        this.taskFlowStatusEventPublisher.publishEvent(taskFlow, context, TaskFlowStatusEnum.EXECUTING.getCode(), null, true);
                                        this.taskFlowExecutor.beforeExecute(taskFlow, context);
                                        this.taskFlowExecutor.execute(taskFlow, context);
                                        this.taskFlowExecutor.afterExecute(taskFlow, context);
                                        this.taskFlowStatusEventPublisher.publishEvent(taskFlow, context, TaskFlowStatusEnum.EXECUTE_SUCCESS.getCode(), null, true);
                                    } catch (Exception e) {
                                        String message = e.getMessage();
                                        if (message == null) {
                                            message = e.toString();
                                        }
                                        logger.error("Task flow [" + taskFlow.getId() + "] execution failed! cause: " + e.getMessage(), e);
                                        try {
                                            this.taskFlowStatusEventPublisher.publishEvent(taskFlow, context, TaskFlowStatusEnum.EXECUTE_FAILURE.getCode(), message, true);
                                        } catch (Exception e2) {
                                            logger.error("Failed to publish task flow status event! " + e2.getMessage(), e2);
                                        }
                                    }
                                });
                            }
                        }
                    } else {
                        logger.debug("Task flow scheduler at full load, stop receiving new requests until there are idle threads.");
                    }
                } catch (Exception e) {
                    logger.error("Failed to scan task flow execute request! " + e.getMessage(), e);
                }
            }
        });
        logger.info("Start scanning cron task flow...");
        threadPoolExecutor.execute(() -> {
            String valueOf;
            String cron;
            while (true) {
                try {
                    Thread.sleep(this.schedulerConfig.getCronScanInterval().intValue() * 1000);
                    if (this.clusterController.tryLock(ClusterConstants.CRON_TASK_FLOW_SCAN_LOCK, this.schedulerConfig.getCronScanLockWaitTime().intValue(), this.schedulerConfig.getCronScanLockLeaseTime().intValue(), TimeUnit.SECONDS)) {
                        logger.info("Acquire cron task flow scan lock success");
                        List<TaskFlow> listCronTaskFlow = this.taskFlowQuerier.listCronTaskFlow();
                        List<TaskFlow> arrayList = listCronTaskFlow == null ? new ArrayList<>() : listCronTaskFlow;
                        logger.info("{} cron task flows were scanned", Integer.valueOf(arrayList.size()));
                        ArrayList<JobKey> arrayList2 = new ArrayList();
                        for (JobKey jobKey : MyDynamicScheduler.getJobKeysGroupEquals("DefaultJobGroup")) {
                            boolean z = false;
                            Iterator<TaskFlow> it = arrayList.iterator();
                            while (true) {
                                if (it.hasNext()) {
                                    if (String.valueOf(it.next().getId()).equals(jobKey.getName())) {
                                        z = true;
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            if (!z) {
                                arrayList2.add(jobKey);
                            }
                        }
                        for (JobKey jobKey2 : arrayList2) {
                            MyDynamicScheduler.removeJob(jobKey2.getName(), jobKey2.getGroup());
                        }
                        for (TaskFlow taskFlow : arrayList) {
                            try {
                                valueOf = String.valueOf(taskFlow.getId());
                                cron = taskFlow.getCron();
                            } catch (Exception e) {
                                logger.error("Failed to scheduling task flow [" + taskFlow.getId() + "], cause: " + e.getMessage());
                                this.scheduleStatusEventPublisher.publishEvent(new ScheduleStatusChangeEvent(this, taskFlow.getId(), taskFlow.getCron(), TaskFlowScheduleStatusEnum.FAIL.getCode()));
                            }
                            if (cron == null || cron.length() == 0) {
                                throw new InvalidCronExpressionException("Cron expression cannot be null or empty");
                            }
                            HashMap hashMap = new HashMap();
                            if (taskFlow.getFromTaskId() != null) {
                                hashMap.put(ContextKey.FROM_TASK_ID, taskFlow.getFromTaskId());
                            }
                            if (taskFlow.getToTaskId() != null) {
                                hashMap.put(ContextKey.TO_TASK_ID, taskFlow.getToTaskId());
                            }
                            if (MyDynamicScheduler.checkExists(valueOf, "DefaultJobGroup")) {
                                MyDynamicScheduler.updateJobCron(valueOf, "DefaultJobGroup", cron, hashMap);
                                this.scheduleStatusEventPublisher.publishEvent(new ScheduleStatusChangeEvent(this, taskFlow.getId(), taskFlow.getCron(), TaskFlowScheduleStatusEnum.UPDATE.getCode()));
                            } else {
                                MyDynamicScheduler.addJob(valueOf, "DefaultJobGroup", cron, hashMap);
                                this.scheduleStatusEventPublisher.publishEvent(new ScheduleStatusChangeEvent(this, taskFlow.getId(), taskFlow.getCron(), TaskFlowScheduleStatusEnum.JOIN.getCode()));
                            }
                        }
                    } else {
                        logger.info("Acquire cron task flow scan lock failed");
                        MyDynamicScheduler.clear();
                    }
                } catch (Exception e2) {
                    logger.error("Scan cron task flow failed, cause: " + e2.getMessage(), e2);
                }
            }
        });
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long execute(Long l, ConcurrentHashMap<String, Object> concurrentHashMap) throws TaskFlowTriggerException {
        return trigger(l, false, null, null, concurrentHashMap);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long execute(Long l, Long l2, ConcurrentHashMap<String, Object> concurrentHashMap) throws TaskFlowTriggerException {
        return trigger(l, false, l2, l2, concurrentHashMap);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long executeFrom(Long l, Long l2, ConcurrentHashMap<String, Object> concurrentHashMap) throws TaskFlowTriggerException {
        return trigger(l, false, l2, null, concurrentHashMap);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long executeTo(Long l, Long l2, ConcurrentHashMap<String, Object> concurrentHashMap) throws TaskFlowTriggerException {
        return trigger(l, false, null, l2, concurrentHashMap);
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public long rollback(Long l, ConcurrentHashMap<String, Object> concurrentHashMap) throws TaskFlowTriggerException {
        return trigger(l, true, null, null, concurrentHashMap);
    }

    private long trigger(Long l, Boolean bool, Long l2, Long l3, ConcurrentHashMap<String, Object> concurrentHashMap) throws TaskFlowTriggerException {
        try {
            TaskFlow taskFlow = this.taskFlowQuerier.getTaskFlow(l);
            long nextId = this.idGenerator.nextId();
            String code = bool.booleanValue() ? TaskFlowStatusEnum.WAIT_FOR_ROLLBACK.getCode() : TaskFlowStatusEnum.WAIT_FOR_EXECUTE.getCode();
            String code2 = bool.booleanValue() ? TaskFlowStatusEnum.ROLLBACK_FAILURE.getCode() : TaskFlowStatusEnum.EXECUTE_FAILURE.getCode();
            TaskFlowContextWrapper taskFlowContextWrapper = new TaskFlowContextWrapper();
            taskFlowContextWrapper.put(ContextKey.TASK_FLOW_ID, l);
            taskFlowContextWrapper.put(ContextKey.IS_ROLLBACK, bool);
            if (l2 != null) {
                taskFlowContextWrapper.put(ContextKey.FROM_TASK_ID, l2);
            }
            if (l3 != null) {
                taskFlowContextWrapper.put(ContextKey.TO_TASK_ID, l3);
            }
            if (concurrentHashMap != null) {
                taskFlowContextWrapper.setParams(concurrentHashMap);
            }
            taskFlowContextWrapper.put("logId", Long.valueOf(nextId));
            String taskFlowLogLock = ClusterConstants.getTaskFlowLogLock(l);
            try {
                boolean tryLock = this.clusterController.tryLock(taskFlowLogLock, this.clusterConfig.getTaskFlowLogLockWaitTime().intValue(), this.clusterConfig.getTaskFlowLogLockLeaseTime().intValue(), TimeUnit.SECONDS);
                if (!tryLock) {
                    throw new TryLockException("Acquire the task flow log lock failed!");
                }
                TaskFlowLog last = this.taskFlowLogger.last(taskFlow.getId());
                if (last != null) {
                    if (this.taskFlowLogger.isInProgress(last)) {
                        throw new TaskFlowTriggerException("Task flow is running!");
                    }
                    if ((l2 != null || l3 != null) && bool.equals(Boolean.valueOf(last.getRollback())) && last.getContext() != null && last.getContext().containsKey(ContextKey.DELIVERY_CONTEXT)) {
                        taskFlowContextWrapper.put(ContextKey.DELIVERY_CONTEXT, last.getContext().get(ContextKey.DELIVERY_CONTEXT));
                    }
                }
                TaskFlowStatus taskFlowStatus = new TaskFlowStatus();
                taskFlowStatus.setTaskFlow(taskFlow);
                taskFlowStatus.setContext(taskFlowContextWrapper.getTaskFlowContext());
                taskFlowStatus.setStatus(code);
                taskFlowStatus.setMessage(null);
                TaskFlowLog taskFlowLog = new TaskFlowLog();
                taskFlowLog.setLogId(Long.valueOf(nextId));
                taskFlowLog.setTaskFlowId(taskFlow.getId());
                taskFlowLog.setTaskFlow(taskFlowStatus.getTaskFlow());
                taskFlowLog.setContext(taskFlowStatus.getContext());
                taskFlowLog.setStatus(taskFlowStatus.getStatus());
                taskFlowLog.setMessage(taskFlowStatus.getMessage());
                taskFlowLog.setRollback(bool.booleanValue());
                this.taskFlowLogger.add(taskFlowLog);
                if (tryLock) {
                    this.clusterController.unlock(taskFlowLogLock);
                }
                this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContextWrapper.getContext(), code, null, false);
                if (!this.clusterController.execRequestOffer(new TaskFlowExecRequest(taskFlow.getId(), taskFlowContextWrapper.getContext()))) {
                    this.taskFlowStatusEventPublisher.publishEvent(taskFlow, taskFlowContextWrapper.getContext(), code2, "Failed to insert into task flow execution request queue", true);
                }
                return nextId;
            } catch (Throwable th) {
                if (0 != 0) {
                    this.clusterController.unlock(taskFlowLogLock);
                }
                throw th;
            }
        } catch (Exception e) {
            throw new TaskFlowTriggerException(e);
        }
    }

    @Override // me.kpali.wolfflow.core.scheduler.ITaskFlowScheduler
    public void stop(Long l) throws TaskFlowStopException {
        try {
            TaskFlowLog taskFlowLog = this.taskFlowLogger.get(l);
            if (taskFlowLog == null) {
                return;
            }
            String taskFlowLogLock = ClusterConstants.getTaskFlowLogLock(taskFlowLog.getTaskFlowId());
            try {
                boolean tryLock = this.clusterController.tryLock(taskFlowLogLock, this.clusterConfig.getTaskFlowLogLockWaitTime().intValue(), this.clusterConfig.getTaskFlowLogLockLeaseTime().intValue(), TimeUnit.SECONDS);
                if (!tryLock) {
                    throw new TryLockException("Acquire the task flow log lock failed!");
                }
                if (this.taskFlowLogger.isInProgress(taskFlowLog)) {
                    if (this.clusterController.isNodeAlive((Long) new TaskFlowContextWrapper(taskFlowLog.getContext()).getValue(ContextKey.EXECUTED_BY_NODE, Long.class))) {
                        if (!this.clusterController.stopRequestContains(l).booleanValue()) {
                            this.clusterController.stopRequestAdd(l);
                        }
                        taskFlowLog.setStatus(TaskFlowStatusEnum.STOPPING.getCode());
                        taskFlowLog.setMessage(null);
                    } else {
                        List<TaskLog> list = this.taskLogger.list(l);
                        if (list != null) {
                            for (TaskLog taskLog : list) {
                                if (this.taskLogger.isInProgress(taskLog)) {
                                    taskLog.setStatus(TaskStatusEnum.EXECUTE_FAILURE.getCode());
                                    taskLog.setMessage("Task execution is terminated");
                                    this.taskLogger.update(taskLog);
                                    this.taskStatusEventPublisher.publishEvent(taskLog.getTask(), taskLog.getTaskFlowId(), taskLog.getContext(), taskLog.getStatus(), taskLog.getMessage(), false);
                                }
                            }
                        }
                        taskFlowLog.setStatus(TaskFlowStatusEnum.EXECUTE_FAILURE.getCode());
                        taskFlowLog.setMessage("Task flow execution is terminated");
                    }
                    this.taskFlowLogger.update(taskFlowLog);
                    this.taskFlowStatusEventPublisher.publishEvent(taskFlowLog.getTaskFlow(), taskFlowLog.getContext(), taskFlowLog.getStatus(), taskFlowLog.getMessage(), false);
                }
                if (tryLock) {
                    this.clusterController.unlock(taskFlowLogLock);
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    this.clusterController.unlock(taskFlowLogLock);
                }
                throw th;
            }
        } catch (Exception e) {
            throw new TaskFlowStopException(e);
        }
    }
}
